This is an automated email from the ASF dual-hosted git repository.

scarb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3e81fae62a [ISSUE #8681] fix trace topic name (#8680)
3e81fae62a is described below

commit 3e81fae62a309521414398c607e589c2be49ee1e
Author: yuz10 <[email protected]>
AuthorDate: Thu Sep 19 15:13:23 2024 +0800

    [ISSUE #8681] fix trace topic name (#8680)
    
    * fix trace topic
---
 .../client/trace/AsyncTraceDispatcher.java         | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 6d62617eb8..e321e1583d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -302,14 +302,24 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
 
         public void sendTraceData(List<TraceContext> contextList) {
             Map<String, List<TraceTransferBean>> transBeanMap = new 
HashMap<>(16);
-            String currentRegionId;
+            String traceTopic;
             for (TraceContext context : contextList) {
-                currentRegionId = context.getRegionId();
+                AccessChannel accessChannel = context.getAccessChannel();
+                if (accessChannel == null) {
+                    accessChannel = AsyncTraceDispatcher.this.accessChannel;
+                }
+                String currentRegionId = context.getRegionId();
                 if (currentRegionId == null || 
context.getTraceBeans().isEmpty()) {
                     continue;
                 }
+                if (AccessChannel.CLOUD == accessChannel) {
+                    traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + 
currentRegionId;
+                } else {
+                    traceTopic = traceTopicName;
+                }
+
                 String topic = context.getTraceBeans().get(0).getTopic();
-                String key = topic + TraceConstants.CONTENT_SPLITOR + 
currentRegionId;
+                String key = topic + TraceConstants.CONTENT_SPLITOR + 
traceTopic;
                 List<TraceTransferBean> transBeanList = 
transBeanMap.computeIfAbsent(key, k -> new ArrayList<>());
                 TraceTransferBean traceData = 
TraceDataEncoder.encoderFromContextBean(context);
                 transBeanList.add(traceData);
@@ -320,7 +330,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
             }
         }
 
-        private void flushData(List<TraceTransferBean> transBeanList, String 
topic, String currentRegionId) {
+        private void flushData(List<TraceTransferBean> transBeanList, String 
topic, String traceTopic) {
             if (transBeanList.size() == 0) {
                 return;
             }
@@ -332,14 +342,14 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 buffer.append(bean.getTransData());
                 count++;
                 if (buffer.length() >= traceProducer.getMaxMessageSize()) {
-                    sendTraceDataByMQ(keySet, buffer.toString(), 
TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
+                    sendTraceDataByMQ(keySet, buffer.toString(), traceTopic);
                     buffer.delete(0, buffer.length());
                     keySet.clear();
                     count = 0;
                 }
             }
             if (count > 0) {
-                sendTraceDataByMQ(keySet, buffer.toString(), 
TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
+                sendTraceDataByMQ(keySet, buffer.toString(), traceTopic);
             }
             transBeanList.clear();
         }

Reply via email to