tenhan opened a new issue #617: URL: https://github.com/apache/rocketmq-client-go/issues/617
The issue tracker is **ONLY** used for the go client (feature request of RocketMQ need to follow [RIP process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)). Keep in mind, please check whether there is an existing same report before your raise a new one. Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration, and other improvements. Please ensure that your bug report is clear and that it is complete. Otherwise, we may be unable to understand it or to reproduce it, either of which would prevent us from fixing the bug. We strongly recommend the report(bug report or feature request) could include some hints as to the following: **BUG REPORT** **Please add the branch name [Native]/[Master] at the header of the Isssue title.** 1. Please describe the issue you observed: - What did you do (The steps to reproduce)? Do like the demo example (https://github.com/apache/rocketmq-client-go/blob/master/examples/producer/tag/main.go ) The difference is that our application is an http server, and p.shutdown() is called only after the application ends. - What did you expect to see? The memory usage should not keep rising. - What did you see instead? After sending a message synchronously, about one hour later,there were more than 10,000 goroutine exist,and the memory was exhausted. pprof: `goroutine profile: total 20452 20410 @ 0x43bbc5 0x4076ea 0x407495 0x7e8490 0x662389 0x652149 0x470881 # 0x7e848f github.com/apache/rocketmq-client-go/v2/internal.GetOrNewRocketMQClient.func2+0x24f /home/tenhan/go/pkg/mod/github.com/apache/rocketmq-client-go/[email protected]/internal/client.go:220 # 0x662388 github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).processCMD.func2+0x68 /home/tenhan/go/pkg/mod/github.com/apache/rocketmq-client-go/[email protected]/internal/remote/remote_client.go:207 # 0x652148 github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x48 /home/tenhan/go/pkg/mod/github.com/apache/rocketmq-client-go/[email protected]/primitive/base.go:96 ` 2. Please tell us about your environment: - What is your OS? Debian , Linux 5.10.0 - What is your client version? v2.0.0 - What is your RocketMQ version? 3. Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc): When the defaultProducer receives cmd.Code=39 (ReqCheckTransactionState), a new goroutine will be created, and this goroutine will finally call callbackCh <- callback. Because there is no corresponding goroutine to consume callbackCh, the goroutine will be blocked (the number of goroutines will increase until the memory is exhausted). The approximate process of the call is: github.com\tenhan\rocketmq-client-go\v2/internal\remote\remote_client.go remotingClient.connect() defaultProducer.SendSync() -> client.InvokeSync() -> remoteClient.InvokeSync -> remotingClient.connect() -> new goroutine( remotingClient.receiveResponse()) -> foreach(remotingClient.processCMD()) -> if(cmd.Code==39) new goroutine( callbackCh <- callback, blocked here) The related code is in this file: github.com/apache/rocketmq-client-go/v2/internal/client.go `client.remoteClient.RegisterRequestFunc(ReqCheckTransactionState, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand { header := new(CheckTransactionStateRequestHeader) header.Decode(req.ExtFields) msgExts := primitive.DecodeMessage(req.Body) if len(msgExts) == 0 { rlog.Warning("checkTransactionState, decode message failed", nil) return nil } msgExt := msgExts[0] // TODO: add namespace support transactionID := msgExt.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) if len(transactionID) > 0 { msgExt.TransactionId = transactionID } group := msgExt.GetProperty(primitive.PropertyProducerGroup) if group == "" { rlog.Warning("checkTransactionState, pick producer group failed", nil) return nil } if option.GroupName != group { rlog.Warning("producer group is not equal", nil) return nil } callback := &CheckTransactionStateCallback{ Addr: addr, Msg: msgExt, Header: *header, } callbackCh <- callback // the goroutine were blocked here return nil })` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
