This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2a8ba5a [ISSUE #1473] Trace message`s clientHost was wrong (#1474)
2a8ba5a is described below
commit 2a8ba5a79935ab0a23646455427647ebd57d9e07
Author: ChaosYjh <[email protected]>
AuthorDate: Tue Sep 15 15:07:47 2020 +0800
[ISSUE #1473] Trace message`s clientHost was wrong (#1474)
---
.../apache/rocketmq/client/trace/TraceDataEncoder.java | 15 +++++++++++++--
.../client/trace/hook/ConsumeMessageTraceHookImpl.java | 2 ++
.../client/trace/hook/SendMessageTraceHookImpl.java | 1 +
3 files changed, 16 insertions(+), 2 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index 5a1afaf..9569cc0 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -62,6 +62,14 @@ public class TraceDataEncoder {
bean.setOffsetMsgId(line[12]);
pubContext.setSuccess(Boolean.parseBoolean(line[13]));
}
+
+ // compatible with the old version
+ if (line.length >= 15) {
+ bean.setOffsetMsgId(line[12]);
+ pubContext.setSuccess(Boolean.parseBoolean(line[13]));
+ bean.setClientHost(line[14]);
+ }
+
pubContext.setTraceBeans(new ArrayList<TraceBean>(1));
pubContext.getTraceBeans().add(bean);
resList.add(pubContext);
@@ -76,6 +84,7 @@ public class TraceDataEncoder {
bean.setMsgId(line[5]);
bean.setRetryTimes(Integer.parseInt(line[6]));
bean.setKeys(line[7]);
+ bean.setClientHost(line[8]);
subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1));
subBeforeContext.getTraceBeans().add(bean);
resList.add(subBeforeContext);
@@ -130,7 +139,8 @@ public class TraceDataEncoder {
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
-
.append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
+
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
+
.append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);
}
break;
case SubBefore: {
@@ -142,7 +152,8 @@ public class TraceDataEncoder {
.append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
-
.append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
+
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
+
.append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);//
}
}
break;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
index f30b121..4f6f916 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.trace.hook;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
@@ -73,6 +74,7 @@ public class ConsumeMessageTraceHookImpl implements
ConsumeMessageHook {
traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());//
+
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostConsumer().getmQClientFactory().getClientId());
traceContext.setRegionId(regionId);//
beans.add(traceBean);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
index 80c7bab..4feb276 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
@@ -60,6 +60,7 @@ public class SendMessageTraceHookImpl implements
SendMessageHook {
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
+
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
tuxeContext.getTraceBeans().add(traceBean);
}