wu-sheng closed pull request #796: support rocketMQ-3.x-plugin (from 3.0.4-open
to 3.4.6.Final)
URL: https://github.com/apache/incubator-skywalking/pull/796
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml
b/apm-sniffer/apm-sdk-plugin/pom.xml
index 6a4f1fffe..3a4f95a3f 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -49,6 +49,7 @@
<module>mysql-5.x-plugin</module>
<module>h2-1.x-plugin</module>
<module>postgresql-8.x-plugin</module>
+ <module>rocketMQ-3.x-plugin</module>
<module>rocketMQ-4.x-plugin</module>
<module>elastic-job-2.x-plugin</module>
<module>mongodb-2.x-plugin</module>
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml
new file mode 100644
index 000000000..b01c17209
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>apm-sdk-plugin</artifactId>
+ <groupId>org.apache.skywalking</groupId>
+ <version>5.0.0-alpha-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>apm-rocketmq-3.x-plugin</artifactId>
+ <name>rocketMQ-3.x-plugin</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>3.6.2.Final</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <!-- ???? -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <!-- ??????????????? -->
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java
new file mode 100644
index 000000000..0a28b1b0a
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * {@link AbstractMessageConsumeInterceptor} create entry span when the
<code>consumeMessage</code> in the {@link
+ * com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently}
and {@link
+ * com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly} class.
+ *
+ * @author carlvine500
+ */
+public abstract class AbstractMessageConsumeInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public final void beforeMethod(EnhancedInstance objInst, Method method,
Object[] allArguments,
+ Class<?>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ List<MessageExt> msgs = (List<MessageExt>)allArguments[0];
+
+ ContextCarrier contextCarrier =
getContextCarrierFromMessage(msgs.get(0));
+ AbstractSpan span =
ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX +
msgs.get(0).getTopic() + "/Consumer", contextCarrier);
+
+ span.setComponent(ComponentsDefine.ROCKET_MQ);
+ span.setLayer(SpanLayer.MQ);
+ for (int i = 1; i < msgs.size(); i++) {
+ ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
+ }
+
+ }
+
+ @Override public final void handleMethodException(EnhancedInstance
objInst, Method method, Object[] allArguments,
+ Class<?>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().errorOccurred().log(t);
+ }
+
+ private ContextCarrier getContextCarrierFromMessage(MessageExt message) {
+ ContextCarrier contextCarrier = new ContextCarrier();
+
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ next.setHeadValue(message.getUserProperty(next.getHeadKey()));
+ }
+
+ return contextCarrier;
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java
new file mode 100644
index 000000000..06e1eaf0c
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+
+/**
+ * {@link MessageConcurrentlyConsumeInterceptor} set the process status after
the {@link
+ *
com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
+ * com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)}
method execute.
+ *
+ * @author carlvine500
+ */
+public class MessageConcurrentlyConsumeInterceptor extends
AbstractMessageConsumeInterceptor {
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus)ret;
+ if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
+ AbstractSpan activeSpan = ContextManager.activeSpan();
+ activeSpan.errorOccurred();
+ Tags.STATUS_CODE.set(activeSpan, status.name());
+ }
+ ContextManager.stopSpan();
+ return ret;
+ }
+}
+
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java
new file mode 100644
index 000000000..f24ee47c8
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.lang.reflect.Method;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+
+/**
+ * {@link MessageOrderlyConsumeInterceptor} set the process status after the
{@link
+ *
com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
+ * com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)}
method execute.
+ *
+ * @author carlvine500
+ */
+public class MessageOrderlyConsumeInterceptor extends
AbstractMessageConsumeInterceptor {
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+
+ ConsumeOrderlyStatus status = (ConsumeOrderlyStatus)ret;
+ if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {
+ AbstractSpan activeSpan = ContextManager.activeSpan();
+ activeSpan.errorOccurred();
+ Tags.STATUS_CODE.set(activeSpan, status.name());
+ }
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java
new file mode 100644
index 000000000..2c6e6e1f2
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import static
com.alibaba.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
+import static
com.alibaba.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
+
+/**
+ * {@link MessageSendInterceptor} create exit span when the method {@link
com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
+ * String, Message,
com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader, long,
+ * com.alibaba.rocketmq.client.impl.CommunicationMode,
com.alibaba.rocketmq.client.producer.SendCallback,
+ * com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo,
com.alibaba.rocketmq.client.impl.factory.MQClientInstance,
+ * int, com.alibaba.rocketmq.client.hook.SendMessageContext,
com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
+ * execute.
+ *
+ * @author carlvine500
+ */
+public class MessageSendInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ private static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ Message message = (Message)allArguments[2];
+ ContextCarrier contextCarrier = new ContextCarrier();
+ String namingServiceAddress =
String.valueOf(objInst.getSkyWalkingDynamicField());
+ AbstractSpan span =
ContextManager.createExitSpan(buildOperationName(message.getTopic()),
contextCarrier, namingServiceAddress);
+ span.setComponent(ComponentsDefine.ROCKET_MQ);
+ Tags.MQ_BROKER.set(span, (String)allArguments[0]);
+ Tags.MQ_TOPIC.set(span, message.getTopic());
+ SpanLayer.asMQ(span);
+
+ SendMessageRequestHeader requestHeader =
(SendMessageRequestHeader)allArguments[3];
+ StringBuilder properties = new
StringBuilder(requestHeader.getProperties());
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ if (!StringUtil.isEmpty(next.getHeadValue())) {
+ properties.append(next.getHeadKey());
+ properties.append(NAME_VALUE_SEPARATOR);
+ properties.append(next.getHeadValue());
+ properties.append(PROPERTY_SEPARATOR);
+ }
+ }
+ requestHeader.setProperties(properties.toString());
+
+ if (allArguments[6] != null) {
+ ((EnhancedInstance)allArguments[6]).setSkyWalkingDynamicField(new
SendCallBackEnhanceInfo(message.getTopic(), ContextManager.capture()));
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override public void handleMethodException(EnhancedInstance objInst,
Method method, Object[] allArguments,
+ Class<?>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().errorOccurred().log(t);
+ }
+
+ private String buildOperationName(String topicName) {
+ return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java
new file mode 100644
index 000000000..04a220313
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+
+/**
+ * {@link OnExceptionInterceptor} create local span when the method {@link
com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)}
+ * execute.
+ *
+ * @author carlvine500
+ */
+public class OnExceptionInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ private static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ SendCallBackEnhanceInfo enhanceInfo =
(SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();
+ AbstractSpan activeSpan =
ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX +
enhanceInfo.getTopicId() + "/Producer/Callback");
+ activeSpan.setComponent(ComponentsDefine.ROCKET_MQ);
+ activeSpan.errorOccurred().log((Throwable)allArguments[0]);
+ ContextManager.continued(enhanceInfo.getContextSnapshot());
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override public void handleMethodException(EnhancedInstance objInst,
Method method, Object[] allArguments,
+ Class<?>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java
new file mode 100644
index 000000000..01c9b76b2
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.client.producer.SendStatus;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+
+/**
+ * {@link OnSuccessInterceptor} create local span when the method {@link
com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(SendResult)}
+ * execute.
+ *
+ * @author carlvine500
+ */
+public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor {
+
+ public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ SendCallBackEnhanceInfo enhanceInfo =
(SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();
+ AbstractSpan activeSpan =
ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX +
enhanceInfo.getTopicId() + "/Producer/Callback");
+ activeSpan.setComponent(ComponentsDefine.ROCKET_MQ);
+ SendStatus sendStatus = ((SendResult)allArguments[0]).getSendStatus();
+ if (sendStatus != SendStatus.SEND_OK) {
+ activeSpan.errorOccurred();
+ Tags.STATUS_CODE.set(activeSpan, sendStatus.name());
+ }
+ ContextManager.continued(enhanceInfo.getContextSnapshot());
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override public void handleMethodException(EnhancedInstance objInst,
Method method, Object[] allArguments,
+ Class<?>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().errorOccurred().log(t);
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java
new file mode 100644
index 000000000..d09e211f1
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.lang.reflect.Method;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+public class UpdateNameServerInterceptor implements
InstanceMethodsAroundInterceptor {
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ objInst.setSkyWalkingDynamicField(allArguments[0]);
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ return ret;
+ }
+
+ @Override public void handleMethodException(EnhancedInstance objInst,
Method method, Object[] allArguments,
+ Class<?>[] argumentsTypes, Throwable t) {
+
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java
new file mode 100644
index 000000000..ab184c96a
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link ConsumeMessageConcurrentlyInstrumentation} intercepts the {@link
com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
+ * com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)}
method by using {@link
+ *
org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor}.
+ *
+ * @author carlvine500
+ */
+public class ConsumeMessageConcurrentlyInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+ private static final String ENHANCE_CLASS =
"com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently";
+ private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage";
+ private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor";
+
+ @Override protected ConstructorInterceptPoint[]
getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override protected InstanceMethodsInterceptPoint[]
getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return named(CONSUMER_MESSAGE_METHOD);
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override protected ClassMatch enhanceClass() {
+ return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java
new file mode 100644
index 000000000..a14e12d1d
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link ConsumeMessageOrderlyInstrumentation} intercepts the {@link
com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
+ * com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)}
method by using {@link
+ *
org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor}.
+ *
+ * @author carlvine500
+ */
+public class ConsumeMessageOrderlyInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+ private static final String ENHANCE_CLASS =
"com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly";
+ private static final String ENHANCE_METHOD = "consumeMessage";
+ private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageOrderlyConsumeInterceptor";
+
+ @Override protected ConstructorInterceptPoint[]
getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override protected InstanceMethodsInterceptPoint[]
getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return named(ENHANCE_METHOD);
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override protected ClassMatch enhanceClass() {
+ return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java
new file mode 100644
index 000000000..0f9538c2f
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * {@link MQClientAPIImplInstrumentation} intercepts the {@link
com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
+ * String, com.alibaba.rocketmq.common.message.Message,
com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader,
+ * long, com.alibaba.rocketmq.client.impl.CommunicationMode,
com.alibaba.rocketmq.client.producer.SendCallback,
+ * com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo,
com.alibaba.rocketmq.client.impl.factory.MQClientInstance,
+ * int, com.alibaba.rocketmq.client.hook.SendMessageContext,
com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
+ * method by using {@link
org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor}.
+ *
+ * @author carlvine500
+ */
+public class MQClientAPIImplInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS =
"com.alibaba.rocketmq.client.impl.MQClientAPIImpl";
+ private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage";
+ private static final String ASYNC_METHOD_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor";
+ private static final String UPDATE_NAME_SERVER_INTERCEPT_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v3.UpdateNameServerInterceptor";
+ private static final String UPDATE_NAME_SERVER_METHOD_NAME =
"updateNameServerAddressList";
+
+ @Override protected ConstructorInterceptPoint[]
getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override protected InstanceMethodsInterceptPoint[]
getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return
named(SEND_MESSAGE_METHOD_NAME).and(takesArgumentWithType(6,
"com.alibaba.rocketmq.client.producer.SendCallback"));
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return ASYNC_METHOD_INTERCEPTOR;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ },
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return named(UPDATE_NAME_SERVER_METHOD_NAME);
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return UPDATE_NAME_SERVER_INTERCEPT_CLASS;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java
new file mode 100644
index 000000000..82c73b3ec
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+
+/**
+ * {@link SendCallBackEnhanceInfo} saves the topic Id and {@link
ContextSnapshot} instance for trace.
+ *
+ * @author carlvine500
+ */
+public class SendCallBackEnhanceInfo {
+ private String topicId;
+ private ContextSnapshot contextSnapshot;
+
+ public SendCallBackEnhanceInfo(String topicId, ContextSnapshot
contextSnapshot) {
+ this.topicId = topicId;
+ this.contextSnapshot = contextSnapshot;
+ }
+
+ public String getTopicId() {
+ return topicId;
+ }
+
+ public ContextSnapshot getContextSnapshot() {
+ return contextSnapshot;
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java
new file mode 100644
index 000000000..1955d12fb
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link SendCallbackInstrumentation} intercepts {@link
com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(com.alibaba.rocketmq.client.producer.SendResult
sendResult)}
+ * method by using {@link
org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor} and also
intercepts {@link
+ * com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)}
by using {@link
+ * org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor}.
+ *
+ * @author carlvine500
+ */
+public class SendCallbackInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS =
"com.alibaba.rocketmq.client.producer.SendCallback";
+ private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess";
+ private static final String ON_SUCCESS_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor";
+ private static final String ON_EXCEPTION_METHOD = "onException";
+ private static final String ON_EXCEPTION_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor";
+
+ @Override protected ConstructorInterceptPoint[]
getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override protected InstanceMethodsInterceptPoint[]
getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return
named(ON_SUCCESS_ENHANCE_METHOD).and(takesArgumentWithType(0,
"com.alibaba.rocketmq.client.producer.SendResult"));
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return ON_SUCCESS_INTERCEPTOR;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ },
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return
named(ON_EXCEPTION_METHOD).and(takesArgumentWithType(0, "java.lang.Throwable"));
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return ON_EXCEPTION_INTERCEPTOR;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override protected ClassMatch enhanceClass() {
+ return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 000000000..8a0f31712
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,4 @@
+rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageConcurrentlyInstrumentation
+rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageOrderlyInstrumentation
+rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.MQClientAPIImplInstrumentation
+rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallbackInstrumentation
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java
new file mode 100644
index 000000000..6e68a5b92
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.util.List;
+import com.alibaba.rocketmq.client.impl.CommunicationMode;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class MessageSendInterceptorTest {
+
+ private MessageSendInterceptor messageSendInterceptor;
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ private Object[] arguments;
+
+ private Object[] argumentsWithoutCallback;
+
+ @Mock
+ private Message message;
+
+ @Mock
+ private SendMessageRequestHeader messageRequestHeader;
+
+ @Mock
+ private EnhancedInstance callBack;
+
+ private EnhancedInstance enhancedInstance;
+
+ @Before
+ public void setUp() {
+ messageSendInterceptor = new MessageSendInterceptor();
+ enhancedInstance = new EnhancedInstance() {
+ @Override public Object getSkyWalkingDynamicField() {
+ return "127.0.0.1:6543";
+ }
+
+ @Override public void setSkyWalkingDynamicField(Object value) {
+
+ }
+ };
+
+ arguments = new Object[] {"127.0.0.1", "test", message,
messageRequestHeader, null, CommunicationMode.ASYNC, callBack};
+ argumentsWithoutCallback = new Object[] {"127.0.0.1", "test", message,
messageRequestHeader, null, CommunicationMode.ASYNC, null};
+ when(messageRequestHeader.getProperties()).thenReturn("");
+ when(message.getTags()).thenReturn("TagA");
+ }
+
+ @Test
+ public void testSendMessage() throws Throwable {
+ messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments,
null, null);
+ messageSendInterceptor.afterMethod(enhancedInstance, null, arguments,
null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan mqSpan = spans.get(0);
+
+ SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
+ SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ);
+ SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
+ verify(messageRequestHeader, times(1)).setProperties(anyString());
+ verify(callBack, times(1)).setSkyWalkingDynamicField(Matchers.any());
+ }
+
+ @Test
+ public void testSendMessageWithoutCallBack() throws Throwable {
+ messageSendInterceptor.beforeMethod(enhancedInstance, null,
argumentsWithoutCallback, null, null);
+ messageSendInterceptor.afterMethod(enhancedInstance, null,
argumentsWithoutCallback, null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan mqSpan = spans.get(0);
+
+ SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
+ SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ);
+ SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
+ verify(messageRequestHeader, times(1)).setProperties(anyString());
+ }
+
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java
new file mode 100644
index 000000000..a66d5e4b1
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.util.List;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class OnExceptionInterceptorTest {
+
+ private OnExceptionInterceptor exceptionInterceptor;
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ @Mock
+ private ContextSnapshot contextSnapshot;
+ private SendCallBackEnhanceInfo enhanceInfo;
+
+ @Mock
+ private EnhancedInstance enhancedInstance;
+
+ @Before
+ public void setUp() {
+ exceptionInterceptor = new OnExceptionInterceptor();
+
+ enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot);
+
when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
+ }
+
+ @Test
+ public void testOnException() throws Throwable {
+ exceptionInterceptor.beforeMethod(enhancedInstance, null, new Object[]
{new RuntimeException()}, null, null);
+ exceptionInterceptor.afterMethod(enhancedInstance, null, new Object[]
{new RuntimeException()}, null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan exceptionSpan = spans.get(0);
+ SpanAssert.assertException(SpanHelper.getLogs(exceptionSpan).get(0),
RuntimeException.class);
+ SpanAssert.assertOccurException(exceptionSpan, true);
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java
new file mode 100644
index 000000000..ef8e93986
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v3;
+
+import java.util.List;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.client.producer.SendStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class OnSuccessInterceptorTest {
+
+ private OnSuccessInterceptor successInterceptor;
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ @Mock
+ private ContextSnapshot contextSnapshot;
+ @Mock
+ private SendResult sendResult;
+
+ private SendCallBackEnhanceInfo enhanceInfo;
+
+ @Mock
+ private EnhancedInstance enhancedInstance;
+
+ @Before
+ public void setUp() {
+ successInterceptor = new OnSuccessInterceptor();
+
+ enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot);
+
when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
+ when(sendResult.getSendStatus()).thenReturn(SendStatus.SEND_OK);
+ }
+
+ @Test
+ public void testOnSuccess() throws Throwable {
+ successInterceptor.beforeMethod(enhancedInstance, null, new Object[]
{sendResult}, null, null);
+ successInterceptor.afterMethod(enhancedInstance, null, new Object[]
{sendResult}, null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan successSpan = spans.get(0);
+
+ SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ);
+
+ }
+
+ @Test
+ public void testOnSuccessWithErrorStatus() throws Throwable {
+
when(sendResult.getSendStatus()).thenReturn(SendStatus.FLUSH_SLAVE_TIMEOUT);
+ successInterceptor.beforeMethod(enhancedInstance, null, new Object[]
{sendResult}, null, null);
+ successInterceptor.afterMethod(enhancedInstance, null, new Object[]
{sendResult}, null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan successSpan = spans.get(0);
+
+ SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ);
+ SpanAssert.assertOccurException(successSpan, true);
+
+ }
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services