This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch OpenMessaging in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit d7493f0e42bc7da471b37cbb2573d545b2c55f67 Author: duhenglucky <[email protected]> AuthorDate: Thu Nov 28 19:40:28 2019 +0800 feat(trace)adapted to support RocketMQ native client --- .../open/trace/core/common/OnsTraceConstants.java | 9 +- .../open/trace/core/dispatch/AsyncDispatcher.java | 2 + .../core/dispatch/impl/AsyncArrayDispatcher.java | 88 ++++++++++----- .../core/dispatch/impl/TraceProducerFactory.java | 31 +++++- .../core/hook/OnsClientSendMessageHookImpl.java | 97 ++++++++++++++++ .../trace/core/hook/OnsConsumeMessageHookImpl.java | 123 +++++++++++++++++++++ 6 files changed, 310 insertions(+), 40 deletions(-) diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java index 0877f84..f92ec8d 100644 --- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.ons.open.trace.core.common; import javax.annotation.Generated; - import org.apache.rocketmq.common.MixAll; @Generated("ons-client") @@ -42,16 +41,16 @@ public class OnsTraceConstants { public static final String MaxMsgSize = "MaxMsgSize"; - public static final String groupName = "_INNER_TRACE_PRODUCER"; public static final String traceTopic = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_"; - public static final String default_region = MixAll.DEFAULT_TRACE_REGION_ID; - public static final char CONTENT_SPLITOR = (char)1; - public static final char FIELD_SPLITOR = (char)2; + public static final char CONTENT_SPLITOR = (char) 1; + public static final char FIELD_SPLITOR = (char) 2; public static final String TraceDispatcherType = "DispatcherType"; + + public static final String CustomizedTraceTopic = "customizedTraceTopic"; } diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java index 87ead88..9156680 100644 --- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java @@ -30,4 +30,6 @@ public interface AsyncDispatcher { void flush() throws IOException; void shutdown(); + + void start(String nameServerAddresses) throws MQClientException; } diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java index 4f02bbd..f3a456d 100644 --- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java @@ -30,6 +30,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; @@ -52,13 +53,14 @@ import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDataEncoder; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceTransferBean; import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; +import org.apache.rocketmq.remoting.RPCHook; public class AsyncArrayDispatcher implements AsyncDispatcher { private final static InternalLogger CLIENT_LOG = ClientLogger.getLog(); private final int queueSize; private final int batchSize; - private final DefaultMQProducer traceProducer; - private final ThreadPoolExecutor traceExecuter; + private DefaultMQProducer traceProducer; + private final ThreadPoolExecutor traceExecutor; private AtomicLong discardCount; private Thread worker; @@ -71,9 +73,24 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { private DefaultMQPushConsumerImpl hostConsumer; private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private String dispatcherId = UUID.randomUUID().toString(); + private String customizedTraceTopic; - public AsyncArrayDispatcher(Properties properties) throws MQClientException { + public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) { + this(properties, null, rpcHook); + traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); + } + + public AsyncArrayDispatcher(Properties properties) { + this(properties, null, null); + } + + public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials) { + this(properties, sessionCredentials, null); + } + + public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials, RPCHook rpcHook) { dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType); + this.customizedTraceTopic = properties.getProperty(OnsTraceConstants.CustomizedTraceTopic); int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048")); queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); this.queueSize = queueSize; @@ -82,34 +99,22 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { traceContextQueue = new ArrayBlockingQueue<OnsTraceContext>(1024); appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize); - this.traceExecuter = new ThreadPoolExecutor(// + this.traceExecutor = new ThreadPoolExecutor(// 10, // 20, // 1000 * 60, // TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); - traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties); - } - - public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials) throws MQClientException { - dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType); - int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048")); - queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); - this.queueSize = queueSize; - batchSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxBatchNum, "1")); - this.discardCount = new AtomicLong(0L); - traceContextQueue = new ArrayBlockingQueue<OnsTraceContext>(1024); - appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize); - - this.traceExecuter = new ThreadPoolExecutor( - 10, - 20, - 1000 * 60, - TimeUnit.MILLISECONDS, - this.appenderQueue, - new ThreadFactoryImpl("MQTraceSendThread_")); - traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, sessionCredentials); + if (sessionCredentials == null && rpcHook == null) { + traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties); + } + if (properties != null && rpcHook != null) { + traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); + } + if (properties != null && sessionCredentials != null) { + traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, sessionCredentials); + } } public DefaultMQProducerImpl getHostProducer() { @@ -138,6 +143,12 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { } @Override + public void start(String nameServerAddresses) throws MQClientException { + this.traceProducer.setNamesrvAddr(nameServerAddresses); + this.start(); + } + + @Override public boolean append(final Object ctx) { boolean result = traceContextQueue.offer((OnsTraceContext) ctx); if (!result) { @@ -162,7 +173,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { @Override public void shutdown() { this.stopped = true; - this.traceExecuter.shutdown(); + this.traceExecutor.shutdown(); TraceProducerFactory.unregisterTraceDispatcher(dispatcherId); this.removeShutdownHook(); } @@ -171,6 +182,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { if (shutDownHook == null) { shutDownHook = new ThreadFactoryImpl("ShutdownHookMQTrace").newThread(new Runnable() { private volatile boolean hasShutdown = false; + @Override public void run() { synchronized (this) { @@ -215,7 +227,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { } if (contexts.size() > 0) { AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); - traceExecuter.submit(request); + traceExecutor.submit(request); } else if (AsyncArrayDispatcher.this.stopped) { this.stopped = true; } @@ -291,9 +303,13 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String currentRegionId) { - String topic = OnsTraceConstants.traceTopic + currentRegionId; + String topic = customizedTraceTopic; + if (StringUtils.isBlank(topic)) { + topic = OnsTraceConstants.traceTopic + currentRegionId; + } final Message message = new Message(topic, data.getBytes()); message.setKeys(keySet); + try { Set<String> dataBrokerSet = getBrokerSetByTopic(dataTopic); Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic); @@ -305,7 +321,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { @Override public void onException(Throwable e) { - CLIENT_LOG.info("send trace data ,the traceData is " + data); + CLIENT_LOG.info("send trace data ,the traceData is: {} ", data, e); } }; if (dataBrokerSet.isEmpty()) { @@ -333,7 +349,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { } } catch (Exception e) { - CLIENT_LOG.info("send trace data,the traceData is" + data); + CLIENT_LOG.info("send trace data,the traceData is: {}", data, e); } } @@ -379,4 +395,16 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { return brokerSet; } } + + public String getCustomizedTraceTopic() { + return customizedTraceTopic; + } + + public void setCustomizedTraceTopic(String customizedTraceTopic) { + this.customizedTraceTopic = customizedTraceTopic; + } + + public DefaultMQProducer getTraceProducer() { + return traceProducer; + } } diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java index 60e65cb..fb1c3af 100644 --- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java @@ -21,14 +21,13 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.namesrv.TopAddressing; - import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; import org.apache.rocketmq.ons.api.impl.rocketmq.ClientRPCHook; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.remoting.RPCHook; public class TraceProducerFactory { private static Map<String, Object> dispatcherTable = new ConcurrentHashMap<String, Object>(); @@ -61,7 +60,27 @@ public class TraceProducerFactory { return traceProducer; } - public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, SessionCredentials sessionCredentials) { + public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) { + if (traceProducer == null) { + traceProducer = new DefaultMQProducer(rpcHook); + traceProducer.setProducerGroup(OnsTraceConstants.groupName); + traceProducer.setSendMsgTimeout(5000); + traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis()))); + String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR); + if (nameSrv == null) { + TopAddressing topAddressing = new TopAddressing(properties.getProperty(OnsTraceConstants.ADDRSRV_URL)); + nameSrv = topAddressing.fetchNSAddr(); + } + traceProducer.setNamesrvAddr(nameSrv); + traceProducer.setVipChannelEnabled(false); + int maxSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxMsgSize, "128000")); + traceProducer.setMaxMessageSize(maxSize - 10 * 1000); + } + return traceProducer; + } + + public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, + SessionCredentials sessionCredentials) { if (traceProducer == null) { String accessKey = properties.getProperty(OnsTraceConstants.AccessKey); traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials)); @@ -86,12 +105,14 @@ public class TraceProducerFactory { if (traceProducer != null && isStarted.compareAndSet(false, true)) { traceProducer.start(); } + } public static void unregisterTraceDispatcher(String dispatcherId) { dispatcherTable.remove(dispatcherId); - if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) { + if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.compareAndSet(true, false)) { traceProducer.shutdown(); } } + } diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsClientSendMessageHookImpl.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsClientSendMessageHookImpl.java new file mode 100644 index 0000000..c7b7e4f --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsClientSendMessageHookImpl.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.open.trace.core.hook; + +import java.util.ArrayList; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType; +import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; + +public class OnsClientSendMessageHookImpl implements SendMessageHook { + + private AsyncDispatcher localDispatcher; + + public OnsClientSendMessageHookImpl(AsyncDispatcher localDispatcher) { + this.localDispatcher = localDispatcher; + } + + @Override + public String hookName() { + return "OnsClientSendMessageHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + + if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) { + return; + } + OnsTraceContext onsContext = new OnsTraceContext(); + onsContext.setTraceBeans(new ArrayList<OnsTraceBean>(1)); + context.setMqTraceContext(onsContext); + onsContext.setTraceType(OnsTraceType.Pub); + String userGroup = NamespaceUtil.withoutNamespace(context.getProducerGroup(), context.getNamespace()); + onsContext.setGroupName(userGroup); + OnsTraceBean traceBean = new OnsTraceBean(); + String userTopic = NamespaceUtil.withoutNamespace(context.getMessage().getTopic(), context.getNamespace()); + traceBean.setTopic(userTopic); + traceBean.setTags(context.getMessage().getTags()); + traceBean.setKeys(context.getMessage().getKeys()); + traceBean.setStoreHost(context.getBrokerAddr()); + traceBean.setBodyLength(context.getMessage().getBody().length); + traceBean.setMsgType(context.getMsgType()); + onsContext.getTraceBeans().add(traceBean); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + + if (context == null || context.getMessage().getTopic().startsWith(OnsTraceConstants.traceTopic) || context.getMqTraceContext() == null) { + return; + } + if (context.getSendResult() == null) { + return; + } + if (context.getSendResult().getRegionId() == null + || context.getSendResult().getRegionId().equals(OnsTraceConstants.default_region) + || !context.getSendResult().isTraceOn()) { + // if regionId is default or switch is false,skip it + return; + } + OnsTraceContext onsContext = (OnsTraceContext) context.getMqTraceContext(); + OnsTraceBean traceBean = onsContext.getTraceBeans().get(0); + int costTime = (int) ((System.currentTimeMillis() - onsContext.getTimeStamp()) / onsContext.getTraceBeans().size()); + onsContext.setCostTime(costTime); + if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { + onsContext.setSuccess(true); + } else { + onsContext.setSuccess(false); + } + onsContext.setRegionId(context.getSendResult().getRegionId()); + traceBean.setMsgId(context.getSendResult().getMsgId()); + traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); + traceBean.setStoreTime(onsContext.getTimeStamp() + costTime / 2); + localDispatcher.append(onsContext); + } +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsConsumeMessageHookImpl.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsConsumeMessageHookImpl.java new file mode 100644 index 0000000..e08cac1 --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsConsumeMessageHookImpl.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.open.trace.core.hook; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType; +import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; + +public class OnsConsumeMessageHookImpl implements ConsumeMessageHook { + + private AsyncDispatcher localDispatcher; + + public OnsConsumeMessageHookImpl(AsyncDispatcher localDispatcher) { + this.localDispatcher = localDispatcher; + } + + @Override + public String hookName() { + return "OnsConsumeMessageHook"; + } + + @Override + public void consumeMessageBefore(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + OnsTraceContext onsTraceContext = new OnsTraceContext(); + context.setMqTraceContext(onsTraceContext); + onsTraceContext.setTraceType(OnsTraceType.SubBefore); + String userGroup = NamespaceUtil.withoutNamespace(context.getConsumerGroup(), context.getNamespace()); + onsTraceContext.setGroupName(userGroup); + List<OnsTraceBean> beans = new ArrayList<OnsTraceBean>(); + for (MessageExt msg : context.getMsgList()) { + if (msg == null) { + continue; + } + String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION); + String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH); + if (regionId == null || regionId.equals(OnsTraceConstants.default_region)) { + // if regionId is default ,skip it + continue; + } + if (traceOn != null && "false".equals(traceOn)) { + // if trace switch is false ,skip it + continue; + } + OnsTraceBean traceBean = new OnsTraceBean(); + + String userTopic = NamespaceUtil.withoutNamespace(msg.getTopic(), context.getNamespace()); + traceBean.setTopic(userTopic); + traceBean.setMsgId(msg.getMsgId()); + traceBean.setTags(msg.getTags()); + traceBean.setKeys(msg.getKeys()); + traceBean.setStoreTime(msg.getStoreTimestamp()); + traceBean.setBodyLength(msg.getStoreSize()); + traceBean.setRetryTimes(msg.getReconsumeTimes()); + onsTraceContext.setRegionId(regionId); + beans.add(traceBean); + } + if (beans.size() > 0) { + onsTraceContext.setTraceBeans(beans); + onsTraceContext.setTimeStamp(System.currentTimeMillis()); + localDispatcher.append(onsTraceContext); + } + } + + @Override + public void consumeMessageAfter(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + OnsTraceContext subBeforeContext = (OnsTraceContext) context.getMqTraceContext(); + if (subBeforeContext.getRegionId().equals(OnsTraceConstants.default_region)) { + // if regionId is default ,skip it + return; + } + if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) { + // if subbefore bean is null ,skip it + return; + } + OnsTraceContext subAfterContext = new OnsTraceContext(); + subAfterContext.setTraceType(OnsTraceType.SubAfter); + subAfterContext.setRegionId(subBeforeContext.getRegionId()); + subAfterContext.setGroupName(subBeforeContext.getGroupName()); + subAfterContext.setRequestId(subBeforeContext.getRequestId()); + subAfterContext.setSuccess(context.isSuccess()); + + int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size()); + subAfterContext.setCostTime(costTime);// + subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans()); + String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE); + if (contextType != null) { + subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal()); + } + localDispatcher.append(subAfterContext); + } +}
