wangzeping722 opened a new issue, #897:
URL: https://github.com/apache/rocketmq-client-go/issues/897

   **BUG REPORT**  
   1. Please describe the issue you observed:
   
       - What did you do (The steps to reproduce)?
   Client consumes message for broker. Sometime, 
       - What did you expect to see?
   consumer keep consuming messages.
       - What did you see instead?
   all consumer goroutines block on `consumer/process_queue.go:201 
pq.mutex.Lock()`,  when `updateProcessQueueTable` is called and meet 
`pq.isPullExpired() && dc.cType == _PushConsume`, the channel——`pq.closeChan` 
will be closed, but the receiver doesn't release `pq.mutex`. 
   
   please look the deadlock of the following pprof log.
   2. Please tell us about your environment:
   
        - What is your OS?
   
        - What is your client version?
   
        - What is your RocketMQ version?
   
   3. Other information (e.g. detailed explanation, logs, related issues, 
suggestions on how to fix, etc):
   pprof:
   ``` txt
   goroutine profile: total 73
   20 @ 0x1045aeafc 0x1045aeb88 0x1045bf7dc 0x1045d9fa8 0x1045e6a50 0x1045e66d4 
0x1045e8e7c 0x1049905f4 0x10499c5c4 0x104937f1c 0x1045de424
   #    0x1045d9fa7     sync.runtime_SemacquireMutex+0x27                       
                                                
/Users/wero/.g/versions/1.18.3/src/runtime/sema.go:71
   #    0x1045e6a4f     sync.(*Mutex).lockSlow+0x34f                            
                                                
/Users/wero/.g/versions/1.18.3/src/sync/mutex.go:162
   #    0x1045e66d3     sync.(*Mutex).Lock+0xa3                                 
                                                
/Users/wero/.g/versions/1.18.3/src/sync/mutex.go:81
   #    0x1045e8e7b     sync.(*RWMutex).Lock+0x2b                               
                                                
/Users/wero/.g/versions/1.18.3/src/sync/rwmutex.go:139
   #    0x1049905f3     
github.com/apache/rocketmq-client-go/v2/consumer.(*processQueue).removeMessage+0x53
                     
/Users/wero/workspace/go/rocketmq-client-go/consumer/process_queue.go:201
   #    0x10499c5c3     
github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).consumeMessageCurrently.func1+0xba3
    /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:1101
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
                        
/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   5 @ 0x1045aeafc 0x1045becfc 0x10499f920 0x104937f1c 0x1045de424
   #    0x10499f91f     
github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func4+0x15f
       /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:218
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   5 @ 0x1045aeafc 0x1045becfc 0x10499fb64 0x104937f1c 0x1045de424
   #    0x10499fb63     
github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func3+0x103
       /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:204
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   5 @ 0x1045aeafc 0x1045becfc 0x10499fda4 0x104937f1c 0x1045de424
   #    0x10499fda3     
github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func2+0x103
       /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:191
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   5 @ 0x1045aeafc 0x1045becfc 0x10499ffe4 0x104937f1c 0x1045de424
   #    0x10499ffe3     
github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func1+0x103
       /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:178
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   5 @ 0x1045aeafc 0x1045daee8 0x10499f308 0x104937f1c 0x1045de424
   #    0x1045daee7     time.Sleep+0x117                                        
                                
/Users/wero/.g/versions/1.18.3/src/runtime/time.go:194
   #    0x10499f307     
github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func6+0x87
        /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:242
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   5 @ 0x1045aeafc 0x1045daee8 0x10499f5a8 0x104937f1c 0x1045de424
   #    0x1045daee7     time.Sleep+0x117                                        
                                
/Users/wero/.g/versions/1.18.3/src/runtime/time.go:194
   #    0x10499f5a7     
github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func5+0x87
        /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:228
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   3 @ 0x1045aeafc 0x10457a93c 0x10457a728 0x10499b810 0x104999af0 0x104937f1c 
0x1045de424
   #    0x10499b80f     
github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).consumeMessageCurrently+0x1ff
  /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:1029
   #    0x104999aef     
github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).pullMessage.func1+0x15f
        
/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:580
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
                
/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   3 @ 0x1045aeafc 0x1045becfc 0x10498fae0 0x104999130 0x104993e64 0x1045de424
   #    0x10498fadf     
github.com/apache/rocketmq-client-go/v2/consumer.(*processQueue).putMessage+0x13f
       /Users/wero/workspace/go/rocketmq-client-go/consumer/process_queue.go:105
   #    0x10499912f     
github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).pullMessage+0x23ef
     /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:813
   #    0x104993e63     
github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start.func1.1.1+0x33
   /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:164
   
   2 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464c39c 
0x10472ffe4 0x1047417e0 0x104645684 0x104645830 0x104943518 0x1049431b4 
0x104937f1c 0x1045de424
   #    0x1045d8b37     internal/poll.runtime_pollWait+0x47                     
                                        
/Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
   #    0x10464b0f7     internal/poll.(*pollDesc).wait+0x87                     
                                        
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
   #    0x10464b187     internal/poll.(*pollDesc).waitRead+0x37                 
                                        
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
   #    0x10464c39b     internal/poll.(*FD).Read+0x33b                          
                                        
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:167
   #    0x10472ffe3     net.(*netFD).Read+0x53                                  
                                        
/Users/wero/.g/versions/1.18.3/src/net/fd_posix.go:55
   #    0x1047417df     net.(*conn).Read+0x6f                                   
                                        
/Users/wero/.g/versions/1.18.3/src/net/net.go:183
   #    0x104645683     io.ReadAtLeast+0x133                                    
                                        
/Users/wero/.g/versions/1.18.3/src/io/io.go:331
   #    0x10464582f     io.ReadFull+0x5f                                        
                                        
/Users/wero/.g/versions/1.18.3/src/io/io.go:350
   #    0x104943517     
github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).receiveResponse+0x2d7
 
/Users/wero/workspace/go/rocketmq-client-go/internal/remote/remote_client.go:195
   #    0x1049431b3     
github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).connect.func1+0x33
    
/Users/wero/workspace/go/rocketmq-client-go/internal/remote/remote_client.go:165
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
                
/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   2 @ 0x1045aeafc 0x1045aeb88 0x1045bf7dc 0x1045d9fa8 0x1045e8c48 0x104991e40 
0x10499570c 0x1045e6174 0x104995008 0x104962950 0x10495b92c 0x104943d28 
0x104937f1c 0x1045de424
   #    0x1045d9fa7     sync.runtime_SemacquireMutex+0x27                       
                                                
/Users/wero/.g/versions/1.18.3/src/runtime/sema.go:71
   #    0x1045e8c47     sync.(*RWMutex).RLock+0x97                              
                                                
/Users/wero/.g/versions/1.18.3/src/sync/rwmutex.go:63
   #    0x104991e3f     
github.com/apache/rocketmq-client-go/v2/consumer.(*processQueue).currentInfo+0x4f
                       
/Users/wero/workspace/go/rocketmq-client-go/consumer/process_queue.go:400
   #    0x10499570b     
github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).GetConsumerRunningInfo.func2+0x10b
     /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:414
   #    0x1045e6173     sync.(*Map).Range+0x2a3                                 
                                                
/Users/wero/.g/versions/1.18.3/src/sync/map.go:347
   #    0x104995007     
github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).GetConsumerRunningInfo+0xe7
            
/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:411
   #    0x10496294f     
github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).getConsumerRunningInfo+0xcf
               
/Users/wero/workspace/go/rocketmq-client-go/internal/client.go:911
   #    0x10495b92b     
github.com/apache/rocketmq-client-go/v2/internal.GetOrNewRocketMQClient.func3+0x18b
                     
/Users/wero/workspace/go/rocketmq-client-go/internal/client.go:277
   #    0x104943d27     
github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).processCMD.func2+0x87
         
/Users/wero/workspace/go/rocketmq-client-go/internal/remote/remote_client.go:244
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
                        
/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   1 @ 0x1045aeafc 0x10457b308 0x10457b064 0x1049c6aac 0x1045ae714 0x1045de424
   #    0x1049c6aab     main.main+0x33b         
/Users/wero/workspace/go/rocketmq-client-go/examples/consumer/simple/main.go:59
   #    0x1045ae713     runtime.main+0x223      
/Users/wero/.g/versions/1.18.3/src/runtime/proc.go:250
   
   1 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464c39c 
0x10472ffe4 0x1047417e0 0x1048faf14 0x1045de424
   #    0x1045d8b37     internal/poll.runtime_pollWait+0x47             
/Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
   #    0x10464b0f7     internal/poll.(*pollDesc).wait+0x87             
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
   #    0x10464b187     internal/poll.(*pollDesc).waitRead+0x37         
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
   #    0x10464c39b     internal/poll.(*FD).Read+0x33b                  
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:167
   #    0x10472ffe3     net.(*netFD).Read+0x53                          
/Users/wero/.g/versions/1.18.3/src/net/fd_posix.go:55
   #    0x1047417df     net.(*conn).Read+0x6f                           
/Users/wero/.g/versions/1.18.3/src/net/net.go:183
   #    0x1048faf13     net/http.(*connReader).backgroundRead+0x73      
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:672
   
   1 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464c39c 
0x10472ffe4 0x1047417e0 0x1048fb6dc 0x104779234 0x10477aa04 0x10477aa98 
0x10487fa2c 0x10487f92c 0x1048f55a0 0x1048fcab4 0x104902c80 0x1045de424
   #    0x1045d8b37     internal/poll.runtime_pollWait+0x47             
/Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
   #    0x10464b0f7     internal/poll.(*pollDesc).wait+0x87             
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
   #    0x10464b187     internal/poll.(*pollDesc).waitRead+0x37         
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
   #    0x10464c39b     internal/poll.(*FD).Read+0x33b                  
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:167
   #    0x10472ffe3     net.(*netFD).Read+0x53                          
/Users/wero/.g/versions/1.18.3/src/net/fd_posix.go:55
   #    0x1047417df     net.(*conn).Read+0x6f                           
/Users/wero/.g/versions/1.18.3/src/net/net.go:183
   #    0x1048fb6db     net/http.(*connReader).Read+0x1fb               
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:780
   #    0x104779233     bufio.(*Reader).fill+0x233                      
/Users/wero/.g/versions/1.18.3/src/bufio/bufio.go:106
   #    0x10477aa03     bufio.(*Reader).ReadSlice+0x353                 
/Users/wero/.g/versions/1.18.3/src/bufio/bufio.go:371
   #    0x10477aa97     bufio.(*Reader).ReadLine+0x47                   
/Users/wero/.g/versions/1.18.3/src/bufio/bufio.go:400
   #    0x10487fa2b     net/textproto.(*Reader).readLineSlice+0x6b      
/Users/wero/.g/versions/1.18.3/src/net/textproto/reader.go:57
   #    0x10487f92b     net/textproto.(*Reader).ReadLine+0x3b           
/Users/wero/.g/versions/1.18.3/src/net/textproto/reader.go:38
   #    0x1048f559f     net/http.readRequest+0x5f                       
/Users/wero/.g/versions/1.18.3/src/net/http/request.go:1029
   #    0x1048fcab3     net/http.(*conn).readRequest+0x393              
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:988
   #    0x104902c7f     net/http.(*conn).serve+0xa6f                    
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:1891
   
   1 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464cf6c 
0x10473157c 0x10474b644 0x104749ef0 0x10490831c 0x104907e44 0x1049090ec 
0x1045de424
   #    0x1045d8b37     internal/poll.runtime_pollWait+0x47     
/Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
   #    0x10464b0f7     internal/poll.(*pollDesc).wait+0x87     
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
   #    0x10464b187     internal/poll.(*pollDesc).waitRead+0x37 
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
   #    0x10464cf6b     internal/poll.(*FD).Accept+0x32b        
/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:614
   #    0x10473157b     net.(*netFD).accept+0x4b                
/Users/wero/.g/versions/1.18.3/src/net/fd_unix.go:172
   #    0x10474b643     net.(*TCPListener).accept+0x43          
/Users/wero/.g/versions/1.18.3/src/net/tcpsock_posix.go:139
   #    0x104749eef     net.(*TCPListener).Accept+0x4f          
/Users/wero/.g/versions/1.18.3/src/net/tcpsock.go:288
   #    0x10490831b     net/http.(*Server).Serve+0x3eb          
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:3039
   #    0x104907e43     net/http.(*Server).ListenAndServe+0x123 
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:2968
   #    0x1049090eb     net/http.ListenAndServe+0xfb            
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:3222
   
   1 @ 0x1045aeafc 0x1045becfc 0x104958b24 0x1045de424
   #    0x104958b23     github.com/patrickmn/go-cache.(*janitor).Run+0xa3       
/Users/wero/go/pkg/mod/github.com/patrickmn/[email protected]+incompatible/cache.go:1079
   
   1 @ 0x1045aeafc 0x1045becfc 0x10495e3b4 0x104937f1c 0x1045de424
   #    0x10495e3b3     
github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.5+0x103
       /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:498
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   1 @ 0x1045aeafc 0x1045becfc 0x10495e6dc 0x104937f1c 0x1045de424
   #    0x10495e6db     
github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.4+0x14b
       /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:482
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   1 @ 0x1045aeafc 0x1045becfc 0x10495ea68 0x104937f1c 0x1045de424
   #    0x10495ea67     
github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.3+0x147
       /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:450
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   1 @ 0x1045aeafc 0x1045becfc 0x10495edf8 0x104937f1c 0x1045de424
   #    0x10495edf7     
github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.2+0x147
       /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:426
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   1 @ 0x1045aeafc 0x1045becfc 0x10495f16c 0x104937f1c 0x1045de424
   #    0x10495f16b     
github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.1+0x14b
       /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:402
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   1 @ 0x1045aeafc 0x1045becfc 0x104993c44 0x1045de424
   #    0x104993c43     
github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start.func1.1+0xb3
     /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:161
   
   1 @ 0x1045aeafc 0x1045daee8 0x104993920 0x104937f1c 0x1045de424
   #    0x1045daee7     time.Sleep+0x117                                        
                                
/Users/wero/.g/versions/1.18.3/src/runtime/time.go:194
   #    0x10499391f     
github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start.func1.2+0x6f
     /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:179
   #    0x104937f1b     
github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b              
        /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
   
   1 @ 0x1045d8778 0x10497c704 0x10497c41c 0x104978150 0x1049c425c 0x1049c4d68 
0x104904348 0x104906594 0x104907bd4 0x104903720 0x1045de424
   #    0x1045d8777     runtime/pprof.runtime_goroutineProfileWithLabels+0x27   
/Users/wero/.g/versions/1.18.3/src/runtime/mprof.go:753
   #    0x10497c703     runtime/pprof.writeRuntimeProfile+0x113                 
/Users/wero/.g/versions/1.18.3/src/runtime/pprof/pprof.go:725
   #    0x10497c41b     runtime/pprof.writeGoroutine+0x8b                       
/Users/wero/.g/versions/1.18.3/src/runtime/pprof/pprof.go:685
   #    0x10497814f     runtime/pprof.(*Profile).WriteTo+0x7f                   
/Users/wero/.g/versions/1.18.3/src/runtime/pprof/pprof.go:332
   #    0x1049c425b     net/http/pprof.handler.ServeHTTP+0x38b                  
/Users/wero/.g/versions/1.18.3/src/net/http/pprof/pprof.go:253
   #    0x1049c4d67     net/http/pprof.Index+0xb7                               
/Users/wero/.g/versions/1.18.3/src/net/http/pprof/pprof.go:371
   #    0x104904347     net/http.HandlerFunc.ServeHTTP+0x47                     
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:2084
   #    0x104906593     net/http.(*ServeMux).ServeHTTP+0x133                    
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:2462
   #    0x104907bd3     net/http.serverHandler.ServeHTTP+0x423                  
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:2916
   #    0x10490371f     net/http.(*conn).serve+0x150f                           
/Users/wero/.g/versions/1.18.3/src/net/http/server.go:1966
   
   ```
   there should Unlock
   
   
   **REPRODUCE**
   Run the consumer, then run the producer, and after a while there will be a 
deadlock for the following reasons.
   
   producer:
   ```go
   package main
   
   import (
        "context"
        "fmt"
        "os"
        "strconv"
   
        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "github.com/apache/rocketmq-client-go/v2/producer"
   )
   
   // Package main implements a simple producer to send message.
   func main() {
        p, _ := rocketmq.NewProducer(
                
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
                producer.WithRetry(2),
        )
        err := p.Start()
        if err != nil {
                fmt.Printf("start producer error: %s", err.Error())
                os.Exit(1)
        }
        topic := "test"
   
        for i := 0; i < 100000; i++ {
                msg := &primitive.Message{
                        Topic: topic,
                        Body:  []byte("H" + strconv.Itoa(i)),
                }
                res, err := p.SendSync(context.Background(), msg)
   
                if err != nil {
                        fmt.Printf("send message error: %s\n", err)
                } else {
                        fmt.Printf("send message success: result=%s\n", 
res.String())
                }
        }
        err = p.Shutdown()
        if err != nil {
                fmt.Printf("shutdown producer error: %s", err.Error())
        }
   }
   ```
   
   consumer, If it doesn't reproduce, we can run it a few more times:
   ```go
   package main
   
   import (
        "context"
        "fmt"
        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/consumer"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "net/http"
        _ "net/http/pprof"
        "os"
   )
   
   func main() {
        sig := make(chan os.Signal)
        c, _ := rocketmq.NewPushConsumer(
                consumer.WithGroupName("testGroup"),
                
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
                consumer.WithConsumeMessageBatchMaxSize(5),
        )
        go http.ListenAndServe("0.0.0.0:6060", nil)
        err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx 
context.Context,
                msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
                for i := range msgs {
                        fmt.Printf("subscribe callback: %v \n", msgs[i].MsgId)
                }
                return consumer.ConsumeRetryLater, nil
        })
        if err != nil {
                fmt.Println(err.Error())
        }
        // Note: start after subscribe
        err = c.Start()
        if err != nil {
                fmt.Println(err.Error())
                os.Exit(-1)
        }
        <-sig
        err = c.Shutdown()
        if err != nil {
                fmt.Printf("shutdown Consumer error: %s", err.Error())
        }
   }
   
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to