wu-sheng closed pull request #796: support rocketMQ-3.x-plugin (from 3.0.4-open to 3.4.6.Final) URL: https://github.com/apache/incubator-skywalking/pull/796
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml index 6a4f1fffe..3a4f95a3f 100644 --- a/apm-sniffer/apm-sdk-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/pom.xml @@ -49,6 +49,7 @@ <module>mysql-5.x-plugin</module> <module>h2-1.x-plugin</module> <module>postgresql-8.x-plugin</module> + <module>rocketMQ-3.x-plugin</module> <module>rocketMQ-4.x-plugin</module> <module>elastic-job-2.x-plugin</module> <module>mongodb-2.x-plugin</module> diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml new file mode 100644 index 000000000..b01c17209 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml @@ -0,0 +1,68 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ ~ + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>apm-sdk-plugin</artifactId> + <groupId>org.apache.skywalking</groupId> + <version>5.0.0-alpha-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>apm-rocketmq-3.x-plugin</artifactId> + <name>rocketMQ-3.x-plugin</name> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + <version>3.6.2.Final</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + </plugin> + <plugin> + <!-- source plugin --> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <!-- builds the sources jar and attaches it to the build artifacts --> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java new file mode 100644 index 000000000..0a28b1b0a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.lang.reflect.Method; +import java.util.List; +import com.alibaba.rocketmq.common.message.MessageExt; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; + +/** + * {@link AbstractMessageConsumeInterceptor} creates an entry span when <code>consumeMessage</code> is invoked on a {@link + * com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently} or {@link + * com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly} implementation. 
+ * + * @author carlvine500 + */ +public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, + Class<?>[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + List<MessageExt> msgs = (List<MessageExt>)allArguments[0]; + + ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0)); + AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier); + + span.setComponent(ComponentsDefine.ROCKET_MQ); + span.setLayer(SpanLayer.MQ); + for (int i = 1; i < msgs.size(); i++) { + ContextManager.extract(getContextCarrierFromMessage(msgs.get(i))); + } + + } + + @Override public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class<?>[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } + + private ContextCarrier getContextCarrierFromMessage(MessageExt message) { + ContextCarrier contextCarrier = new ContextCarrier(); + + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + next.setHeadValue(message.getUserProperty(next.getHeadKey())); + } + + return contextCarrier; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java new file mode 100644 index 000000000..06e1eaf0c --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java @@ -0,0 +1,50 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; + +/** + * {@link MessageConcurrentlyConsumeInterceptor} set the process status after the {@link + * com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List, + * com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method execute. 
+ * + * @author carlvine500 + */ +public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsumeInterceptor { + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + Object ret) throws Throwable { + ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus)ret; + if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) { + AbstractSpan activeSpan = ContextManager.activeSpan(); + activeSpan.errorOccurred(); + Tags.STATUS_CODE.set(activeSpan, status.name()); + } + ContextManager.stopSpan(); + return ret; + } +} + diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java new file mode 100644 index 000000000..f24ee47c8 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.lang.reflect.Method; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; + +/** + * {@link MessageOrderlyConsumeInterceptor} set the process status after the {@link + * com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List, + * com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method execute. + * + * @author carlvine500 + */ +public class MessageOrderlyConsumeInterceptor extends AbstractMessageConsumeInterceptor { + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + Object ret) throws Throwable { + + ConsumeOrderlyStatus status = (ConsumeOrderlyStatus)ret; + if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) { + AbstractSpan activeSpan = ContextManager.activeSpan(); + activeSpan.errorOccurred(); + Tags.STATUS_CODE.set(activeSpan, status.name()); + } + ContextManager.stopSpan(); + return ret; + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java new file mode 100644 index 000000000..2c6e6e1f2 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; +import org.apache.skywalking.apm.util.StringUtil; + +import static com.alibaba.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; +import static 
com.alibaba.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; + +/** + * {@link MessageSendInterceptor} create exit span when the method {@link com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String, + * String, Message, com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader, long, + * com.alibaba.rocketmq.client.impl.CommunicationMode, com.alibaba.rocketmq.client.producer.SendCallback, + * com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo, com.alibaba.rocketmq.client.impl.factory.MQClientInstance, + * int, com.alibaba.rocketmq.client.hook.SendMessageContext, com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)} + * execute. + * + * @author carlvine500 + */ +public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor { + + private static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + Message message = (Message)allArguments[2]; + ContextCarrier contextCarrier = new ContextCarrier(); + String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField()); + AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress); + span.setComponent(ComponentsDefine.ROCKET_MQ); + Tags.MQ_BROKER.set(span, (String)allArguments[0]); + Tags.MQ_TOPIC.set(span, message.getTopic()); + SpanLayer.asMQ(span); + + SendMessageRequestHeader requestHeader = (SendMessageRequestHeader)allArguments[3]; + StringBuilder properties = new StringBuilder(requestHeader.getProperties()); + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + if (!StringUtil.isEmpty(next.getHeadValue())) { + properties.append(next.getHeadKey()); + properties.append(NAME_VALUE_SEPARATOR); + properties.append(next.getHeadValue()); + 
properties.append(PROPERTY_SEPARATOR); + } + } + requestHeader.setProperties(properties.toString()); + + if (allArguments[6] != null) { + ((EnhancedInstance)allArguments[6]).setSkyWalkingDynamicField(new SendCallBackEnhanceInfo(message.getTopic(), ContextManager.capture())); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class<?>[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } + + private String buildOperationName(String topicName) { + return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer"; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java new file mode 100644 index 000000000..04a220313 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; + +/** + * {@link OnExceptionInterceptor} create local span when the method {@link com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)} + * execute. 
+ * + * @author carlvine500 + */ +public class OnExceptionInterceptor implements InstanceMethodsAroundInterceptor { + + private static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField(); + AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback"); + activeSpan.setComponent(ComponentsDefine.ROCKET_MQ); + activeSpan.errorOccurred().log((Throwable)allArguments[0]); + ContextManager.continued(enhanceInfo.getContextSnapshot()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class<?>[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java new file mode 100644 index 000000000..01c9b76b2 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.client.producer.SendStatus; +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; + +/** + * {@link OnSuccessInterceptor} create local span when the method {@link com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(SendResult)} + * execute. 
+ * + * @author carlvine500 + */ +public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField(); + AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback"); + activeSpan.setComponent(ComponentsDefine.ROCKET_MQ); + SendStatus sendStatus = ((SendResult)allArguments[0]).getSendStatus(); + if (sendStatus != SendStatus.SEND_OK) { + activeSpan.errorOccurred(); + Tags.STATUS_CODE.set(activeSpan, sendStatus.name()); + } + ContextManager.continued(enhanceInfo.getContextSnapshot()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class<?>[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java new file mode 100644 index 000000000..d09e211f1 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +public class UpdateNameServerInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + objInst.setSkyWalkingDynamicField(allArguments[0]); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, + Object ret) throws Throwable { + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class<?>[] argumentsTypes, Throwable t) { + + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java new file mode 100644 index 000000000..ab184c96a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * {@link ConsumeMessageConcurrentlyInstrumentation} intercepts the {@link com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List, + * com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method by using {@link + * org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor}. 
+ * + * @author carlvine500 + */ +public class ConsumeMessageConcurrentlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently"; + private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage"; + private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { + return named(CONSUMER_MESSAGE_METHOD); + } + + @Override public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java new file mode 100644 index 000000000..a14e12d1d --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * {@link ConsumeMessageOrderlyInstrumentation} intercepts the {@link com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List, + * com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method by using {@link + * org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageOrderlyConsumeInterceptor}. 
+ * + * @author carlvine500 + */ +public class ConsumeMessageOrderlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly"; + private static final String ENHANCE_METHOD = "consumeMessage"; + private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageOrderlyConsumeInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { + return named(ENHANCE_METHOD); + } + + @Override public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java new file mode 100644 index 000000000..0f9538c2f --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * {@link MQClientAPIImplInstrumentation} intercepts the {@link com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String, + * String, com.alibaba.rocketmq.common.message.Message, com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader, + * long, com.alibaba.rocketmq.client.impl.CommunicationMode, com.alibaba.rocketmq.client.producer.SendCallback, + * com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo, com.alibaba.rocketmq.client.impl.factory.MQClientInstance, + * int, com.alibaba.rocketmq.client.hook.SendMessageContext, 
com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)} + * method by using {@link org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor}. + * + * @author carlvine500 + */ +public class MQClientAPIImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.impl.MQClientAPIImpl"; + private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage"; + private static final String ASYNC_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor"; + private static final String UPDATE_NAME_SERVER_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v3.UpdateNameServerInterceptor"; + private static final String UPDATE_NAME_SERVER_METHOD_NAME = "updateNameServerAddressList"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { + return named(SEND_MESSAGE_METHOD_NAME).and(takesArgumentWithType(6, "com.alibaba.rocketmq.client.producer.SendCallback")); + } + + @Override public String getMethodsInterceptor() { + return ASYNC_METHOD_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { + return named(UPDATE_NAME_SERVER_METHOD_NAME); + } + + @Override public String getMethodsInterceptor() { + return UPDATE_NAME_SERVER_INTERCEPT_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } + +} diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java new file mode 100644 index 000000000..82c73b3ec --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; + +/** + * {@link SendCallBackEnhanceInfo} saves the topic Id and {@link ContextSnapshot} instance for trace. 
+ * + * @author carlvine500 + */ +public class SendCallBackEnhanceInfo { + private String topicId; + private ContextSnapshot contextSnapshot; + + public SendCallBackEnhanceInfo(String topicId, ContextSnapshot contextSnapshot) { + this.topicId = topicId; + this.contextSnapshot = contextSnapshot; + } + + public String getTopicId() { + return topicId; + } + + public ContextSnapshot getContextSnapshot() { + return contextSnapshot; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java new file mode 100644 index 000000000..1955d12fb --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * {@link SendCallbackInstrumentation} intercepts {@link com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(com.alibaba.rocketmq.client.producer.SendResult sendResult)} + * method by using {@link org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor} and also intercepts {@link + * com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)} by using {@link + * org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor}. 
+ * + * @author carlvine500 + */ +public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.producer.SendCallback"; + private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess"; + private static final String ON_SUCCESS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor"; + private static final String ON_EXCEPTION_METHOD = "onException"; + private static final String ON_EXCEPTION_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { + return named(ON_SUCCESS_ENHANCE_METHOD).and(takesArgumentWithType(0, "com.alibaba.rocketmq.client.producer.SendResult")); + } + + @Override public String getMethodsInterceptor() { + return ON_SUCCESS_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { + return named(ON_EXCEPTION_METHOD).and(takesArgumentWithType(0, "java.lang.Throwable")); + } + + @Override public String getMethodsInterceptor() { + return ON_EXCEPTION_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def new file 
mode 100644 index 000000000..8a0f31712 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def @@ -0,0 +1,4 @@ +rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageConcurrentlyInstrumentation +rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageOrderlyInstrumentation +rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.MQClientAPIImplInstrumentation +rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallbackInstrumentation diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java new file mode 100644 index 000000000..6e68a5b92 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.util.List; +import com.alibaba.rocketmq.client.impl.CommunicationMode; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class MessageSendInterceptorTest { + + private MessageSendInterceptor messageSendInterceptor; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + private Object[] 
arguments; + + private Object[] argumentsWithoutCallback; + + @Mock + private Message message; + + @Mock + private SendMessageRequestHeader messageRequestHeader; + + @Mock + private EnhancedInstance callBack; + + private EnhancedInstance enhancedInstance; + + @Before + public void setUp() { + messageSendInterceptor = new MessageSendInterceptor(); + enhancedInstance = new EnhancedInstance() { + @Override public Object getSkyWalkingDynamicField() { + return "127.0.0.1:6543"; + } + + @Override public void setSkyWalkingDynamicField(Object value) { + + } + }; + + arguments = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, callBack}; + argumentsWithoutCallback = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, null}; + when(messageRequestHeader.getProperties()).thenReturn(""); + when(message.getTags()).thenReturn("TagA"); + } + + @Test + public void testSendMessage() throws Throwable { + messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null); + messageSendInterceptor.afterMethod(enhancedInstance, null, arguments, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan mqSpan = spans.get(0); + + SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); + SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ); + SpanAssert.assertTag(mqSpan, 0, "127.0.0.1"); + verify(messageRequestHeader, times(1)).setProperties(anyString()); + verify(callBack, times(1)).setSkyWalkingDynamicField(Matchers.any()); + } + + @Test + public void testSendMessageWithoutCallBack() throws Throwable { + messageSendInterceptor.beforeMethod(enhancedInstance, null, argumentsWithoutCallback, null, null); + messageSendInterceptor.afterMethod(enhancedInstance, 
null, argumentsWithoutCallback, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan mqSpan = spans.get(0); + + SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); + SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ); + SpanAssert.assertTag(mqSpan, 0, "127.0.0.1"); + verify(messageRequestHeader, times(1)).setProperties(anyString()); + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java new file mode 100644 index 000000000..a66d5e4b1 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.util.List; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.helper.SpanHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class OnExceptionInterceptorTest { + + private OnExceptionInterceptor exceptionInterceptor; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + @Mock + private ContextSnapshot contextSnapshot; + private SendCallBackEnhanceInfo enhanceInfo; + + @Mock + private EnhancedInstance enhancedInstance; + + @Before + public void setUp() { + exceptionInterceptor = new OnExceptionInterceptor(); + + enhanceInfo = new SendCallBackEnhanceInfo("test", 
contextSnapshot); + when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo); + } + + @Test + public void testOnException() throws Throwable { + exceptionInterceptor.beforeMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null); + exceptionInterceptor.afterMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan exceptionSpan = spans.get(0); + SpanAssert.assertException(SpanHelper.getLogs(exceptionSpan).get(0), RuntimeException.class); + SpanAssert.assertOccurException(exceptionSpan, true); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java new file mode 100644 index 000000000..ef8e93986 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.util.List; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.client.producer.SendStatus; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) 
+@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class OnSuccessInterceptorTest { + + private OnSuccessInterceptor successInterceptor; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + @Mock + private ContextSnapshot contextSnapshot; + @Mock + private SendResult sendResult; + + private SendCallBackEnhanceInfo enhanceInfo; + + @Mock + private EnhancedInstance enhancedInstance; + + @Before + public void setUp() { + successInterceptor = new OnSuccessInterceptor(); + + enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot); + when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo); + when(sendResult.getSendStatus()).thenReturn(SendStatus.SEND_OK); + } + + @Test + public void testOnSuccess() throws Throwable { + successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan successSpan = spans.get(0); + + SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ); + + } + + @Test + public void testOnSuccessWithErrorStatus() throws Throwable { + when(sendResult.getSendStatus()).thenReturn(SendStatus.FLUSH_SLAVE_TIMEOUT); + successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment); + 
+ assertThat(spans.size(), is(1)); + + AbstractTracingSpan successSpan = spans.get(0); + + SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ); + SpanAssert.assertOccurException(successSpan, true); + + } + +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services