francisoliverlee commented on issue #1007: URL: https://github.com/apache/rocketmq-client-go/issues/1007#issuecomment-1445759410
> > 我发现路由自动更新的逻辑是有的, 可以试试是否生效。 主要的处理过程如下: > > > > 1. 初始化rmqClient,维护一个producerMap, > > https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L172 > > > > 启动更新topic路由的TimeTicker > > https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L416-L438 > > 2. 第一次发送消息前,获取topic已存在的路由,如果不存在则从namesrv拉取,并且调用UpdatePublishInfo()保存到对应的producer中。 > > 3. 正常发送消息 > > RMQ_SYS_TRACE_TOPIC 这个topic没有经过tryToFindTopicPublishInfo这个过程,trace的发送过程是这样 > > ``` > func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) { > traceTopic := td.traceTopic > if td.access == primitive.Cloud { > traceTopic = td.traceTopic + regionID > } > msg := primitive.NewMessage(traceTopic, []byte(data)) > msg.WithKeys(keySet.slice()) > > mq, addr := td.findMq(regionID) > if mq == nil { > return > } > > var req = td.buildSendRequest(mq, msg) > ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) > err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) { > cancel() > resp := primitive.NewSendResult() > if e != nil { > rlog.Info("send trace data error.", map[string]interface{}{ > "traceData": data, > }) > } else { > td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg) > rlog.Debug("send trace data success:", map[string]interface{}{ > "SendResult": resp, > "traceData": data, > }) > } > }) > ... > } > ``` > > 这里的findmq是不会将RMQ_SYS_TRACE_TOPIC加入publishInfo,所以不会自动被client执行UpdatePublishInfo 分析完全正确, 这里是bug,非常欢迎你提个PR修复这个问题。 正常的producer在生产的时候会把自己注册到client的producerMap中,在第一次发送的时候,会拉取路由,更新在producerMap对应实例的路由信息。 以后是由定时任务检查client.producerMap中每个生产者的topic的路由信息。 traceDispatcher在初始化的时候,并没有初始化生产者, 注册生产者到producerMap。这样定时任务检查路由的机制是存在的,但是没有topic需要检查。 处理办法:仿照java的逻辑, 初始化一个真正的producer来发送trace信息, 而不是直接调用通信层的invokeSync。这样每个producer的全部topic可以正常更新路由。 java的trace producer: https://github.com/apache/rocketmq/blob/06f2208a34907211591114f6b0d327168c250fb3/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java#L62 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
