This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch separate_trace
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/separate_trace by this push:
     new 83633de  feat(client) plug-in message tracing  feature (#1624)
83633de is described below

commit 83633deae16f6aeecf6aded8a9169d7c98ea7bd9
Author: Heng Du <[email protected]>
AuthorDate: Fri Nov 29 12:23:18 2019 +0800

    feat(client) plug-in message tracing  feature (#1624)
    
    * feat(client) separated the messaging tracing to ons
    
    * fix(trace) fix the rpchook is null exception
---
 client/pom.xml                                     |  13 +-
 .../client/consumer/DefaultMQPushConsumer.java     | 106 +++---
 .../client/producer/DefaultMQProducer.java         |  74 ++--
 .../client/trace/AsyncTraceDispatcher.java         | 413 ---------------------
 .../apache/rocketmq/client/trace/TraceBean.java    | 144 -------
 .../rocketmq/client/trace/TraceConstants.java      |  28 --
 .../apache/rocketmq/client/trace/TraceContext.java | 136 -------
 .../rocketmq/client/trace/TraceDataEncoder.java    | 173 ---------
 .../rocketmq/client/trace/TraceDispatcher.java     |  51 ---
 .../rocketmq/client/trace/TraceDispatcherType.java |  22 --
 .../rocketmq/client/trace/TraceTransferBean.java   |  44 ---
 .../apache/rocketmq/client/trace/TraceType.java    |  23 --
 .../trace/hook/ConsumeMessageTraceHookImpl.java    | 114 ------
 .../trace/hook/SendMessageTraceHookImpl.java       |  98 -----
 .../trace/DefaultMQConsumerWithTraceTest.java      |   5 +-
 .../trace/DefaultMQProducerWithTraceTest.java      |   7 +-
 pom.xml                                            |   7 +-
 17 files changed, 130 insertions(+), 1328 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index 1cf292b..f1b61d3 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -15,7 +15,8 @@
   limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
@@ -62,5 +63,15 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>ons-trace-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.rocketmq</groupId>
+                    <artifactId>rocketmq-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 339f799..07239f8 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -18,7 +18,9 @@ package org.apache.rocketmq.client.consumer;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -30,9 +32,6 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
 import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -42,6 +41,11 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
+import 
org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsConsumeMessageHookImpl;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
@@ -100,17 +104,15 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      * <ul>
      * <li>
      * <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where 
it stopped previously.
-     * If it were a newly booting up consumer client, according aging of the 
consumer group, there are two
-     * cases:
+     * If it were a newly booting up consumer client, according aging of the 
consumer group, there are two cases:
      * <ol>
      * <li>
-     * if the consumer group is created so recently that the earliest message 
being subscribed has yet
-     * expired, which means the consumer group represents a lately launched 
business, consuming will
-     * start from the very beginning;
+     * if the consumer group is created so recently that the earliest message 
being subscribed has yet expired, which
+     * means the consumer group represents a lately launched business, 
consuming will start from the very beginning;
      * </li>
      * <li>
-     * if the earliest message being subscribed has expired, consuming will 
start from the latest
-     * messages, meaning messages born prior to the booting timestamp would be 
ignored.
+     * if the earliest message being subscribed has expired, consuming will 
start from the latest messages, meaning
+     * messages born prior to the booting timestamp would be ignored.
      * </li>
      * </ol>
      * </li>
@@ -126,10 +128,8 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     private ConsumeFromWhere consumeFromWhere = 
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
 
     /**
-     * Backtracking consumption time with second precision. Time format is
-     * 20131223171201<br>
-     * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
-     * Default backtracking consumption time Half an hour ago.
+     * Backtracking consumption time with second precision. Time format is 
20131223171201<br> Implying Seventeen twelve
+     * and 01 seconds on December 23, 2013 year<br> Default backtracking 
consumption time Half an hour ago.
      */
     private String consumeTimestamp = 
UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
 
@@ -174,8 +174,8 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     private int consumeConcurrentlyMaxSpan = 2000;
 
     /**
-     * Flow control threshold on queue level, each message queue will cache at 
most 1000 messages by default,
-     * Consider the {@code pullBatchSize}, the instantaneous value may exceed 
the limit
+     * Flow control threshold on queue level, each message queue will cache at 
most 1000 messages by default, Consider
+     * the {@code pullBatchSize}, the instantaneous value may exceed the limit
      */
     private int pullThresholdForQueue = 1000;
 
@@ -191,8 +191,8 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Flow control threshold on topic level, default value is -1(Unlimited)
      * <p>
-     * The value of {@code pullThresholdForQueue} will be overwrote and 
calculated based on
-     * {@code pullThresholdForTopic} if it is't unlimited
+     * The value of {@code pullThresholdForQueue} will be overwrote and 
calculated based on {@code
+     * pullThresholdForTopic} if it is't unlimited
      * <p>
      * For example, if the value of pullThresholdForTopic is 1000 and 10 
message queues are assigned to this consumer,
      * then pullThresholdForQueue will be set to 100
@@ -202,11 +202,11 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Limit the cached message size on topic level, default value is -1 
MiB(Unlimited)
      * <p>
-     * The value of {@code pullThresholdSizeForQueue} will be overwrote and 
calculated based on
-     * {@code pullThresholdSizeForTopic} if it is't unlimited
+     * The value of {@code pullThresholdSizeForQueue} will be overwrote and 
calculated based on {@code
+     * pullThresholdSizeForTopic} if it is't unlimited
      * <p>
-     * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 
10 message queues are
-     * assigned to this consumer, then pullThresholdSizeForQueue will be set 
to 100 MiB
+     * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 
10 message queues are assigned to this
+     * consumer, then pullThresholdSizeForQueue will be set to 100 MiB
      */
     private int pullThresholdSizeForTopic = -1;
 
@@ -257,7 +257,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     /**
      * Interface of asynchronous transfer data
      */
-    private TraceDispatcher traceDispatcher = null;
+    private AsyncDispatcher traceDispatcher = null;
 
     /**
      * Default constructor.
@@ -285,7 +285,6 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         this(namespace, consumerGroup, null, new 
AllocateMessageQueueAveragely());
     }
 
-
     /**
      * Constructor specifying RPC hook.
      *
@@ -349,52 +348,70 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      *
      * @param consumerGroup Consumer group.
      * @param enableMsgTrace Switch flag instance for message trace.
-     * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default trace topic name.
+     * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default
+     * trace topic name.
      */
-    public DefaultMQPushConsumer(final String consumerGroup, boolean 
enableMsgTrace, final String customizedTraceTopic) {
+    public DefaultMQPushConsumer(final String consumerGroup, boolean 
enableMsgTrace,
+        final String customizedTraceTopic) {
         this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), 
enableMsgTrace, customizedTraceTopic);
     }
 
-
     /**
-     * Constructor specifying consumer group, RPC hook, message queue 
allocating algorithm, enabled msg trace flag and customized trace topic name.
+     * Constructor specifying consumer group, RPC hook, message queue 
allocating algorithm, enabled msg trace flag and
+     * customized trace topic name.
      *
      * @param consumerGroup Consume queue.
      * @param rpcHook RPC hook to execute before each remoting command.
      * @param allocateMessageQueueStrategy message queue allocating algorithm.
      * @param enableMsgTrace Switch flag instance for message trace.
-     * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default trace topic name.
+     * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default
+     * trace topic name.
      */
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
-        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace, final String customizedTraceTopic) {
+        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace,
+        final String customizedTraceTopic) {
         this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy, 
enableMsgTrace, customizedTraceTopic);
     }
 
     /**
-     * Constructor specifying namespace, consumer group, RPC hook, message 
queue allocating algorithm, enabled msg trace flag and customized trace topic 
name.
+     * Constructor specifying namespace, consumer group, RPC hook, message 
queue allocating algorithm, enabled msg trace
+     * flag and customized trace topic name.
      *
      * @param namespace Namespace for this MQ Producer instance.
      * @param consumerGroup Consume queue.
      * @param rpcHook RPC hook to execute before each remoting command.
      * @param allocateMessageQueueStrategy message queue allocating algorithm.
      * @param enableMsgTrace Switch flag instance for message trace.
-     * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default trace topic name.
+     * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default
+     * trace topic name.
      */
     public DefaultMQPushConsumer(final String namespace, final String 
consumerGroup, RPCHook rpcHook,
-        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace, final String customizedTraceTopic) {
+        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace,
+        final String customizedTraceTopic) {
         this.consumerGroup = consumerGroup;
         this.namespace = namespace;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
         if (enableMsgTrace) {
             try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
-                
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
-                traceDispatcher = dispatcher;
-                this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
-                    new ConsumeMessageTraceHookImpl(traceDispatcher));
+                Properties tempProperties = new Properties();
+                tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
+                tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
+                tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
+                tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, 
this.getNamesrvAddr());
+                tempProperties.put(OnsTraceConstants.InstanceName, 
"PID_CLIENT_INNER_TRACE_CONSUMER");
+                tempProperties.put(OnsTraceConstants.TraceDispatcherType, 
OnsTraceDispatcherType.CONSUMER.name());
+                if (customizedTraceTopic != null) {
+                    tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, 
customizedTraceTopic);
+                } else {
+                    tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, 
MixAll.RMQ_SYS_TRACE_TOPIC);
+                }
+                this.traceDispatcher = new 
AsyncArrayDispatcher(tempProperties, rpcHook);
+
+                ((AsyncArrayDispatcher) 
traceDispatcher).setHostConsumer(this.getDefaultMQPushConsumerImpl());
+                
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new 
OnsConsumeMessageHookImpl(traceDispatcher));
             } catch (Throwable e) {
-                log.error("system mqtrace hook init failed ,maybe can't send 
msg trace data");
+                log.error("system mqtrace hook init failed ,maybe can't send 
msg trace data", e);
             }
         }
     }
@@ -693,7 +710,10 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         this.defaultMQPushConsumerImpl.start();
         if (null != traceDispatcher) {
             try {
-                traceDispatcher.start(this.getNamesrvAddr(), 
this.getAccessChannel());
+                if (this.accessChannel == AccessChannel.CLOUD) {
+                    ((AsyncArrayDispatcher) 
this.traceDispatcher).setCustomizedTraceTopic(null);
+                }
+                traceDispatcher.start(this.getNamesrvAddr());
             } catch (MQClientException e) {
                 log.warn("trace dispatcher start failed ", e);
             }
@@ -744,8 +764,8 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      * Subscribe a topic to consuming subscription.
      *
      * @param topic topic to subscribe.
-     * @param subExpression subscription expression.it only support or 
operation such as "tag1 || tag2 || tag3" <br>
-     * if null or * expression,meaning subscribe all
+     * @param subExpression subscription expression.it only support or 
operation such as "tag1 || tag2 || tag3" <br> if
+     * null or * expression,meaning subscribe all
      * @throws MQClientException if there is any client error.
      */
     @Override
@@ -886,7 +906,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         this.consumeTimeout = consumeTimeout;
     }
 
-    public TraceDispatcher getTraceDispatcher() {
+    public AsyncDispatcher getTraceDispatcher() {
         return traceDispatcher;
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index faa79f5..63b337f 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -18,7 +18,9 @@ package org.apache.rocketmq.client.producer;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
@@ -27,9 +29,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
@@ -39,6 +38,11 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
+import 
org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import 
org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
@@ -51,8 +55,8 @@ import 
org.apache.rocketmq.remoting.exception.RemotingException;
  * This class aggregates various <code>send</code> methods to deliver messages 
to brokers. Each of them has pros and
  * cons; you'd better understand strengths and weakness of them before 
actually coding. </p>
  *
- * <p> <strong>Thread Safety:</strong> After configuring and starting process, 
this class can be regarded as thread-safe
- * and used among multiple threads context. </p>
+ * <p> <strong>Thread Safety:</strong> After configuring and starting process, 
this class can be regarded as
+ * thread-safe and used among multiple threads context. </p>
  */
 public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
@@ -118,7 +122,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Interface of asynchronous transfer data
      */
-    private TraceDispatcher traceDispatcher = null;
+    private AsyncDispatcher traceDispatcher = null;
 
     /**
      * Default constructor.
@@ -158,17 +162,8 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         final String customizedTraceTopic) {
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
-        //if client open the message trace feature
         if (enableMsgTrace) {
-            try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
-                dispatcher.setHostProducer(this.defaultMQProducerImpl);
-                traceDispatcher = dispatcher;
-                this.defaultMQProducerImpl.registerSendMessageHook(
-                    new SendMessageTraceHookImpl(traceDispatcher));
-            } catch (Throwable e) {
-                log.error("system mqtrace hook init failed ,maybe can't send 
msg trace data");
-            }
+            enableTrace(customizedTraceTopic, rpcHook);
         }
     }
 
@@ -243,17 +238,31 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
-        //if client open the message trace feature
         if (enableMsgTrace) {
-            try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
-                dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
-                traceDispatcher = dispatcher;
-                this.getDefaultMQProducerImpl().registerSendMessageHook(
-                    new SendMessageTraceHookImpl(traceDispatcher));
-            } catch (Throwable e) {
-                log.error("system mqtrace hook init failed ,maybe can't send 
msg trace data");
+            enableTrace(customizedTraceTopic, rpcHook);
+        }
+    }
+
+    private void enableTrace(String customizedTraceTopic, RPCHook rpcHook) {
+        try { // best-effort: any failure is logged in the catch below and construction continues
+            Properties tempProperties = new Properties();
+            tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
+            tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
+            tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
+            tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
+            tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
+            tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name()); // hosted by a producer (see setHostProducer below)
+            if (customizedTraceTopic != null) {
+                tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, customizedTraceTopic);
+            } else {
+                tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, MixAll.RMQ_SYS_TRACE_TOPIC);
             }
+            this.traceDispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
+
+            ((AsyncArrayDispatcher) traceDispatcher).setHostProducer(this.defaultMQProducerImpl);
+            this.defaultMQProducerImpl.registerSendMessageHook(new OnsClientSendMessageHookImpl(traceDispatcher));
+        } catch (Throwable e) {
+            log.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
         }
     }
 
@@ -271,7 +280,10 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         this.defaultMQProducerImpl.start();
         if (null != traceDispatcher) {
             try {
-                traceDispatcher.start(this.getNamesrvAddr(), 
this.getAccessChannel());
+                if (this.accessChannel == AccessChannel.CLOUD) {
+                    ((AsyncArrayDispatcher) 
this.traceDispatcher).setCustomizedTraceTopic(null);
+                }
+                traceDispatcher.start(this.getNamesrvAddr());
             } catch (MQClientException e) {
                 log.warn("trace dispatcher start failed ", e);
             }
@@ -566,7 +578,8 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     }
 
     /**
-     * Send request message in synchronous mode. This method returns only when 
the consumer consume the request message and reply a message. </p>
+     * Send request message in synchronous mode. This method returns only when 
the consumer consume the request message
+     * and reply a message. </p>
      *
      * <strong>Warn:</strong> this method has internal retry-mechanism, that 
is, internal implementation will retry
      * {@link #retryTimesWhenSendFailed} times before claiming failure. As a 
result, multiple messages may potentially
@@ -589,8 +602,8 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     }
 
     /**
-     * Request asynchronously. </p>
-     * This method returns immediately. On receiving reply message, 
<code>requestCallback</code> will be executed. </p>
+     * Request asynchronously. </p> This method returns immediately. On 
receiving reply message,
+     * <code>requestCallback</code> will be executed. </p>
      *
      * Similar to {@link #request(Message, long)}, internal implementation 
would potentially retry up to {@link
      * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, 
which may yield message duplication and
@@ -1062,8 +1075,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
     }
 
-    public TraceDispatcher getTraceDispatcher() {
+    public AsyncDispatcher getTraceDispatcher() {
         return traceDispatcher;
     }
-
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
 
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
deleted file mode 100644
index 06a28e4..0000000
--- 
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.client.common.ThreadLocalIndex;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
-import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.MessageQueueSelector;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.remoting.RPCHook;
-
-import static 
org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
-
-public class AsyncTraceDispatcher implements TraceDispatcher {
-
-    private final static InternalLogger log = ClientLogger.getLog();
-    private final int queueSize;
-    private final int batchSize;
-    private final int maxMsgSize;
-    private final DefaultMQProducer traceProducer;
-    private final ThreadPoolExecutor traceExecutor;
-    // The last discard number of log
-    private AtomicLong discardCount;
-    private Thread worker;
-    private ArrayBlockingQueue<TraceContext> traceContextQueue;
-    private ArrayBlockingQueue<Runnable> appenderQueue;
-    private volatile Thread shutDownHook;
-    private volatile boolean stopped = false;
-    private DefaultMQProducerImpl hostProducer;
-    private DefaultMQPushConsumerImpl hostConsumer;
-    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
-    private String dispatcherId = UUID.randomUUID().toString();
-    private String traceTopicName;
-    private AtomicBoolean isStarted = new AtomicBoolean(false);
-    private AccessChannel accessChannel = AccessChannel.LOCAL;
-
-    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
-        // queueSize is greater than or equal to the n power of 2 of value
-        this.queueSize = 2048;
-        this.batchSize = 100;
-        this.maxMsgSize = 128000;
-        this.discardCount = new AtomicLong(0L);
-        this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
-        this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
-        if (!UtilAll.isBlank(traceTopicName)) {
-            this.traceTopicName = traceTopicName;
-        } else {
-            this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
-        }
-        this.traceExecutor = new ThreadPoolExecutor(//
-            10, //
-            20, //
-            1000 * 60, //
-            TimeUnit.MILLISECONDS, //
-            this.appenderQueue, //
-            new ThreadFactoryImpl("MQTraceSendThread_"));
-        traceProducer = getAndCreateTraceProducer(rpcHook);
-    }
-
-    public AccessChannel getAccessChannel() {
-        return accessChannel;
-    }
-
-    public void setAccessChannel(AccessChannel accessChannel) {
-        this.accessChannel = accessChannel;
-    }
-
-    public String getTraceTopicName() {
-        return traceTopicName;
-    }
-
-    public void setTraceTopicName(String traceTopicName) {
-        this.traceTopicName = traceTopicName;
-    }
-
-    public DefaultMQProducer getTraceProducer() {
-        return traceProducer;
-    }
-
-    public DefaultMQProducerImpl getHostProducer() {
-        return hostProducer;
-    }
-
-    public void setHostProducer(DefaultMQProducerImpl hostProducer) {
-        this.hostProducer = hostProducer;
-    }
-
-    public DefaultMQPushConsumerImpl getHostConsumer() {
-        return hostConsumer;
-    }
-
-    public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
-        this.hostConsumer = hostConsumer;
-    }
-
-    public void start(String nameSrvAddr, AccessChannel accessChannel) throws 
MQClientException {
-        if (isStarted.compareAndSet(false, true)) {
-            traceProducer.setNamesrvAddr(nameSrvAddr);
-            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + 
nameSrvAddr);
-            traceProducer.start();
-        }
-        this.accessChannel = accessChannel;
-        this.worker = new Thread(new AsyncRunnable(), 
"MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
-        this.worker.setDaemon(true);
-        this.worker.start();
-        this.registerShutDownHook();
-    }
-
-    private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
-        DefaultMQProducer traceProducerInstance = this.traceProducer;
-        if (traceProducerInstance == null) {
-            traceProducerInstance = new DefaultMQProducer(rpcHook);
-            traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
-            traceProducerInstance.setSendMsgTimeout(5000);
-            traceProducerInstance.setVipChannelEnabled(false);
-            // The max size of message is 128K
-            traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
-        }
-        return traceProducerInstance;
-    }
-
-    @Override
-    public boolean append(final Object ctx) {
-        boolean result = traceContextQueue.offer((TraceContext) ctx);
-        if (!result) {
-            log.info("buffer full" + discardCount.incrementAndGet() + " 
,context is " + ctx);
-        }
-        return result;
-    }
-
-    @Override
-    public void flush() throws IOException {
-        // The maximum waiting time for refresh,avoid being written all the 
time, resulting in failure to return.
-        long end = System.currentTimeMillis() + 500;
-        while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && 
System.currentTimeMillis() <= end) {
-            try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                break;
-            }
-        }
-        log.info("------end trace send " + traceContextQueue.size() + "   " + 
appenderQueue.size());
-    }
-
-    @Override
-    public void shutdown() {
-        this.stopped = true;
-        this.traceExecutor.shutdown();
-        if (isStarted.get()) {
-            traceProducer.shutdown();
-        }
-        this.removeShutdownHook();
-    }
-
-    public void registerShutDownHook() {
-        if (shutDownHook == null) {
-            shutDownHook = new Thread(new Runnable() {
-                private volatile boolean hasShutdown = false;
-
-                @Override
-                public void run() {
-                    synchronized (this) {
-                        if (!this.hasShutdown) {
-                            try {
-                                flush();
-                            } catch (IOException e) {
-                                log.error("system MQTrace hook shutdown failed 
,maybe loss some trace data");
-                            }
-                        }
-                    }
-                }
-            }, "ShutdownHookMQTrace");
-            Runtime.getRuntime().addShutdownHook(shutDownHook);
-        }
-    }
-
-    public void removeShutdownHook() {
-        if (shutDownHook != null) {
-            try {
-                Runtime.getRuntime().removeShutdownHook(shutDownHook);
-            } catch (IllegalStateException e) {
-                // ignore - VM is already shutting down
-            }
-        }
-    }
-
-    class AsyncRunnable implements Runnable {
-        private boolean stopped;
-
-        @Override
-        public void run() {
-            while (!stopped) {
-                List<TraceContext> contexts = new 
ArrayList<TraceContext>(batchSize);
-                for (int i = 0; i < batchSize; i++) {
-                    TraceContext context = null;
-                    try {
-                        //get trace data element from blocking Queue — 
traceContextQueue
-                        context = traceContextQueue.poll(5, 
TimeUnit.MILLISECONDS);
-                    } catch (InterruptedException e) {
-                    }
-                    if (context != null) {
-                        contexts.add(context);
-                    } else {
-                        break;
-                    }
-                }
-                if (contexts.size() > 0) {
-                    AsyncAppenderRequest request = new 
AsyncAppenderRequest(contexts);
-                    traceExecutor.submit(request);
-                } else if (AsyncTraceDispatcher.this.stopped) {
-                    this.stopped = true;
-                }
-            }
-
-        }
-    }
-
-    class AsyncAppenderRequest implements Runnable {
-        List<TraceContext> contextList;
-
-        public AsyncAppenderRequest(final List<TraceContext> contextList) {
-            if (contextList != null) {
-                this.contextList = contextList;
-            } else {
-                this.contextList = new ArrayList<TraceContext>(1);
-            }
-        }
-
-        @Override
-        public void run() {
-            sendTraceData(contextList);
-        }
-
-        public void sendTraceData(List<TraceContext> contextList) {
-            Map<String, List<TraceTransferBean>> transBeanMap = new 
HashMap<String, List<TraceTransferBean>>();
-            for (TraceContext context : contextList) {
-                if (context.getTraceBeans().isEmpty()) {
-                    continue;
-                }
-                // Topic value corresponding to original message entity content
-                String topic = context.getTraceBeans().get(0).getTopic();
-                String regionId = context.getRegionId();
-                // Use  original message entity's topic as key
-                String key = topic;
-                if (!StringUtils.isBlank(regionId)) {
-                    key = key + TraceConstants.CONTENT_SPLITOR + regionId;
-                }
-                List<TraceTransferBean> transBeanList = transBeanMap.get(key);
-                if (transBeanList == null) {
-                    transBeanList = new ArrayList<TraceTransferBean>();
-                    transBeanMap.put(key, transBeanList);
-                }
-                TraceTransferBean traceData = 
TraceDataEncoder.encoderFromContextBean(context);
-                transBeanList.add(traceData);
-            }
-            for (Map.Entry<String, List<TraceTransferBean>> entry : 
transBeanMap.entrySet()) {
-                String[] key = 
entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
-                String dataTopic = entry.getKey();
-                String regionId = null;
-                if (key.length > 1) {
-                    dataTopic = key[0];
-                    regionId = key[1];
-                }
-                flushData(entry.getValue(), dataTopic, regionId);
-            }
-        }
-
-        /**
-         * Batch sending data actually
-         */
-        private void flushData(List<TraceTransferBean> transBeanList, String 
dataTopic, String regionId) {
-            if (transBeanList.size() == 0) {
-                return;
-            }
-            // Temporary buffer
-            StringBuilder buffer = new StringBuilder(1024);
-            int count = 0;
-            Set<String> keySet = new HashSet<String>();
-
-            for (TraceTransferBean bean : transBeanList) {
-                // Keyset of message trace includes msgId of or original 
message
-                keySet.addAll(bean.getTransKey());
-                buffer.append(bean.getTransData());
-                count++;
-                // Ensure that the size of the package should not exceed the 
upper limit.
-                if (buffer.length() >= traceProducer.getMaxMessageSize()) {
-                    sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, 
regionId);
-                    // Clear temporary buffer after finishing
-                    buffer.delete(0, buffer.length());
-                    keySet.clear();
-                    count = 0;
-                }
-            }
-            if (count > 0) {
-                sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, 
regionId);
-            }
-            transBeanList.clear();
-        }
-
-        /**
-         * Send message trace data
-         *
-         * @param keySet the keyset in this batch(including msgId in original 
message not offsetMsgId)
-         * @param data the message trace data in this batch
-         */
-        private void sendTraceDataByMQ(Set<String> keySet, final String data, 
String dataTopic, String regionId) {
-            String traceTopic = traceTopicName;
-            if (AccessChannel.CLOUD == accessChannel) {
-                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
-            }
-            final Message message = new Message(traceTopic, data.getBytes());
-            // Keyset of message trace includes msgId of or original message
-            message.setKeys(keySet);
-            try {
-                Set<String> traceBrokerSet = 
tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), 
traceTopic);
-                SendCallback callback = new SendCallback() {
-                    @Override
-                    public void onSuccess(SendResult sendResult) {
-
-                    }
-
-                    @Override
-                    public void onException(Throwable e) {
-                        log.info("send trace data ,the traceData is " + data);
-                    }
-                };
-                if (traceBrokerSet.isEmpty()) {
-                    // No cross set
-                    traceProducer.send(message, callback, 5000);
-                } else {
-                    traceProducer.send(message, new MessageQueueSelector() {
-                        @Override
-                        public MessageQueue select(List<MessageQueue> mqs, 
Message msg, Object arg) {
-                            Set<String> brokerSet = (Set<String>) arg;
-                            List<MessageQueue> filterMqs = new 
ArrayList<MessageQueue>();
-                            for (MessageQueue queue : mqs) {
-                                if (brokerSet.contains(queue.getBrokerName())) 
{
-                                    filterMqs.add(queue);
-                                }
-                            }
-                            int index = sendWhichQueue.getAndIncrement();
-                            int pos = Math.abs(index) % filterMqs.size();
-                            if (pos < 0) {
-                                pos = 0;
-                            }
-                            return filterMqs.get(pos);
-                        }
-                    }, traceBrokerSet, callback);
-                }
-
-            } catch (Exception e) {
-                log.info("send trace data,the traceData is" + data);
-            }
-        }
-
-        private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl 
producer, String topic) {
-            Set<String> brokerSet = new HashSet<String>();
-            TopicPublishInfo topicPublishInfo = 
producer.getTopicPublishInfoTable().get(topic);
-            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
-                producer.getTopicPublishInfoTable().putIfAbsent(topic, new 
TopicPublishInfo());
-                
producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
-                topicPublishInfo = 
producer.getTopicPublishInfoTable().get(topic);
-            }
-            if (topicPublishInfo.isHaveTopicRouterInfo() || 
topicPublishInfo.ok()) {
-                for (MessageQueue queue : 
topicPublishInfo.getMessageQueueList()) {
-                    brokerSet.add(queue.getBrokerName());
-                }
-            }
-            return brokerSet;
-        }
-    }
-
-}
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
deleted file mode 100644
index f93aa38..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageType;
-
-public class TraceBean {
-    private static final String LOCAL_ADDRESS = 
UtilAll.ipToIPv4Str(UtilAll.getIP());
-    private String topic = "";
-    private String msgId = "";
-    private String offsetMsgId = "";
-    private String tags = "";
-    private String keys = "";
-    private String storeHost = LOCAL_ADDRESS;
-    private String clientHost = LOCAL_ADDRESS;
-    private long storeTime;
-    private int retryTimes;
-    private int bodyLength;
-    private MessageType msgType;
-
-
-    public MessageType getMsgType() {
-        return msgType;
-    }
-
-
-    public void setMsgType(final MessageType msgType) {
-        this.msgType = msgType;
-    }
-
-
-    public String getOffsetMsgId() {
-        return offsetMsgId;
-    }
-
-
-    public void setOffsetMsgId(final String offsetMsgId) {
-        this.offsetMsgId = offsetMsgId;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-
-    public String getMsgId() {
-        return msgId;
-    }
-
-
-    public void setMsgId(String msgId) {
-        this.msgId = msgId;
-    }
-
-
-    public String getTags() {
-        return tags;
-    }
-
-
-    public void setTags(String tags) {
-        this.tags = tags;
-    }
-
-
-    public String getKeys() {
-        return keys;
-    }
-
-
-    public void setKeys(String keys) {
-        this.keys = keys;
-    }
-
-
-    public String getStoreHost() {
-        return storeHost;
-    }
-
-
-    public void setStoreHost(String storeHost) {
-        this.storeHost = storeHost;
-    }
-
-
-    public String getClientHost() {
-        return clientHost;
-    }
-
-
-    public void setClientHost(String clientHost) {
-        this.clientHost = clientHost;
-    }
-
-
-    public long getStoreTime() {
-        return storeTime;
-    }
-
-
-    public void setStoreTime(long storeTime) {
-        this.storeTime = storeTime;
-    }
-
-
-    public int getRetryTimes() {
-        return retryTimes;
-    }
-
-
-    public void setRetryTimes(int retryTimes) {
-        this.retryTimes = retryTimes;
-    }
-
-
-    public int getBodyLength() {
-        return bodyLength;
-    }
-
-
-    public void setBodyLength(int bodyLength) {
-        this.bodyLength = bodyLength;
-    }
-}
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
deleted file mode 100644
index e61ea9d..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.common.MixAll;
-
-public class TraceConstants {
-
-    public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
-    public static final char CONTENT_SPLITOR = (char) 1;
-    public static final char FIELD_SPLITOR = (char) 2;
-    public static final String TRACE_INSTANCE_NAME = 
"PID_CLIENT_INNER_TRACE_PRODUCER";
-    public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX 
+ "TRACE_DATA_";
-}
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
deleted file mode 100644
index f61ba88..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
-
-import java.util.List;
-
-/**
- * The context of Trace
- */
-public class TraceContext implements Comparable<TraceContext> {
-
-    private TraceType traceType;
-    private long timeStamp = System.currentTimeMillis();
-    private String regionId = "";
-    private String regionName = "";
-    private String groupName = "";
-    private int costTime = 0;
-    private boolean isSuccess = true;
-    private String requestId = MessageClientIDSetter.createUniqID();
-    private int contextCode = 0;
-    private List<TraceBean> traceBeans;
-
-    public int getContextCode() {
-        return contextCode;
-    }
-
-    public void setContextCode(final int contextCode) {
-        this.contextCode = contextCode;
-    }
-
-    public List<TraceBean> getTraceBeans() {
-        return traceBeans;
-    }
-
-    public void setTraceBeans(List<TraceBean> traceBeans) {
-        this.traceBeans = traceBeans;
-    }
-
-    public String getRegionId() {
-        return regionId;
-    }
-
-    public void setRegionId(String regionId) {
-        this.regionId = regionId;
-    }
-
-    public TraceType getTraceType() {
-        return traceType;
-    }
-
-    public void setTraceType(TraceType traceType) {
-        this.traceType = traceType;
-    }
-
-    public long getTimeStamp() {
-        return timeStamp;
-    }
-
-    public void setTimeStamp(long timeStamp) {
-        this.timeStamp = timeStamp;
-    }
-
-    public String getGroupName() {
-        return groupName;
-    }
-
-    public void setGroupName(String groupName) {
-        this.groupName = groupName;
-    }
-
-    public int getCostTime() {
-        return costTime;
-    }
-
-    public void setCostTime(int costTime) {
-        this.costTime = costTime;
-    }
-
-    public boolean isSuccess() {
-        return isSuccess;
-    }
-
-    public void setSuccess(boolean success) {
-        isSuccess = success;
-    }
-
-    public String getRequestId() {
-        return requestId;
-    }
-
-    public void setRequestId(String requestId) {
-        this.requestId = requestId;
-    }
-
-    public String getRegionName() {
-        return regionName;
-    }
-
-    public void setRegionName(String regionName) {
-        this.regionName = regionName;
-    }
-
-    @Override
-    public int compareTo(TraceContext o) {
-        return (int) (this.timeStamp - o.getTimeStamp());
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder(1024);
-        sb.append(traceType).append("_").append(groupName)
-            
.append("_").append(regionId).append("_").append(isSuccess).append("_");
-        if (traceBeans != null && traceBeans.size() > 0) {
-            for (TraceBean bean : traceBeans) {
-                sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_");
-            }
-        }
-        return "TraceContext{" + sb.toString() + '}';
-    }
-}
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
deleted file mode 100644
index 5a1afaf..0000000
--- 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.common.message.MessageType;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Encode/decode for Trace Data
- */
-public class TraceDataEncoder {
-
-    /**
-     * Resolving traceContext list From trace data String
-     *
-     * @param traceData
-     * @return
-     */
-    public static List<TraceContext> decoderFromTraceDataString(String 
traceData) {
-        List<TraceContext> resList = new ArrayList<TraceContext>();
-        if (traceData == null || traceData.length() <= 0) {
-            return resList;
-        }
-        String[] contextList = 
traceData.split(String.valueOf(TraceConstants.FIELD_SPLITOR));
-        for (String context : contextList) {
-            String[] line = 
context.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
-            if (line[0].equals(TraceType.Pub.name())) {
-                TraceContext pubContext = new TraceContext();
-                pubContext.setTraceType(TraceType.Pub);
-                pubContext.setTimeStamp(Long.parseLong(line[1]));
-                pubContext.setRegionId(line[2]);
-                pubContext.setGroupName(line[3]);
-                TraceBean bean = new TraceBean();
-                bean.setTopic(line[4]);
-                bean.setMsgId(line[5]);
-                bean.setTags(line[6]);
-                bean.setKeys(line[7]);
-                bean.setStoreHost(line[8]);
-                bean.setBodyLength(Integer.parseInt(line[9]));
-                pubContext.setCostTime(Integer.parseInt(line[10]));
-                
bean.setMsgType(MessageType.values()[Integer.parseInt(line[11])]);
-
-                if (line.length == 13) {
-                    pubContext.setSuccess(Boolean.parseBoolean(line[12]));
-                } else if (line.length == 14) {
-                    bean.setOffsetMsgId(line[12]);
-                    pubContext.setSuccess(Boolean.parseBoolean(line[13]));
-                }
-                pubContext.setTraceBeans(new ArrayList<TraceBean>(1));
-                pubContext.getTraceBeans().add(bean);
-                resList.add(pubContext);
-            } else if (line[0].equals(TraceType.SubBefore.name())) {
-                TraceContext subBeforeContext = new TraceContext();
-                subBeforeContext.setTraceType(TraceType.SubBefore);
-                subBeforeContext.setTimeStamp(Long.parseLong(line[1]));
-                subBeforeContext.setRegionId(line[2]);
-                subBeforeContext.setGroupName(line[3]);
-                subBeforeContext.setRequestId(line[4]);
-                TraceBean bean = new TraceBean();
-                bean.setMsgId(line[5]);
-                bean.setRetryTimes(Integer.parseInt(line[6]));
-                bean.setKeys(line[7]);
-                subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1));
-                subBeforeContext.getTraceBeans().add(bean);
-                resList.add(subBeforeContext);
-            } else if (line[0].equals(TraceType.SubAfter.name())) {
-                TraceContext subAfterContext = new TraceContext();
-                subAfterContext.setTraceType(TraceType.SubAfter);
-                subAfterContext.setRequestId(line[1]);
-                TraceBean bean = new TraceBean();
-                bean.setMsgId(line[2]);
-                bean.setKeys(line[5]);
-                subAfterContext.setTraceBeans(new ArrayList<TraceBean>(1));
-                subAfterContext.getTraceBeans().add(bean);
-                subAfterContext.setCostTime(Integer.parseInt(line[3]));
-                subAfterContext.setSuccess(Boolean.parseBoolean(line[4]));
-                if (line.length >= 7) {
-                    // add the context type
-                    subAfterContext.setContextCode(Integer.parseInt(line[6]));
-                }
-                resList.add(subAfterContext);
-            }
-        }
-        return resList;
-    }
-
-    /**
-     * Encoding the trace context into data strings and keyset sets
-     *
-     * @param ctx
-     * @return
-     */
-    public static TraceTransferBean encoderFromContextBean(TraceContext ctx) {
-        if (ctx == null) {
-            return null;
-        }
-        //build message trace of the transfering entity content bean
-        TraceTransferBean transferBean = new TraceTransferBean();
-        StringBuilder sb = new StringBuilder(256);
-        switch (ctx.getTraceType()) {
-            case Pub: {
-                TraceBean bean = ctx.getTraceBeans().get(0);
-                //append the content of context and traceBean to transferBean's TransData
-                sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
-                    .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
-            }
-            break;
-            case SubBefore: {
-                for (TraceBean bean : ctx.getTraceBeans()) {
-                    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
-                }
-            }
-            break;
-            case SubAfter: {
-                for (TraceBean bean : ctx.getTraceBeans()) {
-                    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
-                        .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
-                }
-            }
-            break;
-            default:
-        }
-        transferBean.setTransData(sb.toString());
-        for (TraceBean bean : ctx.getTraceBeans()) {
-
-            transferBean.getTransKey().add(bean.getMsgId());
-            if (bean.getKeys() != null && bean.getKeys().length() > 0) {
-                transferBean.getTransKey().add(bean.getKeys());
-            }
-        }
-        return transferBean;
-    }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
deleted file mode 100644
index 51cc0de..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.client.exception.MQClientException;
-import java.io.IOException;
-
-/**
- * Interface of asynchronous transfer data
- */
-public interface TraceDispatcher {
-
-    /**
-     * Initialize asynchronous transfer data module
-     */
-    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
-
-    /**
-     * Append the transfering data
-     * @param ctx data infomation
-     * @return
-     */
-    boolean append(Object ctx);
-
-    /**
-     * Write flush action
-     *
-     * @throws IOException
-     */
-    void flush() throws IOException;
-
-    /**
-     * Close the trace Hook
-     */
-    void shutdown();
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java
deleted file mode 100644
index f09c9b8..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-public enum TraceDispatcherType {
-    PRODUCER,
-    CONSUMER
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java
deleted file mode 100644
index 052ca36..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Trace transfering bean
- */
-public class TraceTransferBean {
-    private String transData;
-    private Set<String> transKey = new HashSet<String>();
-
-    public String getTransData() {
-        return transData;
-    }
-
-    public void setTransData(String transData) {
-        this.transData = transData;
-    }
-
-    public Set<String> getTransKey() {
-        return transKey;
-    }
-
-    public void setTransKey(Set<String> transKey) {
-        this.transKey = transKey;
-    }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
deleted file mode 100644
index 79b19c1..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-public enum TraceType {
-    Pub,
-    SubBefore,
-    SubAfter,
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
deleted file mode 100644
index f30b121..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace.hook;
-
-import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
-import org.apache.rocketmq.client.hook.ConsumeMessageContext;
-import org.apache.rocketmq.client.hook.ConsumeMessageHook;
-import org.apache.rocketmq.client.trace.TraceContext;
-import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceBean;
-import org.apache.rocketmq.client.trace.TraceType;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageExt;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-
-public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
-
-    private TraceDispatcher localDispatcher;
-
-    public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) {
-        this.localDispatcher = localDispatcher;
-    }
-
-    @Override
-    public String hookName() {
-        return "ConsumeMessageTraceHook";
-    }
-
-    @Override
-    public void consumeMessageBefore(ConsumeMessageContext context) {
-        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
-            return;
-        }
-        TraceContext traceContext = new TraceContext();
-        context.setMqTraceContext(traceContext);
-        traceContext.setTraceType(TraceType.SubBefore);//
-        traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
-        List<TraceBean> beans = new ArrayList<TraceBean>();
-        for (MessageExt msg : context.getMsgList()) {
-            if (msg == null) {
-                continue;
-            }
-            String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
-            String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
-
-            if (traceOn != null && traceOn.equals("false")) {
-                // If trace switch is false ,skip it
-                continue;
-            }
-            TraceBean traceBean = new TraceBean();
-            traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
-            traceBean.setMsgId(msg.getMsgId());//
-            traceBean.setTags(msg.getTags());//
-            traceBean.setKeys(msg.getKeys());//
-            traceBean.setStoreTime(msg.getStoreTimestamp());//
-            traceBean.setBodyLength(msg.getStoreSize());//
-            traceBean.setRetryTimes(msg.getReconsumeTimes());//
-            traceContext.setRegionId(regionId);//
-            beans.add(traceBean);
-        }
-        if (beans.size() > 0) {
-            traceContext.setTraceBeans(beans);
-            traceContext.setTimeStamp(System.currentTimeMillis());
-            localDispatcher.append(traceContext);
-        }
-    }
-
-    @Override
-    public void consumeMessageAfter(ConsumeMessageContext context) {
-        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
-            return;
-        }
-        TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
-
-        if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
-            // If subbefore bean is null ,skip it
-            return;
-        }
-        TraceContext subAfterContext = new TraceContext();
-        subAfterContext.setTraceType(TraceType.SubAfter);//
-        subAfterContext.setRegionId(subBeforeContext.getRegionId());//
-        subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
-        subAfterContext.setRequestId(subBeforeContext.getRequestId());//
-        subAfterContext.setSuccess(context.isSuccess());//
-
-        // Caculate the cost time for processing messages
-        int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
-        subAfterContext.setCostTime(costTime);//
-        subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
-        String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
-        if (contextType != null) {
-            subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
-        }
-        localDispatcher.append(subAfterContext);
-    }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
deleted file mode 100644
index 80c7bab..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace.hook;
-
-import java.util.ArrayList;
-import org.apache.rocketmq.client.hook.SendMessageContext;
-import org.apache.rocketmq.client.hook.SendMessageHook;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceBean;
-import org.apache.rocketmq.client.trace.TraceContext;
-import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceType;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-
-public class SendMessageTraceHookImpl implements SendMessageHook {
-
-    private TraceDispatcher localDispatcher;
-
-    public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
-        this.localDispatcher = localDispatcher;
-    }
-
-    @Override
-    public String hookName() {
-        return "SendMessageTraceHook";
-    }
-
-    @Override
-    public void sendMessageBefore(SendMessageContext context) {
-        //if it is message trace data,then it doesn't recorded
-        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
-            return;
-        }
-        //build the context content of TuxeTraceContext
-        TraceContext tuxeContext = new TraceContext();
-        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
-        context.setMqTraceContext(tuxeContext);
-        tuxeContext.setTraceType(TraceType.Pub);
-        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
-        //build the data bean object of message trace
-        TraceBean traceBean = new TraceBean();
-        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
-        traceBean.setTags(context.getMessage().getTags());
-        traceBean.setKeys(context.getMessage().getKeys());
-        traceBean.setStoreHost(context.getBrokerAddr());
-        traceBean.setBodyLength(context.getMessage().getBody().length);
-        traceBean.setMsgType(context.getMsgType());
-        tuxeContext.getTraceBeans().add(traceBean);
-    }
-
-    @Override
-    public void sendMessageAfter(SendMessageContext context) {
-        //if it is message trace data,then it doesn't recorded
-        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
-            || context.getMqTraceContext() == null) {
-            return;
-        }
-        if (context.getSendResult() == null) {
-            return;
-        }
-
-        if (context.getSendResult().getRegionId() == null
-            || !context.getSendResult().isTraceOn()) {
-            // if switch is false,skip it
-            return;
-        }
-
-        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
-        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
-        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
-        tuxeContext.setCostTime(costTime);
-        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
-            tuxeContext.setSuccess(true);
-        } else {
-            tuxeContext.setSuccess(false);
-        }
-        tuxeContext.setRegionId(context.getSendResult().getRegionId());
-        traceBean.setMsgId(context.getSendResult().getMsgId());
-        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
-        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
-        localDispatcher.append(tuxeContext);
-    }
-}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index 496c514..83474f0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
 import org.junit.Before;
@@ -104,7 +105,7 @@ public class DefaultMQConsumerWithTraceTest {
     private DefaultMQPushConsumer normalPushConsumer;
     private DefaultMQPushConsumer customTraceTopicpushConsumer;
 
-    private AsyncTraceDispatcher asyncTraceDispatcher;
+    private AsyncArrayDispatcher asyncTraceDispatcher;
     private MQClientInstance mQClientTraceFactory;
     @Mock
     private MQClientAPIImpl mQClientTraceAPIImpl;
@@ -121,7 +122,7 @@ public class DefaultMQConsumerWithTraceTest {
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
         pushConsumer.setPullInterval(60 * 1000);
 
-        asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher();
+        asyncTraceDispatcher = (AsyncArrayDispatcher) pushConsumer.getTraceDispatcher();
         traceProducer = asyncTraceDispatcher.getTraceProducer();
 
         pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index 3759acb..392b5c9 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
 import org.junit.Before;
@@ -64,7 +65,7 @@ public class DefaultMQProducerWithTraceTest {
     @Mock
     private MQClientAPIImpl mQClientAPIImpl;
 
-    private AsyncTraceDispatcher asyncTraceDispatcher;
+    private AsyncArrayDispatcher asyncTraceDispatcher;
 
     private DefaultMQProducer producer;
     private DefaultMQProducer customTraceTopicproducer;
@@ -88,8 +89,8 @@ public class DefaultMQProducerWithTraceTest {
         normalProducer.setNamesrvAddr("127.0.0.1:9877");
         customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
         message = new Message(topic, new byte[] {'a', 'b', 'c'});
-        asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
-        asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
+        asyncTraceDispatcher = (AsyncArrayDispatcher) producer.getTraceDispatcher();
+        asyncTraceDispatcher.setCustomizedTraceTopic(customerTraceTopic);
         asyncTraceDispatcher.getHostProducer();
         asyncTraceDispatcher.getHostConsumer();
         traceProducer = asyncTraceDispatcher.getTraceProducer();
diff --git a/pom.xml b/pom.xml
index f8a391b..f6db46b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -625,8 +625,11 @@
                 <artifactId>commons-validator</artifactId>
                 <version>1.6</version>
             </dependency>
-
-
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>ons-trace-core</artifactId>
+                <version>1.2.0-SNAPSHOT</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>

Reply via email to