This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new 5121e2ba0 [ISSUE #4612] fix trace not complete (#6404)
5121e2ba0 is described below
commit 5121e2ba03e7c3e2484e449ea6fd674d59b3edbc
Author: panzhi <[email protected]>
AuthorDate: Mon Mar 20 13:46:13 2023 +0800
[ISSUE #4612] fix trace not complete (#6404)
---
.../apache/rocketmq/client/hook/ConsumeMessageContext.java | 11 +++++++++++
.../impl/consumer/ConsumeMessageConcurrentlyService.java | 1 +
.../client/impl/consumer/ConsumeMessageOrderlyService.java | 1 +
.../client/impl/consumer/DefaultLitePullConsumerImpl.java | 1 +
.../client/impl/consumer/DefaultMQPullConsumerImpl.java | 1 +
.../java/org/apache/rocketmq/client/trace/TraceContext.java | 10 ++++++++++
.../org/apache/rocketmq/client/trace/TraceDataEncoder.java | 9 ++++++---
.../client/trace/hook/ConsumeMessageTraceHookImpl.java | 1 +
.../apache/rocketmq/client/trace/TraceDataEncoderTest.java | 2 ++
9 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
index 835852e9e..94633cea8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.client.hook;
import java.util.List;
import java.util.Map;
+
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -30,6 +32,7 @@ public class ConsumeMessageContext {
private Object mqTraceContext;
private Map<String, String> props;
private String namespace;
+ private AccessChannel accessChannel;
public String getConsumerGroup() {
return consumerGroup;
@@ -94,4 +97,12 @@ public class ConsumeMessageContext {
public void setNamespace(String namespace) {
this.namespace = namespace;
}
+
+ public AccessChannel getAccessChannel() {
+ return accessChannel;
+ }
+
+ public void setAccessChannel(AccessChannel accessChannel) {
+ this.accessChannel = accessChannel;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index c3626c38b..bb6115cbf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -438,6 +438,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
+ consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 1ae416128..76c7a348d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -543,6 +543,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
+ consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 8161e569a..a4eba4662 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -961,6 +961,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
+ consumeMessageContext.setAccessChannel(defaultLitePullConsumer.getAccessChannel());
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index dfcc23e3a..ffe9c6239 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -278,6 +278,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
+ consumeMessageContext.setAccessChannel(defaultMQPullConsumer.getAccessChannel());
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
index 16887b7ff..00cb26982 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import java.util.List;
@@ -34,6 +35,7 @@ public class TraceContext implements Comparable<TraceContext> {
private boolean isSuccess = true;
private String requestId = MessageClientIDSetter.createUniqID();
private int contextCode = 0;
+ private AccessChannel accessChannel;
private List<TraceBean> traceBeans;
public int getContextCode() {
@@ -121,6 +123,14 @@ public class TraceContext implements Comparable<TraceContext> {
return Long.compare(this.timeStamp, o.getTimeStamp());
}
+ public AccessChannel getAccessChannel() {
+ return accessChannel;
+ }
+
+ public void setAccessChannel(AccessChannel accessChannel) {
+ this.accessChannel = accessChannel;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder(1024);
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index f0c685e0a..523486f32 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageType;
@@ -190,9 +191,11 @@ public class TraceDataEncoder {
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
- .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
- .append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
+ .append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR);
+ if (!ctx.getAccessChannel().equals(AccessChannel.CLOUD)) {
+ sb.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
+ .append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
+ }
}
}
break;
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
index bce613987..1d5dbebf8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
@@ -100,6 +100,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
subAfterContext.setRegionId(subBeforeContext.getRegionId());//
subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
+ subAfterContext.setAccessChannel(context.getAccessChannel());
subAfterContext.setSuccess(context.isSuccess());//
// Calculate the cost time for processing messages
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
index fed8c4ef7..dd4a119b5 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
@@ -195,6 +196,7 @@ public class TraceDataEncoderTest {
subAfterContext.setTimeStamp(1625883640000L);
subAfterContext.setGroupName("GroupName-test");
subAfterContext.setContextCode(98623046);
+ subAfterContext.setAccessChannel(AccessChannel.LOCAL);
TraceBean bean = new TraceBean();
bean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
bean.setKeys("keys");