This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new baad0a427 [ISSUE #6579] Prevent the properties of trace message from
exceeding the maximum value of short (#6580)
baad0a427 is described below
commit baad0a427f68e6957ef64f2b34403a4439f1d4b7
Author: rongtong <[email protected]>
AuthorDate: Thu Apr 13 09:17:58 2023 +0800
[ISSUE #6579] Prevent the properties of trace message from exceeding the
maximum value of short (#6580)
* Prevent the properties of trace message from exceeding the maximum value
of short
* Use reduce instead of foreach
---
.../apache/rocketmq/client/trace/AsyncTraceDispatcher.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 95b7c8330..ea423b717 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -53,6 +53,7 @@ import static
org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAM
public class AsyncTraceDispatcher implements TraceDispatcher {
private final static Logger log =
LoggerFactory.getLogger(AsyncTraceDispatcher.class);
private final static AtomicInteger COUNTER = new AtomicInteger();
+ private final static short MAX_MSG_KEY_SIZE = Short.MAX_VALUE - 10000;
private final int queueSize;
private final int batchSize;
private final int maxMsgSize;
@@ -315,6 +316,7 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
class TraceDataSegment {
private long firstBeanAddTime;
private int currentMsgSize;
+ private int currentMsgKeySize;
private final String traceTopicName;
private final String regionId;
private final List<TraceTransferBean> traceTransferBeanList = new
ArrayList();
@@ -328,13 +330,14 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
initFirstBeanAddTime();
this.traceTransferBeanList.add(traceTransferBean);
this.currentMsgSize += traceTransferBean.getTransData().length();
- if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 *
1000) {
+
+ this.currentMsgKeySize = traceTransferBean.getTransKey().stream()
+ .reduce(currentMsgKeySize, (acc, x) -> acc + x.length(),
Integer::sum);
+ if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 *
1000 || currentMsgKeySize >= MAX_MSG_KEY_SIZE) {
List<TraceTransferBean> dataToSend = new
ArrayList(traceTransferBeanList);
AsyncDataSendTask asyncDataSendTask = new
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
traceExecutor.submit(asyncDataSendTask);
-
this.clear();
-
}
}
@@ -358,11 +361,11 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
private void clear() {
this.firstBeanAddTime = 0;
this.currentMsgSize = 0;
+ this.currentMsgKeySize = 0;
this.traceTransferBeanList.clear();
}
}
-
class AsyncDataSendTask implements Runnable {
private final String traceTopicName;
private final String regionId;