This is an automated email from the ASF dual-hosted git repository.
scarb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3e81fae62a [ISSUE #8681] fix trace topic name (#8680)
3e81fae62a is described below
commit 3e81fae62a309521414398c607e589c2be49ee1e
Author: yuz10 <[email protected]>
AuthorDate: Thu Sep 19 15:13:23 2024 +0800
[ISSUE #8681] fix trace topic name (#8680)
* fix trace topic
---
.../client/trace/AsyncTraceDispatcher.java | 22 ++++++++++++++++------
1 file changed, 16 insertions(+), 6 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 6d62617eb8..e321e1583d 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -302,14 +302,24 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
public void sendTraceData(List<TraceContext> contextList) {
Map<String, List<TraceTransferBean>> transBeanMap = new
HashMap<>(16);
- String currentRegionId;
+ String traceTopic;
for (TraceContext context : contextList) {
- currentRegionId = context.getRegionId();
+ AccessChannel accessChannel = context.getAccessChannel();
+ if (accessChannel == null) {
+ accessChannel = AsyncTraceDispatcher.this.accessChannel;
+ }
+ String currentRegionId = context.getRegionId();
if (currentRegionId == null ||
context.getTraceBeans().isEmpty()) {
continue;
}
+ if (AccessChannel.CLOUD == accessChannel) {
+ traceTopic = TraceConstants.TRACE_TOPIC_PREFIX +
currentRegionId;
+ } else {
+ traceTopic = traceTopicName;
+ }
+
String topic = context.getTraceBeans().get(0).getTopic();
- String key = topic + TraceConstants.CONTENT_SPLITOR +
currentRegionId;
+ String key = topic + TraceConstants.CONTENT_SPLITOR +
traceTopic;
List<TraceTransferBean> transBeanList =
transBeanMap.computeIfAbsent(key, k -> new ArrayList<>());
TraceTransferBean traceData =
TraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
@@ -320,7 +330,7 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
}
}
- private void flushData(List<TraceTransferBean> transBeanList, String
topic, String currentRegionId) {
+ private void flushData(List<TraceTransferBean> transBeanList, String
topic, String traceTopic) {
if (transBeanList.size() == 0) {
return;
}
@@ -332,14 +342,14 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
buffer.append(bean.getTransData());
count++;
if (buffer.length() >= traceProducer.getMaxMessageSize()) {
- sendTraceDataByMQ(keySet, buffer.toString(),
TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
+ sendTraceDataByMQ(keySet, buffer.toString(), traceTopic);
buffer.delete(0, buffer.length());
keySet.clear();
count = 0;
}
}
if (count > 0) {
- sendTraceDataByMQ(keySet, buffer.toString(),
TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
+ sendTraceDataByMQ(keySet, buffer.toString(), traceTopic);
}
transBeanList.clear();
}