This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch enchanced_msg_trace
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/enchanced_msg_trace by this 
push:
     new 0c5dae7  [ISSSUE 1188]Fix the problem when more than one producer or 
consumer in the same process can trace only one (#1275)
0c5dae7 is described below

commit 0c5dae7cbcf6bad680fffb38a5e0eed018555aea
Author: zhengwen zhu <[email protected]>
AuthorDate: Sun Jun 23 19:51:18 2019 +0800

    [ISSSUE 1188]Fix the problem when more than one producer or consumer in the 
same process can trace only one (#1275)
    
    * fix trace problem when multi produce/consumer in the same process
    
    * uniform parameter manner
    
    * variable rename
    
    * consumer groups may be same with the producer group
---
 .../rocketmq/client/consumer/DefaultMQPushConsumer.java     |  2 +-
 .../apache/rocketmq/client/producer/DefaultMQProducer.java  |  4 ++--
 .../apache/rocketmq/client/trace/AsyncTraceDispatcher.java  | 13 +++++++++++--
 .../org/apache/rocketmq/client/trace/TraceConstants.java    |  2 +-
 .../org/apache/rocketmq/client/trace/TraceDispatcher.java   |  5 ++++-
 5 files changed, 19 insertions(+), 7 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 339f799..6ad0fc3 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -388,7 +388,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
         if (enableMsgTrace) {
             try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
+                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, 
customizedTraceTopic, rpcHook);
                 
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
                 traceDispatcher = dispatcher;
                 this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index b4acf8f..9b36cf0 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -171,7 +171,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         //if client open the message trace feature
         if (enableMsgTrace) {
             try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
+                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, 
customizedTraceTopic, rpcHook);
                 dispatcher.setHostProducer(this.defaultMQProducerImpl);
                 traceDispatcher = dispatcher;
                 this.defaultMQProducerImpl.registerSendMessageHook(
@@ -256,7 +256,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         //if client open the message trace feature
         if (enableMsgTrace) {
             try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
+                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, 
customizedTraceTopic, rpcHook);
                 dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                 traceDispatcher = dispatcher;
                 this.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
 
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index ca3bcfa..b987d96 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -73,14 +73,19 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
     private String traceTopicName;
     private AtomicBoolean isStarted = new AtomicBoolean(false);
     private AccessChannel accessChannel = AccessChannel.LOCAL;
+    private String group;
+    private Type type;
 
-    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
+    public AsyncTraceDispatcher(String group, Type type,String traceTopicName, 
RPCHook rpcHook) {
         // queueSize is greater than or equal to the n power of 2 of value
         this.queueSize = 2048;
         this.batchSize = 100;
         this.maxMsgSize = 128000;
         this.discardCount = new AtomicLong(0L);
         this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
+        this.group = group;
+        this.type = type;
+
         this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
         if (!UtilAll.isBlank(traceTopicName)) {
             this.traceTopicName = traceTopicName;
@@ -150,7 +155,7 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
         DefaultMQProducer traceProducerInstance = this.traceProducer;
         if (traceProducerInstance == null) {
             traceProducerInstance = new DefaultMQProducer(rpcHook);
-            traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
+            traceProducerInstance.setProducerGroup(genGroupNameForTrace());
             traceProducerInstance.setSendMsgTimeout(5000);
             traceProducerInstance.setVipChannelEnabled(false);
             // The max size of message is 128K
@@ -159,6 +164,10 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
         return traceProducerInstance;
     }
 
+    private String genGroupNameForTrace() {
+        return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + 
this.type ;
+    }
+
     @Override
     public boolean append(final Object ctx) {
         boolean result = traceContextQueue.offer((TraceContext) ctx);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
index e61ea9d..cb4a246 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll;
 
 public class TraceConstants {
 
-    public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
+    public static final String GROUP_NAME_PREFIX = "_INNER_TRACE_PRODUCER";
     public static final char CONTENT_SPLITOR = (char) 1;
     public static final char FIELD_SPLITOR = (char) 2;
     public static final String TRACE_INSTANCE_NAME = 
"PID_CLIENT_INNER_TRACE_PRODUCER";
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
index 51cc0de..33341cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
@@ -24,7 +24,10 @@ import java.io.IOException;
  * Interface of asynchronous transfer data
  */
 public interface TraceDispatcher {
-
+    enum Type {
+        PRODUCE,
+        CONSUME
+    }
     /**
      * Initialize asynchronous transfer data module
      */

Reply via email to