This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 16ef57553 [ISSUE #4612] Fix trace not complete (#6941)
16ef57553 is described below

commit 16ef5755375e7c8f4fb11dd63f5fdfdfa25668e7
Author: panzhi <[email protected]>
AuthorDate: Sun Jun 25 14:44:56 2023 +0800

    [ISSUE #4612] Fix trace not complete (#6941)
---
 .../apache/rocketmq/client/hook/ConsumeMessageContext.java    | 11 +++++++++++
 .../impl/consumer/ConsumeMessageConcurrentlyService.java      |  1 +
 .../client/impl/consumer/ConsumeMessageOrderlyService.java    |  1 +
 .../impl/consumer/ConsumeMessagePopConcurrentlyService.java   |  1 +
 .../client/impl/consumer/DefaultLitePullConsumerImpl.java     |  1 +
 .../client/impl/consumer/DefaultMQPullConsumerImpl.java       |  1 +
 .../java/org/apache/rocketmq/client/trace/TraceContext.java   | 10 ++++++++++
 .../org/apache/rocketmq/client/trace/TraceDataEncoder.java    |  9 ++++++---
 .../client/trace/hook/ConsumeMessageTraceHookImpl.java        |  1 +
 .../apache/rocketmq/client/trace/TraceDataEncoderTest.java    |  2 ++
 10 files changed, 35 insertions(+), 3 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
 
b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
index 835852e9e..94633cea8 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.client.hook;
 
 import java.util.List;
 import java.util.Map;
+
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 
@@ -30,6 +32,7 @@ public class ConsumeMessageContext {
     private Object mqTraceContext;
     private Map<String, String> props;
     private String namespace;
+    private AccessChannel accessChannel;
 
     public String getConsumerGroup() {
         return consumerGroup;
@@ -94,4 +97,12 @@ public class ConsumeMessageContext {
     public void setNamespace(String namespace) {
         this.namespace = namespace;
     }
+
+    public AccessChannel getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(AccessChannel accessChannel) {
+        this.accessChannel = accessChannel;
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index c915cce81..ea6c8072b 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -447,6 +447,7 @@ public class ConsumeMessageConcurrentlyService implements 
ConsumeMessageService
             if 
(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                 consumeMessageContext.setStatus(status.toString());
                 
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == 
status);
+                
consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
                 
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
             }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index f9c00839c..4246768d4 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -543,6 +543,7 @@ public class ConsumeMessageOrderlyService implements 
ConsumeMessageService {
                                 
consumeMessageContext.setStatus(status.toString());
                                 consumeMessageContext
                                     .setSuccess(ConsumeOrderlyStatus.SUCCESS 
== status || ConsumeOrderlyStatus.COMMIT == status);
+                                
consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
                                 
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                             }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
index c2b39ad7b..a61454f59 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
@@ -457,6 +457,7 @@ public class ConsumeMessagePopConcurrentlyService 
implements ConsumeMessageServi
                 
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, 
returnType.name());
                 consumeMessageContext.setStatus(status.toString());
                 
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == 
status);
+                
consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
                 
ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
             }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 2d37581bb..20ca47700 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -632,6 +632,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                     this.executeHookBefore(consumeMessageContext);
                     
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
                     consumeMessageContext.setSuccess(true);
+                    
consumeMessageContext.setAccessChannel(defaultLitePullConsumer.getAccessChannel());
                     this.executeHookAfter(consumeMessageContext);
                 }
                 
consumeRequest.getProcessQueue().setLastConsumeTimestamp(System.currentTimeMillis());
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 3348f3192..e6d148c7f 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -278,6 +278,7 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
             this.executeHookBefore(consumeMessageContext);
             
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
             consumeMessageContext.setSuccess(true);
+            
consumeMessageContext.setAccessChannel(defaultMQPullConsumer.getAccessChannel());
             this.executeHookAfter(consumeMessageContext);
         }
         return pullResult;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
index 96dc1df18..a1f632e02 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.trace;
 
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 
 import java.util.List;
@@ -34,6 +35,7 @@ public class TraceContext implements Comparable<TraceContext> 
{
     private boolean isSuccess = true;
     private String requestId = MessageClientIDSetter.createUniqID();
     private int contextCode = 0;
+    private AccessChannel accessChannel;
     private List<TraceBean> traceBeans;
 
     public int getContextCode() {
@@ -116,6 +118,14 @@ public class TraceContext implements 
Comparable<TraceContext> {
         this.regionName = regionName;
     }
 
+    public AccessChannel getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(AccessChannel accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
     @Override
     public int compareTo(TraceContext o) {
         return Long.compare(this.timeStamp, o.getTimeStamp());
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index 918422264..0fdd95243 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.trace;
 
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageType;
@@ -190,9 +191,11 @@ public class TraceDataEncoder {
                         
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
                         
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
                         
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
-                        
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
-                        
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
-                        
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
+                        
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR);
+                    if (!ctx.getAccessChannel().equals(AccessChannel.CLOUD)) {
+                        
sb.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
+                            
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
+                    }
                 }
             }
             break;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
index 6db8a177f..f23a4ff0a 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
@@ -99,6 +99,7 @@ public class ConsumeMessageTraceHookImpl implements 
ConsumeMessageHook {
         subAfterContext.setRegionId(subBeforeContext.getRegionId());//
         
subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
         subAfterContext.setRequestId(subBeforeContext.getRequestId());//
+        subAfterContext.setAccessChannel(context.getAccessChannel());
         subAfterContext.setSuccess(context.isSuccess());//
 
         // Calculate the cost time for processing messages
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
index 763de9f3b..26b7bda59 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.trace;
 
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.common.message.MessageType;
 import org.junit.Assert;
@@ -195,6 +196,7 @@ public class TraceDataEncoderTest {
         subAfterContext.setTimeStamp(1625883640000L);
         subAfterContext.setGroupName("GroupName-test");
         subAfterContext.setContextCode(98623046);
+        subAfterContext.setAccessChannel(AccessChannel.LOCAL);
         TraceBean bean = new TraceBean();
         bean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
         bean.setKeys("keys");

Reply via email to