wu-sheng closed pull request #796: support rocketMQ-3.x-plugin (from 3.0.4-open 
to 3.4.6.Final)
URL: https://github.com/apache/incubator-skywalking/pull/796
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml 
b/apm-sniffer/apm-sdk-plugin/pom.xml
index 6a4f1fffe..3a4f95a3f 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -49,6 +49,7 @@
         <module>mysql-5.x-plugin</module>
         <module>h2-1.x-plugin</module>
         <module>postgresql-8.x-plugin</module>
+        <module>rocketMQ-3.x-plugin</module>
         <module>rocketMQ-4.x-plugin</module>
         <module>elastic-job-2.x-plugin</module>
         <module>mongodb-2.x-plugin</module>
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml
new file mode 100644
index 000000000..b01c17209
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>apm-sdk-plugin</artifactId>
+        <groupId>org.apache.skywalking</groupId>
+        <version>5.0.0-alpha-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>apm-rocketmq-3.x-plugin</artifactId>
+    <name>rocketMQ-3.x-plugin</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>3.6.2.Final</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <!-- ???? -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <!-- ??????????????? -->
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java
new file mode 100644
index 000000000..0a28b1b0a
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * {@link AbstractMessageConsumeInterceptor} create entry span when the 
<code>consumeMessage</code> in the {@link
+ * com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently} 
and {@link
+ * com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly} class.
+ *
+ * @author carlvine500
+ */
+public abstract class AbstractMessageConsumeInterceptor implements 
InstanceMethodsAroundInterceptor {
+
+    public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+    @Override
+    public final void beforeMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments,
+        Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        List<MessageExt> msgs = (List<MessageExt>)allArguments[0];
+
+        ContextCarrier contextCarrier = 
getContextCarrierFromMessage(msgs.get(0));
+        AbstractSpan span = 
ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + 
msgs.get(0).getTopic() + "/Consumer", contextCarrier);
+
+        span.setComponent(ComponentsDefine.ROCKET_MQ);
+        span.setLayer(SpanLayer.MQ);
+        for (int i = 1; i < msgs.size(); i++) {
+            ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
+        }
+
+    }
+
+    @Override public final void handleMethodException(EnhancedInstance 
objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+
+    private ContextCarrier getContextCarrierFromMessage(MessageExt message) {
+        ContextCarrier contextCarrier = new ContextCarrier();
+
+        CarrierItem next = contextCarrier.items();
+        while (next.hasNext()) {
+            next = next.next();
+            next.setHeadValue(message.getUserProperty(next.getHeadKey()));
+        }
+
+        return contextCarrier;
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java
new file mode 100644
index 000000000..06e1eaf0c
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+
+/**
+ * {@link MessageConcurrentlyConsumeInterceptor} set the process status after 
the {@link
+ * 
com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
+ * com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} 
method execute.
+ *
+ * @author carlvine500
+ */
+public class MessageConcurrentlyConsumeInterceptor extends 
AbstractMessageConsumeInterceptor {
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus)ret;
+        if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
+            AbstractSpan activeSpan = ContextManager.activeSpan();
+            activeSpan.errorOccurred();
+            Tags.STATUS_CODE.set(activeSpan, status.name());
+        }
+        ContextManager.stopSpan();
+        return ret;
+    }
+}
+
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java
new file mode 100644
index 000000000..f24ee47c8
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.lang.reflect.Method;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+
+/**
+ * {@link MessageOrderlyConsumeInterceptor} set the process status after the 
{@link
+ * 
com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
+ * com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} 
method execute.
+ *
+ * @author carlvine500
+ */
+public class MessageOrderlyConsumeInterceptor extends 
AbstractMessageConsumeInterceptor {
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+
+        ConsumeOrderlyStatus status = (ConsumeOrderlyStatus)ret;
+        if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {
+            AbstractSpan activeSpan = ContextManager.activeSpan();
+            activeSpan.errorOccurred();
+            Tags.STATUS_CODE.set(activeSpan, status.name());
+        }
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java
new file mode 100644
index 000000000..2c6e6e1f2
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import 
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import static 
com.alibaba.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
+import static 
com.alibaba.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
+
+/**
+ * {@link MessageSendInterceptor} create exit span when the method {@link 
com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
+ * String, Message, 
com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader, long,
+ * com.alibaba.rocketmq.client.impl.CommunicationMode, 
com.alibaba.rocketmq.client.producer.SendCallback,
+ * com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo, 
com.alibaba.rocketmq.client.impl.factory.MQClientInstance,
+ * int, com.alibaba.rocketmq.client.hook.SendMessageContext, 
com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
+ * execute.
+ *
+ * @author carlvine500
+ */
+public class MessageSendInterceptor implements 
InstanceMethodsAroundInterceptor {
+
+    private static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] 
allArguments, Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        Message message = (Message)allArguments[2];
+        ContextCarrier contextCarrier = new ContextCarrier();
+        String namingServiceAddress = 
String.valueOf(objInst.getSkyWalkingDynamicField());
+        AbstractSpan span = 
ContextManager.createExitSpan(buildOperationName(message.getTopic()), 
contextCarrier, namingServiceAddress);
+        span.setComponent(ComponentsDefine.ROCKET_MQ);
+        Tags.MQ_BROKER.set(span, (String)allArguments[0]);
+        Tags.MQ_TOPIC.set(span, message.getTopic());
+        SpanLayer.asMQ(span);
+
+        SendMessageRequestHeader requestHeader = 
(SendMessageRequestHeader)allArguments[3];
+        StringBuilder properties = new 
StringBuilder(requestHeader.getProperties());
+        CarrierItem next = contextCarrier.items();
+        while (next.hasNext()) {
+            next = next.next();
+            if (!StringUtil.isEmpty(next.getHeadValue())) {
+                properties.append(next.getHeadKey());
+                properties.append(NAME_VALUE_SEPARATOR);
+                properties.append(next.getHeadValue());
+                properties.append(PROPERTY_SEPARATOR);
+            }
+        }
+        requestHeader.setProperties(properties.toString());
+
+        if (allArguments[6] != null) {
+            ((EnhancedInstance)allArguments[6]).setSkyWalkingDynamicField(new 
SendCallBackEnhanceInfo(message.getTopic(), ContextManager.capture()));
+        }
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    @Override public void handleMethodException(EnhancedInstance objInst, 
Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+
+    private String buildOperationName(String topicName) {
+        return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java
new file mode 100644
index 000000000..04a220313
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import 
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+
+/**
+ * {@link OnExceptionInterceptor} create local span when the method {@link 
com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)}
+ * execute.
+ *
+ * @author carlvine500
+ */
+public class OnExceptionInterceptor implements 
InstanceMethodsAroundInterceptor {
+
+    private static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] 
allArguments, Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        SendCallBackEnhanceInfo enhanceInfo = 
(SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();
+        AbstractSpan activeSpan = 
ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + 
enhanceInfo.getTopicId() + "/Producer/Callback");
+        activeSpan.setComponent(ComponentsDefine.ROCKET_MQ);
+        activeSpan.errorOccurred().log((Throwable)allArguments[0]);
+        ContextManager.continued(enhanceInfo.getContextSnapshot());
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    @Override public void handleMethodException(EnhancedInstance objInst, 
Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().log(t);
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java
new file mode 100644
index 000000000..01c9b76b2
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.client.producer.SendStatus;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import 
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+
+/**
+ * {@link OnSuccessInterceptor} create local span when the method {@link 
com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(SendResult)}
+ * execute.
+ *
+ * @author carlvine500
+ */
+public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor {
+
+    public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] 
allArguments, Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        SendCallBackEnhanceInfo enhanceInfo = 
(SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();
+        AbstractSpan activeSpan = 
ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + 
enhanceInfo.getTopicId() + "/Producer/Callback");
+        activeSpan.setComponent(ComponentsDefine.ROCKET_MQ);
+        SendStatus sendStatus = ((SendResult)allArguments[0]).getSendStatus();
+        if (sendStatus != SendStatus.SEND_OK) {
+            activeSpan.errorOccurred();
+            Tags.STATUS_CODE.set(activeSpan, sendStatus.name());
+        }
+        ContextManager.continued(enhanceInfo.getContextSnapshot());
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    @Override public void handleMethodException(EnhancedInstance objInst, 
Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java
new file mode 100644
index 000000000..d09e211f1
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.lang.reflect.Method;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+public class UpdateNameServerInterceptor implements 
InstanceMethodsAroundInterceptor {
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] 
allArguments, Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        objInst.setSkyWalkingDynamicField(allArguments[0]);
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        return ret;
+    }
+
+    @Override public void handleMethodException(EnhancedInstance objInst, 
Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java
new file mode 100644
index 000000000..ab184c96a
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link ConsumeMessageConcurrentlyInstrumentation} intercepts the {@link 
com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
+ * com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} 
method by using {@link
+ * 
org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor}.
+ *
+ * @author carlvine500
+ */
+public class ConsumeMessageConcurrentlyInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+    private static final String ENHANCE_CLASS = 
"com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently";
+    private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage";
+    private static final String INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor";
+
+    @Override protected ConstructorInterceptPoint[] 
getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override protected InstanceMethodsInterceptPoint[] 
getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> 
getMethodsMatcher() {
+                    return named(CONSUMER_MESSAGE_METHOD);
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    @Override protected ClassMatch enhanceClass() {
+        return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java
new file mode 100644
index 000000000..a14e12d1d
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link ConsumeMessageOrderlyInstrumentation} intercepts the {@link 
com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
+ * com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} 
method by using {@link
+ * 
org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor}.
+ *
+ * @author carlvine500
+ */
+public class ConsumeMessageOrderlyInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+    private static final String ENHANCE_CLASS = 
"com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly";
+    private static final String ENHANCE_METHOD = "consumeMessage";
+    private static final String INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageOrderlyConsumeInterceptor";
+
+    @Override protected ConstructorInterceptPoint[] 
getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override protected InstanceMethodsInterceptPoint[] 
getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> 
getMethodsMatcher() {
+                    return named(ENHANCE_METHOD);
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    @Override protected ClassMatch enhanceClass() {
+        return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java
new file mode 100644
index 000000000..0f9538c2f
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static 
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * {@link MQClientAPIImplInstrumentation} intercepts the {@link 
com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
+ * String, com.alibaba.rocketmq.common.message.Message, 
com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader,
+ * long, com.alibaba.rocketmq.client.impl.CommunicationMode, 
com.alibaba.rocketmq.client.producer.SendCallback,
+ * com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo, 
com.alibaba.rocketmq.client.impl.factory.MQClientInstance,
+ * int, com.alibaba.rocketmq.client.hook.SendMessageContext, 
com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
+ * method by using {@link 
org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor}.
+ *
+ * @author carlvine500
+ */
+public class MQClientAPIImplInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+
+    private static final String ENHANCE_CLASS = 
"com.alibaba.rocketmq.client.impl.MQClientAPIImpl";
+    private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage";
+    private static final String ASYNC_METHOD_INTERCEPTOR = 
"org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor";
+    private static final String UPDATE_NAME_SERVER_INTERCEPT_CLASS = 
"org.apache.skywalking.apm.plugin.rocketMQ.v3.UpdateNameServerInterceptor";
+    private static final String UPDATE_NAME_SERVER_METHOD_NAME = 
"updateNameServerAddressList";
+
+    @Override protected ConstructorInterceptPoint[] 
getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override protected InstanceMethodsInterceptPoint[] 
getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> 
getMethodsMatcher() {
+                    return 
named(SEND_MESSAGE_METHOD_NAME).and(takesArgumentWithType(6, 
"com.alibaba.rocketmq.client.producer.SendCallback"));
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return ASYNC_METHOD_INTERCEPTOR;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            },
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> 
getMethodsMatcher() {
+                    return named(UPDATE_NAME_SERVER_METHOD_NAME);
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return UPDATE_NAME_SERVER_INTERCEPT_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    @Override protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java
new file mode 100644
index 000000000..82c73b3ec
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+
+/**
+ * {@link SendCallBackEnhanceInfo} saves the topic Id and {@link 
ContextSnapshot} instance for trace.
+ *
+ * @author carlvine500
+ */
+public class SendCallBackEnhanceInfo {
+    private String topicId;
+    private ContextSnapshot contextSnapshot;
+
+    public SendCallBackEnhanceInfo(String topicId, ContextSnapshot 
contextSnapshot) {
+        this.topicId = topicId;
+        this.contextSnapshot = contextSnapshot;
+    }
+
+    public String getTopicId() {
+        return topicId;
+    }
+
+    public ContextSnapshot getContextSnapshot() {
+        return contextSnapshot;
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java
new file mode 100644
index 000000000..1955d12fb
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static 
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link SendCallbackInstrumentation} intercepts {@link 
com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(com.alibaba.rocketmq.client.producer.SendResult
 sendResult)}
+ * method by using {@link 
org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor} and also 
intercepts {@link
+ * com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)} 
by using {@link
+ * org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor}.
+ *
+ * @author carlvine500
+ */
+public class SendCallbackInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+
+    private static final String ENHANCE_CLASS = 
"com.alibaba.rocketmq.client.producer.SendCallback";
+    private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess";
+    private static final String ON_SUCCESS_INTERCEPTOR = 
"org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor";
+    private static final String ON_EXCEPTION_METHOD = "onException";
+    private static final String ON_EXCEPTION_INTERCEPTOR = 
"org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor";
+
+    @Override protected ConstructorInterceptPoint[] 
getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override protected InstanceMethodsInterceptPoint[] 
getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> 
getMethodsMatcher() {
+                    return 
named(ON_SUCCESS_ENHANCE_METHOD).and(takesArgumentWithType(0, 
"com.alibaba.rocketmq.client.producer.SendResult"));
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return ON_SUCCESS_INTERCEPTOR;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            },
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> 
getMethodsMatcher() {
+                    return 
named(ON_EXCEPTION_METHOD).and(takesArgumentWithType(0, "java.lang.Throwable"));
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return ON_EXCEPTION_INTERCEPTOR;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    @Override protected ClassMatch enhanceClass() {
+        return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 000000000..8a0f31712
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,4 @@
+rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageConcurrentlyInstrumentation
+rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageOrderlyInstrumentation
+rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.MQClientAPIImplInstrumentation
+rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallbackInstrumentation
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java
new file mode 100644
index 000000000..6e68a5b92
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.util.List;
+import com.alibaba.rocketmq.client.impl.CommunicationMode;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class MessageSendInterceptorTest {
+
+    private MessageSendInterceptor messageSendInterceptor;
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    private Object[] arguments;
+
+    private Object[] argumentsWithoutCallback;
+
+    @Mock
+    private Message message;
+
+    @Mock
+    private SendMessageRequestHeader messageRequestHeader;
+
+    @Mock
+    private EnhancedInstance callBack;
+
+    private EnhancedInstance enhancedInstance;
+
+    @Before
+    public void setUp() {
+        messageSendInterceptor = new MessageSendInterceptor();
+        enhancedInstance = new EnhancedInstance() {
+            @Override public Object getSkyWalkingDynamicField() {
+                return "127.0.0.1:6543";
+            }
+
+            @Override public void setSkyWalkingDynamicField(Object value) {
+
+            }
+        };
+
+        arguments = new Object[] {"127.0.0.1", "test", message, 
messageRequestHeader, null, CommunicationMode.ASYNC, callBack};
+        argumentsWithoutCallback = new Object[] {"127.0.0.1", "test", message, 
messageRequestHeader, null, CommunicationMode.ASYNC, null};
+        when(messageRequestHeader.getProperties()).thenReturn("");
+        when(message.getTags()).thenReturn("TagA");
+    }
+
+    @Test
+    public void testSendMessage() throws Throwable {
+        messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments, 
null, null);
+        messageSendInterceptor.afterMethod(enhancedInstance, null, arguments, 
null, null);
+
+        assertThat(segmentStorage.getTraceSegments().size(), is(1));
+        TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+        assertThat(spans.size(), is(1));
+
+        AbstractTracingSpan mqSpan = spans.get(0);
+
+        SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
+        SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ);
+        SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
+        verify(messageRequestHeader, times(1)).setProperties(anyString());
+        verify(callBack, times(1)).setSkyWalkingDynamicField(Matchers.any());
+    }
+
+    @Test
+    public void testSendMessageWithoutCallBack() throws Throwable {
+        messageSendInterceptor.beforeMethod(enhancedInstance, null, 
argumentsWithoutCallback, null, null);
+        messageSendInterceptor.afterMethod(enhancedInstance, null, 
argumentsWithoutCallback, null, null);
+
+        assertThat(segmentStorage.getTraceSegments().size(), is(1));
+        TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+        assertThat(spans.size(), is(1));
+
+        AbstractTracingSpan mqSpan = spans.get(0);
+
+        SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
+        SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ);
+        SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
+        verify(messageRequestHeader, times(1)).setProperties(anyString());
+    }
+
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java
new file mode 100644
index 000000000..a66d5e4b1
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.util.List;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import 
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class OnExceptionInterceptorTest {
+
+    private OnExceptionInterceptor exceptionInterceptor;
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    @Mock
+    private ContextSnapshot contextSnapshot;
+    private SendCallBackEnhanceInfo enhanceInfo;
+
+    @Mock
+    private EnhancedInstance enhancedInstance;
+
+    @Before
+    public void setUp() {
+        exceptionInterceptor = new OnExceptionInterceptor();
+
+        enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot);
+        
when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
+    }
+
+    @Test
+    public void testOnException() throws Throwable {
+        exceptionInterceptor.beforeMethod(enhancedInstance, null, new Object[] 
{new RuntimeException()}, null, null);
+        exceptionInterceptor.afterMethod(enhancedInstance, null, new Object[] 
{new RuntimeException()}, null, null);
+
+        assertThat(segmentStorage.getTraceSegments().size(), is(1));
+        TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+        assertThat(spans.size(), is(1));
+
+        AbstractTracingSpan exceptionSpan = spans.get(0);
+        SpanAssert.assertException(SpanHelper.getLogs(exceptionSpan).get(0), 
RuntimeException.class);
+        SpanAssert.assertOccurException(exceptionSpan, true);
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java
new file mode 100644
index 000000000..ef8e93986
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.util.List;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.client.producer.SendStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import 
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class OnSuccessInterceptorTest {
+
+    private OnSuccessInterceptor successInterceptor;
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    @Mock
+    private ContextSnapshot contextSnapshot;
+    @Mock
+    private SendResult sendResult;
+
+    private SendCallBackEnhanceInfo enhanceInfo;
+
+    @Mock
+    private EnhancedInstance enhancedInstance;
+
+    @Before
+    public void setUp() {
+        successInterceptor = new OnSuccessInterceptor();
+
+        enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot);
+        
when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
+        when(sendResult.getSendStatus()).thenReturn(SendStatus.SEND_OK);
+    }
+
+    @Test
+    public void testOnSuccess() throws Throwable {
+        successInterceptor.beforeMethod(enhancedInstance, null, new Object[] 
{sendResult}, null, null);
+        successInterceptor.afterMethod(enhancedInstance, null, new Object[] 
{sendResult}, null, null);
+
+        assertThat(segmentStorage.getTraceSegments().size(), is(1));
+        TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+        assertThat(spans.size(), is(1));
+
+        AbstractTracingSpan successSpan = spans.get(0);
+
+        SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ);
+
+    }
+
+    @Test
+    public void testOnSuccessWithErrorStatus() throws Throwable {
+        
when(sendResult.getSendStatus()).thenReturn(SendStatus.FLUSH_SLAVE_TIMEOUT);
+        successInterceptor.beforeMethod(enhancedInstance, null, new Object[] 
{sendResult}, null, null);
+        successInterceptor.afterMethod(enhancedInstance, null, new Object[] 
{sendResult}, null, null);
+
+        assertThat(segmentStorage.getTraceSegments().size(), is(1));
+        TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+        assertThat(spans.size(), is(1));
+
+        AbstractTracingSpan successSpan = spans.get(0);
+
+        SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ);
+        SpanAssert.assertOccurException(successSpan, true);
+
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to