This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 217fc8da53 [ISSUE #8429] Fix trace message loss when traffic is heavy (#8430)
217fc8da53 is described below

commit 217fc8da53ca5f49e164f772112469cb1359e73c
Author: LetLetMe <[email protected]>
AuthorDate: Mon Jul 29 17:11:16 2024 +0800

    [ISSUE #8429] Fix trace message loss when traffic is heavy (#8430)
---
 .../org/apache/rocketmq/client/ClientConfig.java   |  11 +
 .../client/consumer/DefaultLitePullConsumer.java   |  14 +-
 .../client/consumer/DefaultMQPushConsumer.java     |  97 ++++---
 .../client/producer/DefaultMQProducer.java         |   2 +-
 .../client/trace/AsyncTraceDispatcher.java         | 292 +++++++++------------
 .../rocketmq/client/trace/TraceDataEncoder.java    |   5 +-
 6 files changed, 187 insertions(+), 234 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 0fc04fcccb..696b073b37 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -20,6 +20,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -64,6 +65,8 @@ public class ClientConfig {
      */
     private int persistConsumerOffsetInterval = 1000 * 5;
     private long pullTimeDelayMillsWhenException = 1000;
+
+    private int traceMsgBatchNum = 10;
     private boolean unitMode = false;
     private String unitName;
     private boolean decodeReadBody = 
Boolean.parseBoolean(System.getProperty(DECODE_READ_BODY, "true"));
@@ -127,6 +130,14 @@ public class ClientConfig {
         return sb.toString();
     }
 
+    public int getTraceMsgBatchNum() {
+        return traceMsgBatchNum;
+    }
+
+    public void setTraceMsgBatchNum(int traceMsgBatchNum) {
+        this.traceMsgBatchNum = traceMsgBatchNum;
+    }
+
     public String getClientIP() {
         return clientIP;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 3364df48f8..20857f14e0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -200,7 +200,7 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
      * Constructor specifying consumer group, RPC hook
      *
      * @param consumerGroup Consumer group.
-     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param rpcHook       RPC hook to execute before each remoting command.
      */
     public DefaultLitePullConsumer(final String consumerGroup, RPCHook 
rpcHook) {
         this.consumerGroup = consumerGroup;
@@ -213,7 +213,7 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
      * Constructor specifying namespace, consumer group and RPC hook.
      *
      * @param consumerGroup Consumer group.
-     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param rpcHook       RPC hook to execute before each remoting command.
      */
     @Deprecated
     public DefaultLitePullConsumer(final String namespace, final String 
consumerGroup, RPCHook rpcHook) {
@@ -270,6 +270,7 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
     public void unsubscribe(String topic) {
         this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
     }
+
     @Override
     public void assign(Collection<MessageQueue> messageQueues) {
         defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
@@ -338,7 +339,8 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
         this.defaultLitePullConsumerImpl.commitAll();
     }
 
-    @Override public void commit(Map<MessageQueue, Long> offsetMap, boolean 
persist) {
+    @Override
+    public void commit(Map<MessageQueue, Long> offsetMap, boolean persist) {
         this.defaultLitePullConsumerImpl.commit(offsetMap, persist);
     }
 
@@ -361,11 +363,11 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
      * @param messageQueueListener
      */
     @Override
-    public void subscribe(String topic, String subExpression, 
MessageQueueListener messageQueueListener) throws MQClientException {
+    public void subscribe(String topic, String subExpression,
+        MessageQueueListener messageQueueListener) throws MQClientException {
         this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), 
subExpression, messageQueueListener);
     }
 
-
     @Override
     public void commit(final Set<MessageQueue> messageQueues, boolean persist) 
{
         this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
@@ -589,7 +591,7 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
     private void setTraceDispatcher() {
         if (enableTrace) {
             try {
-                AsyncTraceDispatcher traceDispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, 
rpcHook);
+                AsyncTraceDispatcher traceDispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, 
getTraceMsgBatchNum(), traceTopic, rpcHook);
                 traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
                 traceDispatcher.setNamespaceV2(namespaceV2);
                 this.traceDispatcher = traceDispatcher;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 38a412c237..2d9fb73cec 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -51,11 +51,9 @@ import java.util.Set;
 /**
  * In most scenarios, this is the mostly recommended class to consume messages.
  * </p>
- *
  * Technically speaking, this push client is virtually a wrapper of the 
underlying pull service. Specifically, on
  * arrival of messages pulled from brokers, it roughly invokes the registered 
callback handler to feed the messages.
  * </p>
- *
  * See quickstart/Consumer in the example module for a typical usage.
  * </p>
  *
@@ -76,7 +74,6 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      * Consumers of the same role is required to have exactly same 
subscriptions and consumerGroup to correctly achieve
      * load balance. It's required and needs to be globally unique.
      * </p>
-     *
      * See <a href="https://rocketmq.apache.org/docs/introduction/02concepts">here</a> for further discussion.
      */
     private String consumerGroup;
@@ -84,13 +81,11 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Message model defines the way how messages are delivered to each 
consumer clients.
      * </p>
-     *
      * RocketMQ supports two message models: clustering and broadcasting. If 
clustering is set, consumer clients with
      * the same {@link #consumerGroup} would only consume shards of the 
messages subscribed, which achieves load
      * balances; Conversely, if the broadcasting is set, each consumer client 
will consume all subscribed messages
      * separately.
      * </p>
-     *
      * This field defaults to clustering.
      */
     private MessageModel messageModel = MessageModel.CLUSTERING;
@@ -98,7 +93,6 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Consuming point on consumer booting.
      * </p>
-     *
      * There are three consuming points:
      * <ul>
      * <li>
@@ -239,7 +233,6 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      */
     private int pullBatchSize = 32;
 
-
     private int pullBatchSizeInBytes = 256 * 1024;
 
     /**
@@ -256,7 +249,6 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      * Max re-consume times.
      * In concurrently mode, -1 means 16;
      * In orderly mode, -1 means Integer.MAX_VALUE.
-     *
      * If messages are re-consumed more than {@link #maxReconsumeTimes} before 
success.
      */
     private int maxReconsumeTimes = -1;
@@ -312,7 +304,6 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         this(consumerGroup, null, new AllocateMessageQueueAveragely());
     }
 
-
     /**
      * Constructor specifying RPC hook.
      *
@@ -326,29 +317,29 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      * Constructor specifying consumer group, RPC hook.
      *
      * @param consumerGroup Consumer group.
-     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param rpcHook       RPC hook to execute before each remoting command.
      */
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
         this(consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
     }
 
-
     /**
      * Constructor specifying consumer group, enabled msg trace flag and 
customized trace topic name.
      *
-     * @param consumerGroup Consumer group.
-     * @param enableMsgTrace Switch flag instance for message trace.
+     * @param consumerGroup        Consumer group.
+     * @param enableMsgTrace       Switch flag instance for message trace.
      * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default trace topic name.
      */
-    public DefaultMQPushConsumer(final String consumerGroup, boolean 
enableMsgTrace, final String customizedTraceTopic) {
+    public DefaultMQPushConsumer(final String consumerGroup, boolean 
enableMsgTrace,
+        final String customizedTraceTopic) {
         this(consumerGroup, null, new AllocateMessageQueueAveragely(), 
enableMsgTrace, customizedTraceTopic);
     }
 
     /**
      * Constructor specifying consumer group, RPC hook and message queue 
allocating algorithm.
      *
-     * @param consumerGroup Consume queue.
-     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param consumerGroup                Consume queue.
+     * @param rpcHook                      RPC hook to execute before each 
remoting command.
      * @param allocateMessageQueueStrategy Message queue allocating algorithm.
      */
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
@@ -359,14 +350,15 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Constructor specifying consumer group, RPC hook, message queue 
allocating algorithm, enabled msg trace flag and customized trace topic name.
      *
-     * @param consumerGroup Consume queue.
-     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param consumerGroup                Consume queue.
+     * @param rpcHook                      RPC hook to execute before each 
remoting command.
      * @param allocateMessageQueueStrategy message queue allocating algorithm.
-     * @param enableMsgTrace Switch flag instance for message trace.
-     * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default trace topic name.
+     * @param enableMsgTrace               Switch flag instance for message 
trace.
+     * @param customizedTraceTopic         The name value of message trace 
topic.If you don't config,you can use the default trace topic name.
      */
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
-        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace, final String customizedTraceTopic) {
+        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace,
+        final String customizedTraceTopic) {
         this.consumerGroup = consumerGroup;
         this.rpcHook = rpcHook;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
@@ -378,7 +370,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Constructor specifying namespace and consumer group.
      *
-     * @param namespace Namespace for this MQ Producer instance.
+     * @param namespace     Namespace for this MQ Producer instance.
      * @param consumerGroup Consumer group.
      */
     @Deprecated
@@ -389,9 +381,9 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Constructor specifying namespace, consumer group and RPC hook .
      *
-     * @param namespace Namespace for this MQ Producer instance.
+     * @param namespace     Namespace for this MQ Producer instance.
      * @param consumerGroup Consumer group.
-     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param rpcHook       RPC hook to execute before each remoting command.
      */
     @Deprecated
     public DefaultMQPushConsumer(final String namespace, final String 
consumerGroup, RPCHook rpcHook) {
@@ -401,9 +393,9 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Constructor specifying namespace, consumer group, RPC hook and message 
queue allocating algorithm.
      *
-     * @param namespace Namespace for this MQ Producer instance.
-     * @param consumerGroup Consume queue.
-     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param namespace                    Namespace for this MQ Producer 
instance.
+     * @param consumerGroup                Consume queue.
+     * @param rpcHook                      RPC hook to execute before each 
remoting command.
      * @param allocateMessageQueueStrategy Message queue allocating algorithm.
      */
     @Deprecated
@@ -419,16 +411,17 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Constructor specifying namespace, consumer group, RPC hook, message 
queue allocating algorithm, enabled msg trace flag and customized trace topic 
name.
      *
-     * @param namespace Namespace for this MQ Producer instance.
-     * @param consumerGroup Consume queue.
-     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param namespace                    Namespace for this MQ Producer 
instance.
+     * @param consumerGroup                Consume queue.
+     * @param rpcHook                      RPC hook to execute before each 
remoting command.
      * @param allocateMessageQueueStrategy message queue allocating algorithm.
-     * @param enableMsgTrace Switch flag instance for message trace.
-     * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default trace topic name.
+     * @param enableMsgTrace               Switch flag instance for message 
trace.
+     * @param customizedTraceTopic         The name value of message trace 
topic.If you don't config,you can use the default trace topic name.
      */
     @Deprecated
     public DefaultMQPushConsumer(final String namespace, final String 
consumerGroup, RPCHook rpcHook,
-        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace, final String customizedTraceTopic) {
+        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace,
+        final String customizedTraceTopic) {
         this.consumerGroup = consumerGroup;
         this.namespace = namespace;
         this.rpcHook = rpcHook;
@@ -443,7 +436,8 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, 
Map<String, String> attributes) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum,
+        Map<String, String> attributes) throws MQClientException {
         createTopic(key, withNamespace(newTopic), queueNum, 0, null);
     }
 
@@ -457,7 +451,8 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag, Map<String, String> attributes) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag,
+        Map<String, String> attributes) throws MQClientException {
         this.defaultMQPushConsumerImpl.createTopic(key, 
withNamespace(newTopic), queueNum, topicSysFlag);
     }
 
@@ -677,16 +672,16 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
 
     /**
      * Send message back to broker which will be re-delivered in future.
-     *
+     * <p>
      * This method will be removed or it's visibility will be changed in a 
certain version after April 5, 2020, so
      * please do not use this method.
      *
-     * @param msg Message to send back.
+     * @param msg        Message to send back.
      * @param delayLevel delay level.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any broker error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any broker error.
      * @throws InterruptedException if the thread is interrupted.
-     * @throws MQClientException if there is any client error.
+     * @throws MQClientException    if there is any client error.
      */
     @Deprecated
     @Override
@@ -699,17 +694,17 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Send message back to the broker whose name is <code>brokerName</code> 
and the message will be re-delivered in
      * future.
-     *
+     * <p>
      * This method will be removed or it's visibility will be changed in a 
certain version after April 5, 2020, so
      * please do not use this method.
      *
-     * @param msg Message to send back.
+     * @param msg        Message to send back.
      * @param delayLevel delay level.
      * @param brokerName broker name.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any broker error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any broker error.
      * @throws InterruptedException if the thread is interrupted.
-     * @throws MQClientException if there is any client error.
+     * @throws MQClientException    if there is any client error.
      */
     @Deprecated
     @Override
@@ -735,7 +730,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         this.defaultMQPushConsumerImpl.start();
         if (enableTrace) {
             try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, 
rpcHook);
+                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, 
getTraceMsgBatchNum(), traceTopic, rpcHook);
                 dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
                 dispatcher.setNamespaceV2(namespaceV2);
                 traceDispatcher = dispatcher;
@@ -799,9 +794,9 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Subscribe a topic to consuming subscription.
      *
-     * @param topic topic to subscribe.
+     * @param topic         topic to subscribe.
      * @param subExpression subscription expression.it only support or 
operation such as "tag1 || tag2 || tag3" <br>
-     * if null or * expression,meaning subscribe all
+     *                      if null or * expression,meaning subscribe all
      * @throws MQClientException if there is any client error.
      */
     @Override
@@ -812,8 +807,8 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Subscribe a topic to consuming subscription.
      *
-     * @param topic topic to consume.
-     * @param fullClassName full class name,must extend 
org.apache.rocketmq.common.filter. MessageFilter
+     * @param topic             topic to consume.
+     * @param fullClassName     full class name,must extend 
org.apache.rocketmq.common.filter. MessageFilter
      * @param filterClassSource class source code,used UTF-8 file 
encoding,must be responsible for your code safety
      */
     @Override
@@ -824,7 +819,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Subscribe a topic by message selector.
      *
-     * @param topic topic to consume.
+     * @param topic           topic to consume.
      * @param messageSelector {@link 
org.apache.rocketmq.client.consumer.MessageSelector}
      * @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
      * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 4fd038663b..3ecd5987c3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -348,7 +348,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         }
         if (enableTrace) {
             try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, traceTopic, 
rpcHook);
+                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, 
getTraceMsgBatchNum(), traceTopic, rpcHook);
                 dispatcher.setHostProducer(this.defaultMQProducerImpl);
                 dispatcher.setNamespaceV2(this.namespaceV2);
                 traceDispatcher = dispatcher;
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 1fe19773a5..6d62617eb8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -16,19 +16,6 @@
  */
 package org.apache.rocketmq.client.trace;
 
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -44,68 +31,75 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.RPCHook;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
 
 public class AsyncTraceDispatcher implements TraceDispatcher {
-    private final static Logger log = 
LoggerFactory.getLogger(AsyncTraceDispatcher.class);
-    private final static AtomicInteger COUNTER = new AtomicInteger();
-    private final static short MAX_MSG_KEY_SIZE = Short.MAX_VALUE - 10000;
-    private final int queueSize;
-    private final int batchSize;
+    private static final Logger log = 
LoggerFactory.getLogger(AsyncTraceDispatcher.class);
+    private static final AtomicInteger COUNTER = new AtomicInteger();
+    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(0);
+    private volatile boolean stopped = false;
+    private final int traceInstanceId = INSTANCE_NUM.getAndIncrement();
+    private final int batchNum;
     private final int maxMsgSize;
-    private final long pollingTimeMil;
-    private final long waitTimeThresholdMil;
     private final DefaultMQProducer traceProducer;
-    private final ThreadPoolExecutor traceExecutor;
-    // The last discard number of log
     private AtomicLong discardCount;
     private Thread worker;
+    private final ThreadPoolExecutor traceExecutor;
     private final ArrayBlockingQueue<TraceContext> traceContextQueue;
-    private final HashMap<String, TraceDataSegment> taskQueueByTopic;
-    private ArrayBlockingQueue<Runnable> appenderQueue;
+    private final ArrayBlockingQueue<Runnable> appenderQueue;
     private volatile Thread shutDownHook;
-    private volatile boolean stopped = false;
+
     private DefaultMQProducerImpl hostProducer;
     private DefaultMQPushConsumerImpl hostConsumer;
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
-    private String dispatcherId = UUID.randomUUID().toString();
     private volatile String traceTopicName;
     private AtomicBoolean isStarted = new AtomicBoolean(false);
     private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
     private String group;
     private Type type;
     private String namespaceV2;
+    private final int flushTraceInterval = 5000;
+
+    private long lastFlushTime = System.currentTimeMillis();
 
-    public AsyncTraceDispatcher(String group, Type type, String 
traceTopicName, RPCHook rpcHook) {
-        // queueSize is greater than or equal to the n power of 2 of value
-        this.queueSize = 2048;
-        this.batchSize = 100;
+    public AsyncTraceDispatcher(String group, Type type, int batchNum, String 
traceTopicName, RPCHook rpcHook) {
+        this.batchNum = Math.min(batchNum, 20);/* max value 20*/
         this.maxMsgSize = 128000;
-        this.pollingTimeMil = 100;
-        this.waitTimeThresholdMil = 500;
         this.discardCount = new AtomicLong(0L);
-        this.traceContextQueue = new ArrayBlockingQueue<>(1024);
-        this.taskQueueByTopic = new HashMap();
+        this.traceContextQueue = new ArrayBlockingQueue<>(2048);
         this.group = group;
         this.type = type;
-
-        this.appenderQueue = new ArrayBlockingQueue<>(queueSize);
+        this.appenderQueue = new ArrayBlockingQueue<>(2048);
         if (!UtilAll.isBlank(traceTopicName)) {
             this.traceTopicName = traceTopicName;
         } else {
             this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
         }
         this.traceExecutor = new ThreadPoolExecutor(//
-            10, //
-            20, //
+            2, //
+            4, //
             1000 * 60, //
             TimeUnit.MILLISECONDS, //
             this.appenderQueue, //
-            new ThreadFactoryImpl("MQTraceSendThread_"));
+            new ThreadFactoryImpl("MQTraceSendThread_" + traceInstanceId + 
"_"));
         traceProducer = getAndCreateTraceProducer(rpcHook);
     }
 
@@ -153,7 +147,6 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
         this.namespaceV2 = namespaceV2;
     }
 
-    @Override
     public void start(String nameSrvAddr, AccessChannel accessChannel) throws 
MQClientException {
         if (isStarted.compareAndSet(false, true)) {
             traceProducer.setNamesrvAddr(nameSrvAddr);
@@ -163,7 +156,8 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
             traceProducer.start();
         }
         this.accessChannel = accessChannel;
-        this.worker = new Thread(new AsyncRunnable(), 
"MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
+        this.worker = new ThreadFactoryImpl("MQ-AsyncArrayDispatcher-Thread" + 
traceInstanceId, true)
+            .newThread(new AsyncRunnable());
         this.worker.setDaemon(true);
         this.worker.start();
         this.registerShutDownHook();
@@ -197,37 +191,28 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
 
     @Override
     public void flush() {
-        // The maximum waiting time for refresh,avoid being written all the 
time, resulting in failure to return.
         long end = System.currentTimeMillis() + 500;
-        while (System.currentTimeMillis() <= end) {
-            synchronized (taskQueueByTopic) {
-                for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
-                    taskInfo.sendAllData();
-                }
-            }
-            synchronized (traceContextQueue) {
-                if (traceContextQueue.size() == 0 && appenderQueue.size() == 
0) {
-                    break;
-                }
-            }
+        while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && 
System.currentTimeMillis() <= end) {
             try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                break;
+                flushTraceContext(true);
+            } catch (Throwable throwable) {
+                log.error("flushTraceContext error", throwable);
             }
         }
-        log.info("------end trace send " + traceContextQueue.size() + "   " + 
appenderQueue.size());
+        if (appenderQueue.size() > 0) {
+            log.error("There are still some traces that haven't been sent " + 
traceContextQueue.size() + "   " + appenderQueue.size());
+        }
     }
 
     @Override
     public void shutdown() {
-        this.stopped = true;
         flush();
         this.traceExecutor.shutdown();
         if (isStarted.get()) {
             traceProducer.shutdown();
         }
         this.removeShutdownHook();
+        stopped = true;
     }
 
     public void registerShutDownHook() {
@@ -259,152 +244,111 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
     }
 
     class AsyncRunnable implements Runnable {
+        // Worker loop: calls flushTraceContext(false) repeatedly until the
+        // enclosing dispatcher's stopped flag is observed.
-        private boolean stopped;
+        private volatile boolean stopped = false;
 
         @Override
         public void run() {
             while (!stopped) {
-                synchronized (traceContextQueue) {
-                    long endTime = System.currentTimeMillis() + pollingTimeMil;
-                    while (System.currentTimeMillis() < endTime) {
-                        try {
-                            TraceContext traceContext = traceContextQueue.poll(
-                                endTime - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS
-                            );
-
-                            if (traceContext != null && 
!traceContext.getTraceBeans().isEmpty()) {
-                                // get the topic which the trace message will 
send to
-                                String traceTopicName = 
this.getTraceTopicName(traceContext.getRegionId());
-
-                                // get the traceDataSegment which will save 
this trace message, create if null
-                                TraceDataSegment traceDataSegment = 
taskQueueByTopic.get(traceTopicName);
-                                if (traceDataSegment == null) {
-                                    traceDataSegment = new 
TraceDataSegment(traceTopicName, traceContext.getRegionId());
-                                    taskQueueByTopic.put(traceTopicName, 
traceDataSegment);
-                                }
-
-                                // encode traceContext and save it into 
traceDataSegment
-                                // NOTE if data size in traceDataSegment more 
than maxMsgSize,
-                                //  a AsyncDataSendTask will be created and 
submitted
-                                TraceTransferBean traceTransferBean = 
TraceDataEncoder.encoderFromContextBean(traceContext);
-                                
traceDataSegment.addTraceTransferBean(traceTransferBean);
-                            }
-                        } catch (InterruptedException ignore) {
-                            log.debug("traceContextQueue#poll exception");
-                        }
-                    }
-
-                    // NOTE send the data in traceDataSegment which the first 
TraceTransferBean
-                    //  is longer than waitTimeThreshold
-                    sendDataByTimeThreshold();
-
-                    if (AsyncTraceDispatcher.this.stopped) {
-                        this.stopped = true;
-                    }
+                try {
+                    flushTraceContext(false);
+                } catch (Throwable e) {
+                    log.error("flushTraceContext error", e);
                 }
+                // Must be checked inside the loop: it is the only place the loop
+                // condition can change, so checking after the loop would never run.
+                if (AsyncTraceDispatcher.this.stopped) {
+                    this.stopped = true;
+                }
             }
-
         }
+    }
 
-        private void sendDataByTimeThreshold() {
-            long now = System.currentTimeMillis();
-            for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
-                if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
-                    taskInfo.sendAllData();
+    private void flushTraceContext(boolean forceFlush) throws 
InterruptedException { // Submits one batch of queued trace contexts if a flush is due; otherwise sleeps 5 ms.
+        List<TraceContext> contextList = new ArrayList<>(batchNum);
+        int size = traceContextQueue.size();
+        if (size != 0) {
+            if (forceFlush || size >= batchNum || System.currentTimeMillis() - 
lastFlushTime > flushTraceInterval) { // due when forced, when a full batch is queued, or when the flush interval has elapsed
+                for (int i = 0; i < batchNum; i++) { // take at most batchNum contexts per call
+                    TraceContext context = traceContextQueue.poll();
+                    if (context != null) {
+                        contextList.add(context);
+                    } else {
+                        break;
+                    }
                 }
+                asyncSendTraceMessage(contextList);
+                return; // a batch was handed off, so skip the sleep below
             }
         }
+        // To prevent an infinite loop, add a wait time between each two task 
executions
+        Thread.sleep(5);
+    }
 
-        private String getTraceTopicName(String regionId) {
-            AccessChannel accessChannel = 
AsyncTraceDispatcher.this.getAccessChannel();
-            if (AccessChannel.CLOUD == accessChannel) {
-                return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
-            }
-
-            return AsyncTraceDispatcher.this.getTraceTopicName();
-        }
+    private void asyncSendTraceMessage(List<TraceContext> contextList) { // Wraps the batch in an AsyncDataSendTask and submits it to traceExecutor.
+        AsyncDataSendTask request = new AsyncDataSendTask(contextList);
+        traceExecutor.submit(request);
+        lastFlushTime = System.currentTimeMillis(); // read by flushTraceContext for its interval check
     }
 
-    class TraceDataSegment {
-        private long firstBeanAddTime;
-        private int currentMsgSize;
-        private int currentMsgKeySize;
-        private final String traceTopicName;
-        private final String regionId;
-        private final List<TraceTransferBean> traceTransferBeanList = new 
ArrayList<>();
+    class AsyncDataSendTask implements Runnable {
+        private final List<TraceContext> contextList;
 
-        TraceDataSegment(String traceTopicName, String regionId) {
-            this.traceTopicName = traceTopicName;
-            this.regionId = regionId;
+        public AsyncDataSendTask(List<TraceContext> contextList) { // Keeps a reference to the given list; no copy is made.
+            this.contextList = contextList;
         }
 
-        public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
-            initFirstBeanAddTime();
-            this.traceTransferBeanList.add(traceTransferBean);
-            this.currentMsgSize += traceTransferBean.getTransData().length();
-
-            this.currentMsgKeySize = traceTransferBean.getTransKey().stream()
-                .reduce(currentMsgKeySize, (acc, x) -> acc + x.length(), 
Integer::sum);
-            if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 
1000 || currentMsgKeySize >= MAX_MSG_KEY_SIZE) {
-                List<TraceTransferBean> dataToSend = new 
ArrayList<>(traceTransferBeanList);
-                AsyncDataSendTask asyncDataSendTask = new 
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
-                traceExecutor.submit(asyncDataSendTask);
-                this.clear();
-            }
+        @Override
+        public void run() {
+            sendTraceData(contextList); // processes the list captured by the constructor
         }
 
-        public void sendAllData() {
-            if (this.traceTransferBeanList.isEmpty()) {
-                return;
+        public void sendTraceData(List<TraceContext> contextList) {
+            // Groups the encoded contexts by "<topic><CONTENT_SPLITOR><regionId>" and
+            // passes each group to flushData. Contexts with a null regionId or
+            // without trace beans are skipped.
+            Map<String, List<TraceTransferBean>> transBeanMap = new 
HashMap<>(16);
+            String currentRegionId;
+            for (TraceContext context : contextList) {
+                currentRegionId = context.getRegionId();
+                if (currentRegionId == null || 
context.getTraceBeans().isEmpty()) {
+                    continue;
+                }
+                String topic = context.getTraceBeans().get(0).getTopic();
+                String key = topic + TraceConstants.CONTENT_SPLITOR + 
currentRegionId;
+                List<TraceTransferBean> transBeanList = 
transBeanMap.computeIfAbsent(key, k -> new ArrayList<>());
+                TraceTransferBean traceData = 
TraceDataEncoder.encoderFromContextBean(context);
+                transBeanList.add(traceData);
             }
-            List<TraceTransferBean> dataToSend = new 
ArrayList<>(traceTransferBeanList);
-            AsyncDataSendTask asyncDataSendTask = new 
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
-            traceExecutor.submit(asyncDataSendTask);
-
-            this.clear();
-        }
-
-        private void initFirstBeanAddTime() {
-            if (firstBeanAddTime == 0) {
-                firstBeanAddTime = System.currentTimeMillis();
+            for (Map.Entry<String, List<TraceTransferBean>> entry : 
transBeanMap.entrySet()) {
+                // A limit of 2 keeps a trailing empty regionId. split(regex) alone
+                // discards trailing empty strings, so an empty regionId would leave a
+                // one-element array and key[1] would throw
+                // ArrayIndexOutOfBoundsException.
+                String[] key = 
entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR), 2);
+                flushData(entry.getValue(), key[0], key[1]);
             }
         }
 
-        private void clear() {
-            this.firstBeanAddTime = 0;
-            this.currentMsgSize = 0;
-            this.currentMsgKeySize = 0;
-            this.traceTransferBeanList.clear();
-        }
-    }
-
-    class AsyncDataSendTask implements Runnable {
-        private final String traceTopicName;
-        private final String regionId;
-        private final List<TraceTransferBean> traceTransferBeanList;
-
-        public AsyncDataSendTask(String traceTopicName, String regionId, 
List<TraceTransferBean> traceTransferBeanList) {
-            this.traceTopicName = traceTopicName;
-            this.regionId = regionId;
-            this.traceTransferBeanList = traceTransferBeanList;
-        }
-
-        @Override
-        public void run() {
+        private void flushData(List<TraceTransferBean> transBeanList, String 
topic, String currentRegionId) { // Concatenates the beans' data and sends it to TRACE_TOPIC_PREFIX + currentRegionId. NOTE(review): the topic parameter is not used in this body - confirm that is intended.
+            if (transBeanList.size() == 0) {
+                return;
+            }
             StringBuilder buffer = new StringBuilder(1024);
-            Set<String> keySet = new HashSet<>();
-            for (TraceTransferBean bean : traceTransferBeanList) {
+            int count = 0; // beans appended since the last send
+            Set<String> keySet = new HashSet<String>();
+            for (TraceTransferBean bean : transBeanList) {
                 keySet.addAll(bean.getTransKey());
                 buffer.append(bean.getTransData());
+                count++;
+                if (buffer.length() >= traceProducer.getMaxMessageSize()) { // checked after the append, so the payload sent here is at least getMaxMessageSize() characters long
+                    sendTraceDataByMQ(keySet, buffer.toString(), 
TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
+                    buffer.delete(0, buffer.length());
+                    keySet.clear();
+                    count = 0;
+                }
+            }
+            if (count > 0) { // send whatever was appended since the last send
+                sendTraceDataByMQ(keySet, buffer.toString(), 
TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
             }
-            sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName);
+            transBeanList.clear(); // empties the list passed in by the caller
         }
 
         /**
          * Send message trace data
          *
-         * @param keySet the keyset in this batch(including msgId in original 
message not offsetMsgId)
-         * @param data   the message trace data in this batch
+         * @param keySet     the keyset in this batch(including msgId in 
original message not offsetMsgId)
+         * @param data       the message trace data in this batch
          * @param traceTopic the topic which message trace data will send to
          */
         private void sendTraceDataByMQ(Set<String> keySet, final String data, 
String traceTopic) {
@@ -467,4 +411,4 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
         }
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index 0fdd95243a..57e9b6410d 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -193,9 +193,10 @@ public class TraceDataEncoder {
                         
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
                         
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR);
                     if (!ctx.getAccessChannel().equals(AccessChannel.CLOUD)) {
-                        
sb.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
-                            
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
+                        
sb.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR);
+                        sb.append(ctx.getGroupName());
                     }
+                    sb.append(TraceConstants.FIELD_SPLITOR);
                 }
             }
             break;


Reply via email to