This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new baad0a427 [ISSUE #6579] Prevent the properties of trace message from 
exceeding the maximum value of short (#6580)
baad0a427 is described below

commit baad0a427f68e6957ef64f2b34403a4439f1d4b7
Author: rongtong <[email protected]>
AuthorDate: Thu Apr 13 09:17:58 2023 +0800

    [ISSUE #6579] Prevent the properties of trace message from exceeding the 
maximum value of short (#6580)
    
    * Prevent the properties of trace message from exceeding the maximum value 
of short
    
    * Use reduce instead of foreach
---
 .../apache/rocketmq/client/trace/AsyncTraceDispatcher.java    | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
 
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 95b7c8330..ea423b717 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -53,6 +53,7 @@ import static 
org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAM
 public class AsyncTraceDispatcher implements TraceDispatcher {
     private final static Logger log = 
LoggerFactory.getLogger(AsyncTraceDispatcher.class);
     private final static AtomicInteger COUNTER = new AtomicInteger();
+    private final static short MAX_MSG_KEY_SIZE = Short.MAX_VALUE - 10000;
     private final int queueSize;
     private final int batchSize;
     private final int maxMsgSize;
@@ -315,6 +316,7 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
     class TraceDataSegment {
         private long firstBeanAddTime;
         private int currentMsgSize;
+        private int currentMsgKeySize;
         private final String traceTopicName;
         private final String regionId;
         private final List<TraceTransferBean> traceTransferBeanList = new 
ArrayList();
@@ -328,13 +330,14 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
             initFirstBeanAddTime();
             this.traceTransferBeanList.add(traceTransferBean);
             this.currentMsgSize += traceTransferBean.getTransData().length();
-            if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 
1000) {
+
+            this.currentMsgKeySize = traceTransferBean.getTransKey().stream()
+                .reduce(currentMsgKeySize, (acc, x) -> acc + x.length(), 
Integer::sum);
+            if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 
1000 || currentMsgKeySize >= MAX_MSG_KEY_SIZE) {
                 List<TraceTransferBean> dataToSend = new 
ArrayList(traceTransferBeanList);
                 AsyncDataSendTask asyncDataSendTask = new 
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
                 traceExecutor.submit(asyncDataSendTask);
-
                 this.clear();
-
             }
         }
 
@@ -358,11 +361,11 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
         private void clear() {
             this.firstBeanAddTime = 0;
             this.currentMsgSize = 0;
+            this.currentMsgKeySize = 0;
             this.traceTransferBeanList.clear();
         }
     }
 
-
     class AsyncDataSendTask implements Runnable {
         private final String traceTopicName;
         private final String regionId;

Reply via email to