This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 16ef57553 [ISSUE #4612] Fix trace not complete (#6941)
16ef57553 is described below
commit 16ef5755375e7c8f4fb11dd63f5fdfdfa25668e7
Author: panzhi <[email protected]>
AuthorDate: Sun Jun 25 14:44:56 2023 +0800
[ISSUE #4612] Fix trace not complete (#6941)
---
.../apache/rocketmq/client/hook/ConsumeMessageContext.java | 11 +++++++++++
.../impl/consumer/ConsumeMessageConcurrentlyService.java | 1 +
.../client/impl/consumer/ConsumeMessageOrderlyService.java | 1 +
.../impl/consumer/ConsumeMessagePopConcurrentlyService.java | 1 +
.../client/impl/consumer/DefaultLitePullConsumerImpl.java | 1 +
.../client/impl/consumer/DefaultMQPullConsumerImpl.java | 1 +
.../java/org/apache/rocketmq/client/trace/TraceContext.java | 10 ++++++++++
.../org/apache/rocketmq/client/trace/TraceDataEncoder.java | 9 ++++++---
.../client/trace/hook/ConsumeMessageTraceHookImpl.java | 1 +
.../apache/rocketmq/client/trace/TraceDataEncoderTest.java | 2 ++
10 files changed, 35 insertions(+), 3 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
index 835852e9e..94633cea8 100644
---
a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
+++
b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.client.hook;
import java.util.List;
import java.util.Map;
+
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -30,6 +32,7 @@ public class ConsumeMessageContext {
private Object mqTraceContext;
private Map<String, String> props;
private String namespace;
+ private AccessChannel accessChannel;
public String getConsumerGroup() {
return consumerGroup;
@@ -94,4 +97,12 @@ public class ConsumeMessageContext {
public void setNamespace(String namespace) {
this.namespace = namespace;
}
+
+ public AccessChannel getAccessChannel() {
+ return accessChannel;
+ }
+
+ public void setAccessChannel(AccessChannel accessChannel) {
+ this.accessChannel = accessChannel;
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index c915cce81..ea6c8072b 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -447,6 +447,7 @@ public class ConsumeMessageConcurrentlyService implements
ConsumeMessageService
if
(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS ==
status);
+
consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index f9c00839c..4246768d4 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -543,6 +543,7 @@ public class ConsumeMessageOrderlyService implements
ConsumeMessageService {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS
== status || ConsumeOrderlyStatus.COMMIT == status);
+
consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
index c2b39ad7b..a61454f59 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
@@ -457,6 +457,7 @@ public class ConsumeMessagePopConcurrentlyService
implements ConsumeMessageServi
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE,
returnType.name());
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS ==
status);
+
consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 2d37581bb..20ca47700 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -632,6 +632,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
+
consumeMessageContext.setAccessChannel(defaultLitePullConsumer.getAccessChannel());
this.executeHookAfter(consumeMessageContext);
}
consumeRequest.getProcessQueue().setLastConsumeTimestamp(System.currentTimeMillis());
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 3348f3192..e6d148c7f 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -278,6 +278,7 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
+
consumeMessageContext.setAccessChannel(defaultMQPullConsumer.getAccessChannel());
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
index 96dc1df18..a1f632e02 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import java.util.List;
@@ -34,6 +35,7 @@ public class TraceContext implements Comparable<TraceContext>
{
private boolean isSuccess = true;
private String requestId = MessageClientIDSetter.createUniqID();
private int contextCode = 0;
+ private AccessChannel accessChannel;
private List<TraceBean> traceBeans;
public int getContextCode() {
@@ -116,6 +118,14 @@ public class TraceContext implements
Comparable<TraceContext> {
this.regionName = regionName;
}
+ public AccessChannel getAccessChannel() {
+ return accessChannel;
+ }
+
+ public void setAccessChannel(AccessChannel accessChannel) {
+ this.accessChannel = accessChannel;
+ }
+
@Override
public int compareTo(TraceContext o) {
return Long.compare(this.timeStamp, o.getTimeStamp());
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index 918422264..0fdd95243 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageType;
@@ -190,9 +191,11 @@ public class TraceDataEncoder {
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
-
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
-
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
-
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
+
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR);
+ if (!ctx.getAccessChannel().equals(AccessChannel.CLOUD)) {
+
sb.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
+
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
+ }
}
}
break;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
index 6db8a177f..f23a4ff0a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
@@ -99,6 +99,7 @@ public class ConsumeMessageTraceHookImpl implements
ConsumeMessageHook {
subAfterContext.setRegionId(subBeforeContext.getRegionId());//
subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
+ subAfterContext.setAccessChannel(context.getAccessChannel());
subAfterContext.setSuccess(context.isSuccess());//
// Calculate the cost time for processing messages
diff --git
a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
index 763de9f3b..26b7bda59 100644
---
a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
@@ -195,6 +196,7 @@ public class TraceDataEncoderTest {
subAfterContext.setTimeStamp(1625883640000L);
subAfterContext.setGroupName("GroupName-test");
subAfterContext.setContextCode(98623046);
+ subAfterContext.setAccessChannel(AccessChannel.LOCAL);
TraceBean bean = new TraceBean();
bean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
bean.setKeys("keys");