This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch OpenMessaging in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit e46fac7d8f39874dbd16bc1b61febcac05b56a37 Author: duhenglucky <[email protected]> AuthorDate: Fri Nov 29 12:18:26 2019 +0800 feat(client) use the trace rpc hook in trace core replace origin rpc hook --- .../ons/api/impl/rocketmq/ONSConsumerAbstract.java | 2 +- .../ons/api/impl/rocketmq/OrderProducerImpl.java | 2 +- .../ons/api/impl/rocketmq/ProducerImpl.java | 2 +- .../api/impl/rocketmq/TransactionProducerImpl.java | 2 +- .../tracehook/OnsClientSendMessageHookImpl.java | 97 ---------------- .../impl/tracehook/OnsConsumeMessageHookImpl.java | 123 --------------------- .../core/dispatch/impl/TraceProducerFactory.java | 16 ++- 7 files changed, 14 insertions(+), 230 deletions(-) diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java index d82feeb..3c6cae2 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java @@ -27,11 +27,11 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.api.exception.ONSClientException; -import org.apache.rocketmq.ons.api.impl.tracehook.OnsConsumeMessageHookImpl; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.ons.open.trace.core.hook.OnsConsumeMessageHookImpl; import org.apache.rocketmq.remoting.protocol.LanguageCode; public class ONSConsumerAbstract extends ONSClientAbstract { diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java index c840bb2..2808444 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java @@ -31,11 +31,11 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.api.exception.ONSClientException; -import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl; import org.apache.rocketmq.remoting.protocol.LanguageCode; public class OrderProducerImpl extends ONSClientAbstract implements OrderProducer { diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java index cccdf05..80b3fe6 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java @@ -34,11 +34,11 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.api.exception.ONSClientException; -import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.LanguageCode; diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java index 66b3014..f56a460 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java @@ -33,11 +33,11 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.ons.api.Constants; import org.apache.rocketmq.ons.api.PropertyKeyConst; -import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl; import org.apache.rocketmq.remoting.protocol.LanguageCode; public class TransactionProducerImpl extends ONSClientAbstract implements TransactionProducer { diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java deleted file mode 100644 index 6385ad9..0000000 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.ons.api.impl.tracehook; - -import java.util.ArrayList; -import org.apache.rocketmq.client.hook.SendMessageContext; -import org.apache.rocketmq.client.hook.SendMessageHook; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.NamespaceUtil; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType; -import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; - -public class OnsClientSendMessageHookImpl implements SendMessageHook { - - private AsyncDispatcher localDispatcher; - - public OnsClientSendMessageHookImpl(AsyncDispatcher localDispatcher) { - this.localDispatcher = localDispatcher; - } - - @Override - public String hookName() { - return "OnsClientSendMessageHook"; - } - - @Override - public void sendMessageBefore(SendMessageContext context) { - - if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) { - return; - } - OnsTraceContext onsContext = new OnsTraceContext(); - onsContext.setTraceBeans(new ArrayList<OnsTraceBean>(1)); - context.setMqTraceContext(onsContext); - onsContext.setTraceType(OnsTraceType.Pub); - String userGroup = NamespaceUtil.withoutNamespace(context.getProducerGroup(), context.getNamespace()); - onsContext.setGroupName(userGroup); - OnsTraceBean traceBean = new OnsTraceBean(); - String userTopic = NamespaceUtil.withoutNamespace(context.getMessage().getTopic(), context.getNamespace()); - traceBean.setTopic(userTopic); - traceBean.setTags(context.getMessage().getTags()); - traceBean.setKeys(context.getMessage().getKeys()); - traceBean.setStoreHost(context.getBrokerAddr()); - traceBean.setBodyLength(context.getMessage().getBody().length); - traceBean.setMsgType(context.getMsgType()); - onsContext.getTraceBeans().add(traceBean); - } - - @Override - public void sendMessageAfter(SendMessageContext context) { - - if (context == null || context.getMessage().getTopic().startsWith(OnsTraceConstants.traceTopic) || context.getMqTraceContext() == null) { - return; - } - if (context.getSendResult() == null) { - return; - } - if (context.getSendResult().getRegionId() == null - || context.getSendResult().getRegionId().equals(OnsTraceConstants.default_region) - || !context.getSendResult().isTraceOn()) { - // if regionId is default or switch is false,skip it - return; - } - OnsTraceContext onsContext = (OnsTraceContext) context.getMqTraceContext(); - OnsTraceBean traceBean = onsContext.getTraceBeans().get(0); - int costTime = (int) ((System.currentTimeMillis() - onsContext.getTimeStamp()) / onsContext.getTraceBeans().size()); - onsContext.setCostTime(costTime); - if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { - onsContext.setSuccess(true); - } else { - onsContext.setSuccess(false); - } - onsContext.setRegionId(context.getSendResult().getRegionId()); - traceBean.setMsgId(context.getSendResult().getMsgId()); - traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); - traceBean.setStoreTime(onsContext.getTimeStamp() + costTime / 2); - localDispatcher.append(onsContext); - } -} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java deleted file mode 100644 index d5a0782..0000000 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.ons.api.impl.tracehook; - -import java.util.ArrayList; -import java.util.List; -import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; -import org.apache.rocketmq.client.hook.ConsumeMessageContext; -import org.apache.rocketmq.client.hook.ConsumeMessageHook; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.NamespaceUtil; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType; -import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; - -public class OnsConsumeMessageHookImpl implements ConsumeMessageHook { - - private AsyncDispatcher localDispatcher; - - public OnsConsumeMessageHookImpl(AsyncDispatcher localDispatcher) { - this.localDispatcher = localDispatcher; - } - - @Override - public String hookName() { - return "OnsConsumeMessageHook"; - } - - @Override - public void consumeMessageBefore(ConsumeMessageContext context) { - if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { - return; - } - OnsTraceContext onsTraceContext = new OnsTraceContext(); - context.setMqTraceContext(onsTraceContext); - onsTraceContext.setTraceType(OnsTraceType.SubBefore); - String userGroup = NamespaceUtil.withoutNamespace(context.getConsumerGroup(), context.getNamespace()); - onsTraceContext.setGroupName(userGroup); - List<OnsTraceBean> beans = new ArrayList<OnsTraceBean>(); - for (MessageExt msg : context.getMsgList()) { - if (msg == null) { - continue; - } - String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION); - String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH); - if (regionId == null || regionId.equals(OnsTraceConstants.default_region)) { - // if regionId is default ,skip it - continue; - } - if (traceOn != null && "false".equals(traceOn)) { - // if trace switch is false ,skip it - continue; - } - OnsTraceBean traceBean = new OnsTraceBean(); - - String userTopic = NamespaceUtil.withoutNamespace(msg.getTopic(), context.getNamespace()); - traceBean.setTopic(userTopic); - traceBean.setMsgId(msg.getMsgId()); - traceBean.setTags(msg.getTags()); - traceBean.setKeys(msg.getKeys()); - traceBean.setStoreTime(msg.getStoreTimestamp()); - traceBean.setBodyLength(msg.getStoreSize()); - traceBean.setRetryTimes(msg.getReconsumeTimes()); - onsTraceContext.setRegionId(regionId); - beans.add(traceBean); - } - if (beans.size() > 0) { - onsTraceContext.setTraceBeans(beans); - onsTraceContext.setTimeStamp(System.currentTimeMillis()); - localDispatcher.append(onsTraceContext); - } - } - - @Override - public void consumeMessageAfter(ConsumeMessageContext context) { - if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { - return; - } - OnsTraceContext subBeforeContext = (OnsTraceContext) context.getMqTraceContext(); - if (subBeforeContext.getRegionId().equals(OnsTraceConstants.default_region)) { - // if regionId is default ,skip it - return; - } - if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) { - // if subbefore bean is null ,skip it - return; - } - OnsTraceContext subAfterContext = new OnsTraceContext(); - subAfterContext.setTraceType(OnsTraceType.SubAfter); - subAfterContext.setRegionId(subBeforeContext.getRegionId()); - subAfterContext.setGroupName(subBeforeContext.getGroupName()); - subAfterContext.setRequestId(subBeforeContext.getRequestId()); - subAfterContext.setSuccess(context.isSuccess()); - - int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size()); - subAfterContext.setCostTime(costTime);// - subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans()); - String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE); - if (contextType != null) { - subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal()); - } - localDispatcher.append(subAfterContext); - } -} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java index fb1c3af..187637f 100644 --- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java @@ -36,14 +36,18 @@ public class TraceProducerFactory { public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) { if (traceProducer == null) { - SessionCredentials sessionCredentials = new SessionCredentials(); - Properties sessionProperties = new Properties(); String accessKey = properties.getProperty(OnsTraceConstants.AccessKey); String secretKey = properties.getProperty(OnsTraceConstants.SecretKey); - sessionProperties.put(OnsTraceConstants.AccessKey, accessKey); - sessionProperties.put(OnsTraceConstants.SecretKey, secretKey); - sessionCredentials.updateContent(sessionProperties); - traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials)); + if (accessKey != null && secretKey != null) { + SessionCredentials sessionCredentials = new SessionCredentials(); + Properties sessionProperties = new Properties(); + sessionProperties.put(OnsTraceConstants.AccessKey, accessKey); + sessionProperties.put(OnsTraceConstants.SecretKey, secretKey); + sessionCredentials.updateContent(sessionProperties); + traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials)); + } else { + traceProducer = new DefaultMQProducer(); + } traceProducer.setProducerGroup(accessKey + OnsTraceConstants.groupName); traceProducer.setSendMsgTimeout(5000); traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis())));
