dsmlily opened a new issue #144: For a pulsar.Client, Once Client.Subscribe() 
times out, all subsequent Client.Subscribe() fail
URL: https://github.com/apache/pulsar-client-go/issues/144
 
 
   #### Expected behavior
   
   When the network is restored, the subscription is successfully created
   
   #### Actual behavior
   
   When the network is restored, creating a subscription fails
   
   #### Steps to reproduce
   ##### 1、code file  test.go
   
   // main is the reproduction driver: it creates a single pulsar client with
   // 2-second connection and operation timeouts, stores it in GConsumerClient
   // (declared outside this snippet), and then makes ten subscribe attempts,
   // one per "room", sleeping two seconds before each attempt.
   func main(){
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL: "pulsar://10.32.32.20:6650",
                ConnectionTimeout:time.Second * time.Duration(2),
                OperationTimeout: time.Second * time.Duration(2),
        })
        if err != nil {
                log.Fatal(err)
        }
   
        // Share the one client with initRoomPulsarConsumer.
        GConsumerClient = client
   
        defer client.Close()
        for i := 0; i < 10; i++ {
                index := i
                time.Sleep(time.Second*2)
                // The loop index doubles as the room id and as the
                // subscription-name suffix ("sub-name-<index>").
                roomId := strconv.Itoa(index)
                subscriptionName := fmt.Sprintf("sub-name-%s", roomId)
                c, e := initRoomPulsarConsumer(roomId,subscriptionName)
                if e != nil {
                        // A failed subscribe is only reported; the loop moves
                        // on to the next room using the same client.
                        fmt.Printf("create faild  -- %d\n", index)
                        //wg.Done()
                        //log.Fatal(e)
                        continue
                }
                time.Sleep(time.Second*1)
                // Tear the consumer down in the background: unsubscribe first,
                // then close, logging any unsubscribe error.
                // NOTE(review): nothing waits for these goroutines (the
                // wg.Done call above is commented out), so the last one may
                // still be running when main returns.
                go func(con pulsar.Consumer,sb string) {
                        fmt.Printf("to close room %s.....\n",sb)
                        err := con.Unsubscribe()
                        if err != nil {
                                fmt.Printf("******* Unsubscribe error  
%s.....\n",err.Error())
                        }
                        con.Close()
                        fmt.Printf("to close room [%s close done***]\n",sb)
                }(c,subscriptionName)
                fmt.Printf("wait group -- %d done\n", index)    
        }
   }
   
   // initRoomPulsarConsumer subscribes to the topic "dev_pid2/signal_geo/<roomId>"
   // through the shared GConsumerClient, using subscriptionName and the
   // pulsar.Failover subscription type.
   //
   // It returns the consumer on success. If Subscribe fails, it prints the
   // error and returns nil together with that error.
   func initRoomPulsarConsumer(roomId string, subscriptionName string) 
(consumer pulsar.Consumer, err error) {
        fmt.Printf("to creat room consumer SubscriptionName:%s 
.....\n",subscriptionName)
        cfg := pulsar.ConsumerOptions{
                Topic:               fmt.Sprintf("dev_pid2/signal_geo/%s", 
roomId),
                SubscriptionName:    subscriptionName,
                Type:                pulsar.Failover,
        }
      
        // This is the call the issue is about: per the logs in this report it
        // returns "request timed out" on every attempt after the first timeout.
        consumer, err = GConsumerClient.Subscribe(cfg)
        if err != nil {
                fmt.Printf("*****   could not establish subscription %v, err: 
%s", roomId, err.Error())
                return nil, err
        }
        fmt.Printf("creat room consumer done, SubscriptionName:%s  ***creat 
done\n",cfg.SubscriptionName)
        // Naked return: yields the named results consumer and err (nil here).
        return
   }
   
   ##### 2、steps
   
   - go run test.go
   
   - While the program is running, start the network packet-loss simulation
at the first call to Subscribe(), and stop it after the fourth call to
Subscribe(). Every later Subscribe() call still fails with "request timed out".
   
   ##### 3、logs
   $ go run test.go
   to creat room consumer SubscriptionName:sub-name-0 .....
   INFO[0002] Connecting to broker                          
remote_addr="pulsar://10.32.32.20:6650"
   INFO[0002] TCP connection established                    
local_addr="172.23.111.52:63844" remote_addr="pulsar://10.32.32.20:6650"
   INFO[0002] Connection is ready                           
local_addr="172.23.111.52:63844" remote_addr="pulsar://10.32.32.20:6650"
   INFO[0002] Connecting to broker                          
remote_addr="pulsar://10.32.32.20:6652"
   INFO[0002] TCP connection established                    
local_addr="172.23.111.52:63845" remote_addr="pulsar://10.32.32.20:6652"
   INFO[0002] Connection is ready                           
local_addr="172.23.111.52:63845" remote_addr="pulsar://10.32.32.20:6652"
   INFO[0002] Connected consumer                            name=cuvse 
subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
   INFO[0002] Created consumer                              name=cuvse 
subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
   creat room consumer done, SubscriptionName:sub-name-0  ***creat done
   wait group -- 0 done
   to close room sub-name-0.....
   INFO[0003] Closing consumer=1                            name=cuvse 
subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
   WARN[0003] Failed to close consumer                      error="server 
error: MetadataError: Consumer not found" name=cuvse subscription=sub-name-0 
topic="persistent://dev_pid2/signal_geo/0"
   INFO[0003] exiting events loop                           name=cuvse 
subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
   to close room [sub-name-0 close done***]
   INFO[0003] exiting dispatch loop                         name=cuvse 
subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
   to creat room consumer SubscriptionName:sub-name-1 .....
   WARN[0007] Failed to lookup topic                        error="request 
timed out" name=gqivy subscription=sub-name-1 
topic="persistent://dev_pid2/signal_geo/1"
   ERRO[0007] Failed to create consumer                     error="request 
timed out"
   *****   could not establish subscription 1, err: request timed outcreate 
faild  -- 1
   to creat room consumer SubscriptionName:sub-name-2 .....
   *****   could not establish subscription 2, err: request timed outcreate 
faild  -- 2
   to creat room consumer SubscriptionName:sub-name-3 .....
   *****   could not establish subscription 3, err: request timed outcreate 
faild  -- 3
   to creat room consumer SubscriptionName:sub-name-4 .....
   *****   could not establish subscription 4, err: request timed outcreate 
faild  -- 4
   to creat room consumer SubscriptionName:sub-name-5 .....
   *****   could not establish subscription 5, err: request timed outcreate 
faild  -- 5
   to creat room consumer SubscriptionName:sub-name-6 .....
   *****   could not establish subscription 6, err: request timed outcreate 
faild  -- 6
   to creat room consumer SubscriptionName:sub-name-7 .....
   *****   could not establish subscription 7, err: request timed outcreate 
faild  -- 7
   to creat room consumer SubscriptionName:sub-name-8 .....
   *****   could not establish subscription 8, err: request timed outcreate 
faild  -- 8
   to creat room consumer SubscriptionName:sub-name-9 .....
   *****   could not establish subscription 9, err: request timed outcreate 
faild  -- 9
   
   #### System configuration
   **Pulsar version**: 2.4.0
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to