tenhan opened a new issue #617:
URL: https://github.com/apache/rocketmq-client-go/issues/617


   The issue tracker is **ONLY** used for the Go client (feature requests for 
RocketMQ need to follow the [RIP 
process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)).
 Please check whether the same issue has already been reported before 
you raise a new one.
   
   Alternately (especially if your communication is not a bug report), you can 
send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We 
welcome any friendly suggestions, bug fixes, collaboration, and other 
improvements.
   
   Please ensure that your bug report is clear and that it is complete. 
Otherwise, we may be unable to understand it or to reproduce it, either of 
which would prevent us from fixing the bug. We strongly recommend the 
report(bug report or feature request) could include some hints as to the 
following:
   
   **BUG REPORT**  
   **Please add the branch name [Native]/[Master] at the head of the issue 
title.**
   
   1. Please describe the issue you observed:
   
       - What did you do (The steps to reproduce)?
       Followed the demo example 
(https://github.com/apache/rocketmq-client-go/blob/master/examples/producer/tag/main.go).
       The difference is that our application is an HTTP server, and 
p.Shutdown() is called only when the application exits.
        
       - What did you expect to see?
       The memory usage should not keep rising.
   
       - What did you see instead?
       About one hour after sending a message synchronously, there were 
more than 10,000 goroutines and the memory was exhausted.
       pprof:
   ```
   goroutine profile: total 20452
   20410 @ 0x43bbc5 0x4076ea 0x407495 0x7e8490 0x662389 0x652149 0x470881
   #    0x7e848f    github.com/apache/rocketmq-client-go/v2/internal.GetOrNewRocketMQClient.func2+0x24f    /home/tenhan/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.0.0/internal/client.go:220
   #    0x662388    github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).processCMD.func2+0x68    /home/tenhan/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.0.0/internal/remote/remote_client.go:207
   #    0x652148    github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x48    /home/tenhan/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.0.0/primitive/base.go:96
   ```
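For anyone trying to reproduce the numbers above, a goroutine profile in this text format can be captured with the standard `runtime/pprof` package. The following is only a minimal sketch; the helper name `goroutineProfile` is made up for illustration and is not part of rocketmq-client-go.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
)

// goroutineProfile (hypothetical helper) returns the goroutine profile as text.
func goroutineProfile() (string, error) {
	var buf bytes.Buffer
	// debug=1 groups goroutines that share a stack and prints a count per group,
	// which is the "N @ 0x..." layout shown in the report.
	if err := pprof.Lookup("goroutine").WriteTo(&buf, 1); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := goroutineProfile()
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

In an HTTP server, importing `net/http/pprof` for its side effects exposes the same data at `/debug/pprof/goroutine?debug=1` on the default mux.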
   
   2. Please tell us about your environment:
   
        - What is your OS?
        Debian, Linux 5.10.0
        - What is your client version?
        v2.0.0
        - What is your RocketMQ version?
        
   3. Other information (e.g. detailed explanation, logs, related issues, 
suggestions on how to fix, etc):
   
   When the defaultProducer receives cmd.Code=39 (ReqCheckTransactionState), a 
new goroutine is created, and this goroutine eventually executes `callbackCh 
<- callback`. Because no goroutine consumes callbackCh, the send blocks 
forever. Every such request leaves one more blocked goroutine behind, so the 
number of goroutines keeps growing until the memory is exhausted.
   
   The approximate call flow is:
   github.com\tenhan\rocketmq-client-go\v2/internal\remote\remote_client.go
   remotingClient.connect()
   defaultProducer.SendSync()
   -> client.InvokeSync()
   -> remoteClient.InvokeSync()
   -> remotingClient.connect()
   -> new goroutine(remotingClient.receiveResponse()) -> foreach(remotingClient.processCMD())
   -> if (cmd.Code == 39) new goroutine(callbackCh <- callback, blocked here)
   
   The related code is in this file: 
github.com/apache/rocketmq-client-go/v2/internal/client.go
   ```go
   client.remoteClient.RegisterRequestFunc(ReqCheckTransactionState, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
      header := new(CheckTransactionStateRequestHeader)
      header.Decode(req.ExtFields)
      msgExts := primitive.DecodeMessage(req.Body)
      if len(msgExts) == 0 {
         rlog.Warning("checkTransactionState, decode message failed", nil)
         return nil
      }
      msgExt := msgExts[0]
      // TODO: add namespace support
      transactionID := msgExt.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
      if len(transactionID) > 0 {
         msgExt.TransactionId = transactionID
      }
      group := msgExt.GetProperty(primitive.PropertyProducerGroup)
      if group == "" {
         rlog.Warning("checkTransactionState, pick producer group failed", nil)
         return nil
      }
      if option.GroupName != group {
         rlog.Warning("producer group is not equal", nil)
         return nil
      }
      callback := &CheckTransactionStateCallback{
         Addr:   addr,
         Msg:    msgExt,
         Header: *header,
      }

      callbackCh <- callback // the goroutines block here
      return nil
   })
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

