This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 217fc8da53 [ISSUE #8429] Fix trace message loss when traffic is heavy
(#8430)
217fc8da53 is described below
commit 217fc8da53ca5f49e164f772112469cb1359e73c
Author: LetLetMe <[email protected]>
AuthorDate: Mon Jul 29 17:11:16 2024 +0800
[ISSUE #8429] Fix trace message loss when traffic is heavy (#8430)
---
.../org/apache/rocketmq/client/ClientConfig.java | 11 +
.../client/consumer/DefaultLitePullConsumer.java | 14 +-
.../client/consumer/DefaultMQPushConsumer.java | 97 ++++---
.../client/producer/DefaultMQProducer.java | 2 +-
.../client/trace/AsyncTraceDispatcher.java | 292 +++++++++------------
.../rocketmq/client/trace/TraceDataEncoder.java | 5 +-
6 files changed, 187 insertions(+), 234 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 0fc04fcccb..696b073b37 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -64,6 +65,8 @@ public class ClientConfig {
*/
private int persistConsumerOffsetInterval = 1000 * 5;
private long pullTimeDelayMillsWhenException = 1000;
+
+ private int traceMsgBatchNum = 10;
private boolean unitMode = false;
private String unitName;
private boolean decodeReadBody =
Boolean.parseBoolean(System.getProperty(DECODE_READ_BODY, "true"));
@@ -127,6 +130,14 @@ public class ClientConfig {
return sb.toString();
}
+ public int getTraceMsgBatchNum() {
+ return traceMsgBatchNum;
+ }
+
+ public void setTraceMsgBatchNum(int traceMsgBatchNum) {
+ this.traceMsgBatchNum = traceMsgBatchNum;
+ }
+
public String getClientIP() {
return clientIP;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 3364df48f8..20857f14e0 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -200,7 +200,7 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
* Constructor specifying consumer group, RPC hook
*
* @param consumerGroup Consumer group.
- * @param rpcHook RPC hook to execute before each remoting command.
+ * @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(final String consumerGroup, RPCHook
rpcHook) {
this.consumerGroup = consumerGroup;
@@ -213,7 +213,7 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
* Constructor specifying namespace, consumer group and RPC hook.
*
* @param consumerGroup Consumer group.
- * @param rpcHook RPC hook to execute before each remoting command.
+ * @param rpcHook RPC hook to execute before each remoting command.
*/
@Deprecated
public DefaultLitePullConsumer(final String namespace, final String
consumerGroup, RPCHook rpcHook) {
@@ -270,6 +270,7 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}
+
@Override
public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
@@ -338,7 +339,8 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
this.defaultLitePullConsumerImpl.commitAll();
}
- @Override public void commit(Map<MessageQueue, Long> offsetMap, boolean
persist) {
+ @Override
+ public void commit(Map<MessageQueue, Long> offsetMap, boolean persist) {
this.defaultLitePullConsumerImpl.commit(offsetMap, persist);
}
@@ -361,11 +363,11 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
* @param messageQueueListener
*/
@Override
- public void subscribe(String topic, String subExpression,
MessageQueueListener messageQueueListener) throws MQClientException {
+ public void subscribe(String topic, String subExpression,
+ MessageQueueListener messageQueueListener) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic),
subExpression, messageQueueListener);
}
-
@Override
public void commit(final Set<MessageQueue> messageQueues, boolean persist)
{
this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
@@ -589,7 +591,7 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
private void setTraceDispatcher() {
if (enableTrace) {
try {
- AsyncTraceDispatcher traceDispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic,
rpcHook);
+ AsyncTraceDispatcher traceDispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME,
getTraceMsgBatchNum(), traceTopic, rpcHook);
traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
traceDispatcher.setNamespaceV2(namespaceV2);
this.traceDispatcher = traceDispatcher;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 38a412c237..2d9fb73cec 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -51,11 +51,9 @@ import java.util.Set;
/**
* In most scenarios, this is the mostly recommended class to consume messages.
* </p>
- *
* Technically speaking, this push client is virtually a wrapper of the
underlying pull service. Specifically, on
* arrival of messages pulled from brokers, it roughly invokes the registered
callback handler to feed the messages.
* </p>
- *
* See quickstart/Consumer in the example module for a typical usage.
* </p>
*
@@ -76,7 +74,6 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
* Consumers of the same role is required to have exactly same
subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
* </p>
- *
* See <a
href="https://rocketmq.apache.org/docs/introduction/02concepts">here</a> for
further discussion.
*/
private String consumerGroup;
@@ -84,13 +81,11 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Message model defines the way how messages are delivered to each
consumer clients.
* </p>
- *
* RocketMQ supports two message models: clustering and broadcasting. If
clustering is set, consumer clients with
* the same {@link #consumerGroup} would only consume shards of the
messages subscribed, which achieves load
* balances; Conversely, if the broadcasting is set, each consumer client
will consume all subscribed messages
* separately.
* </p>
- *
* This field defaults to clustering.
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
@@ -98,7 +93,6 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Consuming point on consumer booting.
* </p>
- *
* There are three consuming points:
* <ul>
* <li>
@@ -239,7 +233,6 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
*/
private int pullBatchSize = 32;
-
private int pullBatchSizeInBytes = 256 * 1024;
/**
@@ -256,7 +249,6 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
* Max re-consume times.
* In concurrently mode, -1 means 16;
* In orderly mode, -1 means Integer.MAX_VALUE.
- *
* If messages are re-consumed more than {@link #maxReconsumeTimes} before
success.
*/
private int maxReconsumeTimes = -1;
@@ -312,7 +304,6 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
-
/**
* Constructor specifying RPC hook.
*
@@ -326,29 +317,29 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
* Constructor specifying consumer group, RPC hook.
*
* @param consumerGroup Consumer group.
- * @param rpcHook RPC hook to execute before each remoting command.
+ * @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
this(consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
}
-
/**
* Constructor specifying consumer group, enabled msg trace flag and
customized trace topic name.
*
- * @param consumerGroup Consumer group.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param consumerGroup Consumer group.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If
you don't config,you can use the default trace topic name.
*/
- public DefaultMQPushConsumer(final String consumerGroup, boolean
enableMsgTrace, final String customizedTraceTopic) {
+ public DefaultMQPushConsumer(final String consumerGroup, boolean
enableMsgTrace,
+ final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(),
enableMsgTrace, customizedTraceTopic);
}
/**
* Constructor specifying consumer group, RPC hook and message queue
allocating algorithm.
*
- * @param consumerGroup Consume queue.
- * @param rpcHook RPC hook to execute before each remoting command.
+ * @param consumerGroup Consume queue.
+ * @param rpcHook RPC hook to execute before each
remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
@@ -359,14 +350,15 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Constructor specifying consumer group, RPC hook, message queue
allocating algorithm, enabled msg trace flag and customized trace topic name.
*
- * @param consumerGroup Consume queue.
- * @param rpcHook RPC hook to execute before each remoting command.
+ * @param consumerGroup Consume queue.
+ * @param rpcHook RPC hook to execute before each
remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
- * @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If
you don't config,you can use the default trace topic name.
+ * @param enableMsgTrace Switch flag instance for message
trace.
+ * @param customizedTraceTopic The name value of message trace
topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
- AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean
enableMsgTrace, final String customizedTraceTopic) {
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean
enableMsgTrace,
+ final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
@@ -378,7 +370,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Constructor specifying namespace and consumer group.
*
- * @param namespace Namespace for this MQ Producer instance.
+ * @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consumer group.
*/
@Deprecated
@@ -389,9 +381,9 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Constructor specifying namespace, consumer group and RPC hook .
*
- * @param namespace Namespace for this MQ Producer instance.
+ * @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consumer group.
- * @param rpcHook RPC hook to execute before each remoting command.
+ * @param rpcHook RPC hook to execute before each remoting command.
*/
@Deprecated
public DefaultMQPushConsumer(final String namespace, final String
consumerGroup, RPCHook rpcHook) {
@@ -401,9 +393,9 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Constructor specifying namespace, consumer group, RPC hook and message
queue allocating algorithm.
*
- * @param namespace Namespace for this MQ Producer instance.
- * @param consumerGroup Consume queue.
- * @param rpcHook RPC hook to execute before each remoting command.
+ * @param namespace Namespace for this MQ Producer
instance.
+ * @param consumerGroup Consume queue.
+ * @param rpcHook RPC hook to execute before each
remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
@Deprecated
@@ -419,16 +411,17 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Constructor specifying namespace, consumer group, RPC hook, message
queue allocating algorithm, enabled msg trace flag and customized trace topic
name.
*
- * @param namespace Namespace for this MQ Producer instance.
- * @param consumerGroup Consume queue.
- * @param rpcHook RPC hook to execute before each remoting command.
+ * @param namespace Namespace for this MQ Producer
instance.
+ * @param consumerGroup Consume queue.
+ * @param rpcHook RPC hook to execute before each
remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
- * @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If
you don't config,you can use the default trace topic name.
+ * @param enableMsgTrace Switch flag instance for message
trace.
+ * @param customizedTraceTopic The name value of message trace
topic.If you don't config,you can use the default trace topic name.
*/
@Deprecated
public DefaultMQPushConsumer(final String namespace, final String
consumerGroup, RPCHook rpcHook,
- AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean
enableMsgTrace, final String customizedTraceTopic) {
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean
enableMsgTrace,
+ final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.rpcHook = rpcHook;
@@ -443,7 +436,8 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum,
Map<String, String> attributes) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum,
+ Map<String, String> attributes) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0, null);
}
@@ -457,7 +451,8 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum, int
topicSysFlag, Map<String, String> attributes) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum, int
topicSysFlag,
+ Map<String, String> attributes) throws MQClientException {
this.defaultMQPushConsumerImpl.createTopic(key,
withNamespace(newTopic), queueNum, topicSysFlag);
}
@@ -677,16 +672,16 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Send message back to broker which will be re-delivered in future.
- *
+ * <p>
* This method will be removed or it's visibility will be changed in a
certain version after April 5, 2020, so
* please do not use this method.
*
- * @param msg Message to send back.
+ * @param msg Message to send back.
* @param delayLevel delay level.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQClientException if there is any client error.
+ * @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
@@ -699,17 +694,17 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Send message back to the broker whose name is <code>brokerName</code>
and the message will be re-delivered in
* future.
- *
+ * <p>
* This method will be removed or it's visibility will be changed in a
certain version after April 5, 2020, so
* please do not use this method.
*
- * @param msg Message to send back.
+ * @param msg Message to send back.
* @param delayLevel delay level.
* @param brokerName broker name.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQClientException if there is any client error.
+ * @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
@@ -735,7 +730,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
this.defaultMQPushConsumerImpl.start();
if (enableTrace) {
try {
- AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic,
rpcHook);
+ AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME,
getTraceMsgBatchNum(), traceTopic, rpcHook);
dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
dispatcher.setNamespaceV2(namespaceV2);
traceDispatcher = dispatcher;
@@ -799,9 +794,9 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Subscribe a topic to consuming subscription.
*
- * @param topic topic to subscribe.
+ * @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or
operation such as "tag1 || tag2 || tag3" <br>
- * if null or * expression,meaning subscribe all
+ * if null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
@@ -812,8 +807,8 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Subscribe a topic to consuming subscription.
*
- * @param topic topic to consume.
- * @param fullClassName full class name,must extend
org.apache.rocketmq.common.filter. MessageFilter
+ * @param topic topic to consume.
+ * @param fullClassName full class name,must extend
org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource class source code,used UTF-8 file
encoding,must be responsible for your code safety
*/
@Override
@@ -824,7 +819,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
/**
* Subscribe a topic by message selector.
*
- * @param topic topic to consume.
+ * @param topic topic to consume.
* @param messageSelector {@link
org.apache.rocketmq.client.consumer.MessageSelector}
* @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
* @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 4fd038663b..3ecd5987c3 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -348,7 +348,7 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
}
if (enableTrace) {
try {
- AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, traceTopic,
rpcHook);
+ AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE,
getTraceMsgBatchNum(), traceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
dispatcher.setNamespaceV2(this.namespaceV2);
traceDispatcher = dispatcher;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 1fe19773a5..6d62617eb8 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -16,19 +16,6 @@
*/
package org.apache.rocketmq.client.trace;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -44,68 +31,75 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.RPCHook;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
public class AsyncTraceDispatcher implements TraceDispatcher {
- private final static Logger log =
LoggerFactory.getLogger(AsyncTraceDispatcher.class);
- private final static AtomicInteger COUNTER = new AtomicInteger();
- private final static short MAX_MSG_KEY_SIZE = Short.MAX_VALUE - 10000;
- private final int queueSize;
- private final int batchSize;
+ private static final Logger log =
LoggerFactory.getLogger(AsyncTraceDispatcher.class);
+ private static final AtomicInteger COUNTER = new AtomicInteger();
+ private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(0);
+ private volatile boolean stopped = false;
+ private final int traceInstanceId = INSTANCE_NUM.getAndIncrement();
+ private final int batchNum;
private final int maxMsgSize;
- private final long pollingTimeMil;
- private final long waitTimeThresholdMil;
private final DefaultMQProducer traceProducer;
- private final ThreadPoolExecutor traceExecutor;
- // The last discard number of log
private AtomicLong discardCount;
private Thread worker;
+ private final ThreadPoolExecutor traceExecutor;
private final ArrayBlockingQueue<TraceContext> traceContextQueue;
- private final HashMap<String, TraceDataSegment> taskQueueByTopic;
- private ArrayBlockingQueue<Runnable> appenderQueue;
+ private final ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
- private volatile boolean stopped = false;
+
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
- private String dispatcherId = UUID.randomUUID().toString();
private volatile String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
private String group;
private Type type;
private String namespaceV2;
+ private final int flushTraceInterval = 5000;
+
+ private long lastFlushTime = System.currentTimeMillis();
- public AsyncTraceDispatcher(String group, Type type, String
traceTopicName, RPCHook rpcHook) {
- // queueSize is greater than or equal to the n power of 2 of value
- this.queueSize = 2048;
- this.batchSize = 100;
+ public AsyncTraceDispatcher(String group, Type type, int batchNum, String
traceTopicName, RPCHook rpcHook) {
+ this.batchNum = Math.min(batchNum, 20);/* max value 20*/
this.maxMsgSize = 128000;
- this.pollingTimeMil = 100;
- this.waitTimeThresholdMil = 500;
this.discardCount = new AtomicLong(0L);
- this.traceContextQueue = new ArrayBlockingQueue<>(1024);
- this.taskQueueByTopic = new HashMap();
+ this.traceContextQueue = new ArrayBlockingQueue<>(2048);
this.group = group;
this.type = type;
-
- this.appenderQueue = new ArrayBlockingQueue<>(queueSize);
+ this.appenderQueue = new ArrayBlockingQueue<>(2048);
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
} else {
this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(//
- 10, //
- 20, //
+ 2, //
+ 4, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
- new ThreadFactoryImpl("MQTraceSendThread_"));
+ new ThreadFactoryImpl("MQTraceSendThread_" + traceInstanceId +
"_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}
@@ -153,7 +147,6 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
this.namespaceV2 = namespaceV2;
}
- @Override
public void start(String nameSrvAddr, AccessChannel accessChannel) throws
MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
@@ -163,7 +156,8 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
traceProducer.start();
}
this.accessChannel = accessChannel;
- this.worker = new Thread(new AsyncRunnable(),
"MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
+ this.worker = new ThreadFactoryImpl("MQ-AsyncArrayDispatcher-Thread" +
traceInstanceId, true)
+ .newThread(new AsyncRunnable());
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
@@ -197,37 +191,28 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
@Override
public void flush() {
- // The maximum waiting time for refresh,avoid being written all the
time, resulting in failure to return.
long end = System.currentTimeMillis() + 500;
- while (System.currentTimeMillis() <= end) {
- synchronized (taskQueueByTopic) {
- for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
- taskInfo.sendAllData();
- }
- }
- synchronized (traceContextQueue) {
- if (traceContextQueue.size() == 0 && appenderQueue.size() ==
0) {
- break;
- }
- }
+ while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 &&
System.currentTimeMillis() <= end) {
try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- break;
+ flushTraceContext(true);
+ } catch (Throwable throwable) {
+ log.error("flushTraceContext error", throwable);
}
}
- log.info("------end trace send " + traceContextQueue.size() + " " +
appenderQueue.size());
+ if (appenderQueue.size() > 0) {
+ log.error("There are still some traces that haven't been sent " +
traceContextQueue.size() + " " + appenderQueue.size());
+ }
}
@Override
public void shutdown() {
- this.stopped = true;
flush();
this.traceExecutor.shutdown();
if (isStarted.get()) {
traceProducer.shutdown();
}
this.removeShutdownHook();
+ stopped = true;
}
public void registerShutDownHook() {
@@ -259,152 +244,111 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
}
class AsyncRunnable implements Runnable {
- private boolean stopped;
+ private volatile boolean stopped = false;
@Override
public void run() {
while (!stopped) {
- synchronized (traceContextQueue) {
- long endTime = System.currentTimeMillis() + pollingTimeMil;
- while (System.currentTimeMillis() < endTime) {
- try {
- TraceContext traceContext = traceContextQueue.poll(
- endTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS
- );
-
- if (traceContext != null &&
!traceContext.getTraceBeans().isEmpty()) {
- // get the topic which the trace message will
send to
- String traceTopicName =
this.getTraceTopicName(traceContext.getRegionId());
-
- // get the traceDataSegment which will save
this trace message, create if null
- TraceDataSegment traceDataSegment =
taskQueueByTopic.get(traceTopicName);
- if (traceDataSegment == null) {
- traceDataSegment = new
TraceDataSegment(traceTopicName, traceContext.getRegionId());
- taskQueueByTopic.put(traceTopicName,
traceDataSegment);
- }
-
- // encode traceContext and save it into
traceDataSegment
- // NOTE if data size in traceDataSegment more
than maxMsgSize,
- // a AsyncDataSendTask will be created and
submitted
- TraceTransferBean traceTransferBean =
TraceDataEncoder.encoderFromContextBean(traceContext);
-
traceDataSegment.addTraceTransferBean(traceTransferBean);
- }
- } catch (InterruptedException ignore) {
- log.debug("traceContextQueue#poll exception");
- }
- }
-
- // NOTE send the data in traceDataSegment which the first
TraceTransferBean
- // is longer than waitTimeThreshold
- sendDataByTimeThreshold();
-
- if (AsyncTraceDispatcher.this.stopped) {
- this.stopped = true;
- }
+ try {
+ flushTraceContext(false);
+ } catch (Throwable e) {
+ log.error("flushTraceContext error", e);
}
}
-
+ if (AsyncTraceDispatcher.this.stopped) {
+ this.stopped = true;
+ }
}
+ }
- private void sendDataByTimeThreshold() {
- long now = System.currentTimeMillis();
- for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
- if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
- taskInfo.sendAllData();
+ private void flushTraceContext(boolean forceFlush) throws
InterruptedException {
+ List<TraceContext> contextList = new ArrayList<>(batchNum);
+ int size = traceContextQueue.size();
+ if (size != 0) {
+ if (forceFlush || size >= batchNum || System.currentTimeMillis() -
lastFlushTime > flushTraceInterval) {
+ for (int i = 0; i < batchNum; i++) {
+ TraceContext context = traceContextQueue.poll();
+ if (context != null) {
+ contextList.add(context);
+ } else {
+ break;
+ }
}
+ asyncSendTraceMessage(contextList);
+ return;
}
}
+ // To prevent an infinite loop, add a wait time between each two task
executions
+ Thread.sleep(5);
+ }
- private String getTraceTopicName(String regionId) {
- AccessChannel accessChannel =
AsyncTraceDispatcher.this.getAccessChannel();
- if (AccessChannel.CLOUD == accessChannel) {
- return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
- }
-
- return AsyncTraceDispatcher.this.getTraceTopicName();
- }
+ private void asyncSendTraceMessage(List<TraceContext> contextList) {
+ AsyncDataSendTask request = new AsyncDataSendTask(contextList);
+ traceExecutor.submit(request);
+ lastFlushTime = System.currentTimeMillis();
}
- class TraceDataSegment {
- private long firstBeanAddTime;
- private int currentMsgSize;
- private int currentMsgKeySize;
- private final String traceTopicName;
- private final String regionId;
- private final List<TraceTransferBean> traceTransferBeanList = new
ArrayList<>();
+ class AsyncDataSendTask implements Runnable {
+ private final List<TraceContext> contextList;
- TraceDataSegment(String traceTopicName, String regionId) {
- this.traceTopicName = traceTopicName;
- this.regionId = regionId;
+ public AsyncDataSendTask(List<TraceContext> contextList) {
+ this.contextList = contextList;
}
- public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
- initFirstBeanAddTime();
- this.traceTransferBeanList.add(traceTransferBean);
- this.currentMsgSize += traceTransferBean.getTransData().length();
-
- this.currentMsgKeySize = traceTransferBean.getTransKey().stream()
- .reduce(currentMsgKeySize, (acc, x) -> acc + x.length(),
Integer::sum);
- if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 *
1000 || currentMsgKeySize >= MAX_MSG_KEY_SIZE) {
- List<TraceTransferBean> dataToSend = new
ArrayList<>(traceTransferBeanList);
- AsyncDataSendTask asyncDataSendTask = new
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
- traceExecutor.submit(asyncDataSendTask);
- this.clear();
- }
+ @Override
+ public void run() {
+ sendTraceData(contextList);
}
- public void sendAllData() {
- if (this.traceTransferBeanList.isEmpty()) {
- return;
+ public void sendTraceData(List<TraceContext> contextList) {
+ Map<String, List<TraceTransferBean>> transBeanMap = new
HashMap<>(16);
+ String currentRegionId;
+ for (TraceContext context : contextList) {
+ currentRegionId = context.getRegionId();
+ if (currentRegionId == null ||
context.getTraceBeans().isEmpty()) {
+ continue;
+ }
+ String topic = context.getTraceBeans().get(0).getTopic();
+ String key = topic + TraceConstants.CONTENT_SPLITOR +
currentRegionId;
+ List<TraceTransferBean> transBeanList =
transBeanMap.computeIfAbsent(key, k -> new ArrayList<>());
+ TraceTransferBean traceData =
TraceDataEncoder.encoderFromContextBean(context);
+ transBeanList.add(traceData);
}
- List<TraceTransferBean> dataToSend = new
ArrayList<>(traceTransferBeanList);
- AsyncDataSendTask asyncDataSendTask = new
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
- traceExecutor.submit(asyncDataSendTask);
-
- this.clear();
- }
-
- private void initFirstBeanAddTime() {
- if (firstBeanAddTime == 0) {
- firstBeanAddTime = System.currentTimeMillis();
+ for (Map.Entry<String, List<TraceTransferBean>> entry :
transBeanMap.entrySet()) {
+ String[] key =
entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
+ flushData(entry.getValue(), key[0], key[1]);
}
}
- private void clear() {
- this.firstBeanAddTime = 0;
- this.currentMsgSize = 0;
- this.currentMsgKeySize = 0;
- this.traceTransferBeanList.clear();
- }
- }
-
- class AsyncDataSendTask implements Runnable {
- private final String traceTopicName;
- private final String regionId;
- private final List<TraceTransferBean> traceTransferBeanList;
-
- public AsyncDataSendTask(String traceTopicName, String regionId,
List<TraceTransferBean> traceTransferBeanList) {
- this.traceTopicName = traceTopicName;
- this.regionId = regionId;
- this.traceTransferBeanList = traceTransferBeanList;
- }
-
- @Override
- public void run() {
+ private void flushData(List<TraceTransferBean> transBeanList, String
topic, String currentRegionId) {
+ if (transBeanList.size() == 0) {
+ return;
+ }
StringBuilder buffer = new StringBuilder(1024);
- Set<String> keySet = new HashSet<>();
- for (TraceTransferBean bean : traceTransferBeanList) {
+ int count = 0;
+ Set<String> keySet = new HashSet<String>();
+ for (TraceTransferBean bean : transBeanList) {
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
+ count++;
+ if (buffer.length() >= traceProducer.getMaxMessageSize()) {
+ sendTraceDataByMQ(keySet, buffer.toString(),
TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
+ buffer.delete(0, buffer.length());
+ keySet.clear();
+ count = 0;
+ }
+ }
+ if (count > 0) {
+ sendTraceDataByMQ(keySet, buffer.toString(),
TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
}
- sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName);
+ transBeanList.clear();
}
/**
* Send message trace data
*
- * @param keySet the keyset in this batch(including msgId in original
message not offsetMsgId)
- * @param data the message trace data in this batch
+ * @param keySet the keyset in this batch(including msgId in
original message not offsetMsgId)
+ * @param data the message trace data in this batch
* @param traceTopic the topic which message trace data will send to
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data,
String traceTopic) {
@@ -467,4 +411,4 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
}
}
-}
+}
\ No newline at end of file
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index 0fdd95243a..57e9b6410d 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -193,9 +193,10 @@ public class TraceDataEncoder {
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR);
if (!ctx.getAccessChannel().equals(AccessChannel.CLOUD)) {
-
sb.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
-
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
+
sb.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR);
+ sb.append(ctx.getGroupName());
}
+ sb.append(TraceConstants.FIELD_SPLITOR);
}
}
break;