fuyou001 closed pull request #156: [ROCKETMQ-271] add tools for Analyzing 
message lifetime
URL: https://github.com/apache/rocketmq/pull/156
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/broker/pom.xml b/broker/pom.xml
index fd1e69148..748deefe1 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -68,6 +68,10 @@
             <groupId>org.javassist</groupId>
             <artifactId>javassist</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index cd68552bd..ea54fc23f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -56,6 +56,7 @@
 import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
 import org.apache.rocketmq.broker.processor.PullMessageProcessor;
 import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
+import org.apache.rocketmq.broker.processor.QueryTracerTimeProcessor;
 import org.apache.rocketmq.broker.processor.SendMessageProcessor;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -129,6 +130,7 @@
     private ExecutorService adminBrokerExecutor;
     private ExecutorService clientManageExecutor;
     private ExecutorService consumerManageExecutor;
+    private ExecutorService queryTracerTimerExecutor;
     private boolean updateMasterHAServerAddrPeriodically = false;
     private BrokerStats brokerStats;
     private InetSocketAddress storeHost;
@@ -253,6 +255,9 @@ public boolean initialize() throws 
CloneNotSupportedException {
                 
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(),
 new ThreadFactoryImpl(
                     "ConsumerManageThread_"));
 
+            this.queryTracerTimerExecutor =   
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(),
 new ThreadFactoryImpl(
+                "queryTracerTimerThread_"));
+
             this.registerProcessor();
 
             final long initialDelay = UtilAll.computNextMorningTimeMillis() - 
System.currentTimeMillis();
@@ -386,12 +391,16 @@ public void registerProcessor() {
         sendProcessor.registerSendMessageHook(sendMessageHookList);
         sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
 
+        QueryTracerTimeProcessor queryTracerTimeProcessor = new 
QueryTracerTimeProcessor();
+
         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, 
sendProcessor, this.sendMessageExecutor);
         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, 
sendProcessor, this.sendMessageExecutor);
         this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, 
sendProcessor, this.sendMessageExecutor);
         
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, 
sendProcessor, this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.QUERY_TRACER_TIME, 
queryTracerTimeProcessor, this.queryTracerTimerExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, 
sendProcessor, this.sendMessageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, 
sendProcessor, this.sendMessageExecutor);
+        
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_TRACER_TIME, 
queryTracerTimeProcessor, this.queryTracerTimerExecutor);
         
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, 
sendProcessor, this.sendMessageExecutor);
         
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, 
sendProcessor, this.sendMessageExecutor);
         /**
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/ServerTracerTimeUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/ServerTracerTimeUtil.java
new file mode 100644
index 000000000..c08be062b
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/ServerTracerTimeUtil.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ClientTracerTimeUtil;
+import org.apache.rocketmq.common.TracerTime;
+
+public class ServerTracerTimeUtil {
+
+    public static Cache<String, TracerTime> tracerTimeCache = 
CacheBuilder.newBuilder()
+        .maximumSize(10000)
+        .expireAfterWrite(15, TimeUnit.MINUTES)
+        .build();
+
+    public static boolean isEnableTracerTime() {
+        return ClientTracerTimeUtil.isEnableTracerTime();
+    }
+
+    public static void addMessageCreateTime(String messageTracerTimeId, String 
messageCreateTime) {
+        if (messageCreateTime == null || messageCreateTime.length() < 1) {
+            return;
+        }
+
+        TracerTime tracerTime = 
tracerTimeCache.getIfPresent(messageTracerTimeId);
+        if (tracerTime == null) {
+            tracerTime = new TracerTime();
+        }
+
+        tracerTime.setMessageCreateTime(Long.valueOf(messageCreateTime));
+
+        tracerTimeCache.put(messageTracerTimeId, tracerTime);
+    }
+
+    public static void addMessageSendTime(String messageTracerTimeId, String 
messageSendTime) {
+        TracerTime tracerTime = 
tracerTimeCache.getIfPresent(messageTracerTimeId);
+        if (tracerTime == null) {
+            return;
+        }
+        if (messageSendTime == null || messageSendTime.length() < 1) {
+            return;
+        }
+        tracerTime.setMessageSendTime(Long.valueOf(messageSendTime));
+    }
+
+    public static void addMessageArriveBrokerTime(String messageTracerTimeId, 
long messageArriveBrokerTime) {
+        TracerTime tracerTime = 
tracerTimeCache.getIfPresent(messageTracerTimeId);
+        if (tracerTime == null) {
+            return;
+        }
+
+        tracerTime.setMessageArriveBrokerTime(messageArriveBrokerTime);
+    }
+
+    public static void addMessageBeginSaveTime(String messageTracerTimeId, 
long messageBeginSaveTime) {
+        TracerTime tracerTime = 
tracerTimeCache.getIfPresent(messageTracerTimeId);
+        if (tracerTime == null) {
+            return;
+        }
+
+        tracerTime.setMessageBeginSaveTime(messageBeginSaveTime);
+    }
+
+    public static void addMessageSaveEndTime(String messageTracerTimeId, long 
messageSaveEndTime) {
+        TracerTime tracerTime = 
tracerTimeCache.getIfPresent(messageTracerTimeId);
+        if (tracerTime == null) {
+            return;
+        }
+
+        tracerTime.setMessageSaveEndTime(messageSaveEndTime);
+    }
+
+    public static void addBrokerSendAckTime(String messageTracerTimeId, long 
brokerSendAckTime) {
+        TracerTime tracerTime = 
tracerTimeCache.getIfPresent(messageTracerTimeId);
+        if (tracerTime == null) {
+            return;
+        }
+        tracerTime.setBrokerSendAckTime(brokerSendAckTime);
+    }
+
+    public static void addReceiveSendAckTime(String messageTracerTimeId, 
String receiveSendAckTime) {
+        TracerTime tracerTime = 
tracerTimeCache.getIfPresent(messageTracerTimeId);
+        if (tracerTime == null) {
+            return;
+        }
+        if (receiveSendAckTime == null || receiveSendAckTime.length() < 1) {
+            return;
+        }
+        tracerTime.setReceiveSendAckTime(Long.valueOf(receiveSendAckTime));
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/TrackerTimeSendMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/TrackerTimeSendMessageHook.java
new file mode 100644
index 000000000..57e53383b
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/TrackerTimeSendMessageHook.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.mqtrace;
+
+import java.util.Map;
+import org.apache.rocketmq.broker.ServerTracerTimeUtil;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+
+public class TrackerTimeSendMessageHook implements SendMessageHook {
+
+    @Override
+    public String hookName() {
+        return "TrackerTimeSendMessageHook";
+    }
+
+    @Override
+    public void sendMessageBefore(SendMessageContext context) {
+        // brokerController.getBrokerConfig().isEnableTracerTime()
+        String props = context.getMsgProps();
+        if (props != null && props.length() > 1) {
+            Map<String, String> properties = 
MessageDecoder.string2messageProperties(props);
+            String messageTracerTimeId = 
properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+            if (properties.containsKey(MessageConst.MESSAGE_CREATE_TIME)) {
+                ServerTracerTimeUtil.addMessageCreateTime(messageTracerTimeId, 
properties.get(MessageConst.MESSAGE_CREATE_TIME));
+                ServerTracerTimeUtil.addMessageSendTime(messageTracerTimeId, 
properties.get(MessageConst.MESSAGE_SEND_TIME));
+                
ServerTracerTimeUtil.addMessageArriveBrokerTime(messageTracerTimeId, 
System.currentTimeMillis());
+                
ServerTracerTimeUtil.addMessageBeginSaveTime(messageTracerTimeId, 
System.currentTimeMillis());
+            }
+        }
+
+    }
+
+    @Override
+    public void sendMessageAfter(SendMessageContext context) {
+        String props = context.getMsgProps();
+        if (props != null && props.length() > 1) {
+            Map<String, String> properties = 
MessageDecoder.string2messageProperties(props);
+            if (properties.containsKey(MessageConst.MESSAGE_CREATE_TIME)) {
+                String messageTracerTimeId = 
properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+                
ServerTracerTimeUtil.addMessageSaveEndTime(messageTracerTimeId, 
System.currentTimeMillis());
+                ServerTracerTimeUtil.addBrokerSendAckTime(messageTracerTimeId, 
System.currentTimeMillis());
+                ServerTracerTimeUtil.addBrokerSendAckTime(messageTracerTimeId, 
System.currentTimeMillis());
+            }
+        }
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryTracerTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryTracerTimeProcessor.java
new file mode 100644
index 000000000..8bd26b68c
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryTracerTimeProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.broker.ServerTracerTimeUtil;
+import org.apache.rocketmq.common.TracerTime;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.QueryTracerTimeRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.QueryTracerTimeResponseHeader;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class QueryTracerTimeProcessor implements NettyRequestProcessor {
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        if (RequestCode.QUERY_TRACER_TIME == request.getCode()) {
+            final RemotingCommand response = 
RemotingCommand.createResponseCommand(QueryTracerTimeResponseHeader.class);
+            final QueryTracerTimeRequestHeader requestHeader =
+                (QueryTracerTimeRequestHeader) 
request.decodeCommandCustomHeader(QueryTracerTimeRequestHeader.class);
+
+            String messageTracerTimeId = 
requestHeader.getMessageTracerTimeId();
+            TracerTime tracerTime = 
ServerTracerTimeUtil.tracerTimeCache.getIfPresent(messageTracerTimeId);
+
+            if (tracerTime != null) {
+                response.setBody(tracerTime.encode());
+            }
+            response.setCode(ResponseCode.SUCCESS);
+            return response;
+
+        } else {
+            throw new RuntimeException(String.format("request code must be 
%s", RequestCode.QUERY_TRACER_TIME));
+        }
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryTracerTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryTracerTimeProcessorTest.java
new file mode 100644
index 000000000..2ea5cb21b
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryTracerTimeProcessorTest.java
@@ -0,0 +1,69 @@
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.broker.ServerTracerTimeUtil;
+import org.apache.rocketmq.common.TracerTime;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.QueryTracerTimeRequestHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class QueryTracerTimeProcessorTest {
+    @Mock
+    private ChannelHandlerContext handlerContext;
+    private QueryTracerTimeProcessor queryTracerTimeProcessor;
+
+    @Before
+    public void init() {
+        queryTracerTimeProcessor = new QueryTracerTimeProcessor();
+    }
+
+    @Test
+    public void test_no_exists() throws Exception {
+        final RemotingCommand request = createQueryTrackerTimeCommand("");
+        RemotingCommand responseToReturn = 
queryTracerTimeProcessor.processRequest(handlerContext, request);
+        assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        assertThat(responseToReturn.getBody()).isEqualTo(null);
+
+    }
+
+    @Test
+    public void test_exists() throws Exception {
+        long messageCreateTime = System.currentTimeMillis();
+        TracerTime tracerTime = new TracerTime();
+        tracerTime.setMessageCreateTime(messageCreateTime);
+
+        String messageTracerTimeId = "messageTracerTimeId";
+
+        final RemotingCommand request = 
createQueryTrackerTimeCommand(messageTracerTimeId);
+
+        ServerTracerTimeUtil.tracerTimeCache.put(messageTracerTimeId, 
tracerTime);
+
+        RemotingCommand responseToReturn = 
queryTracerTimeProcessor.processRequest(handlerContext, request);
+
+        byte[] body = responseToReturn.getBody();
+        TracerTime responseTracerTime = TracerTime.decode(body, 
TracerTime.class);
+
+        assertThat(responseTracerTime).isEqualTo(tracerTime);
+        
assertThat(responseTracerTime.getMessageCreateTime()).isEqualTo(tracerTime.getMessageCreateTime());
+
+    }
+
+    private RemotingCommand createQueryTrackerTimeCommand(String 
messageTracerTimeId) {
+        QueryTracerTimeRequestHeader requestHeader = new 
QueryTracerTimeRequestHeader();
+        requestHeader.setMessageTracerTimeId(messageTracerTimeId);
+
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_TRACER_TIME, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        return request;
+    }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/TracerTimeSendMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/TracerTimeSendMessageHook.java
new file mode 100644
index 000000000..8a619283c
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/TracerTimeSendMessageHook.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.hook;
+
+import java.lang.reflect.Method;
+import org.apache.rocketmq.common.ClientTracerTimeUtil;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+
+public class TracerTimeSendMessageHook implements SendMessageHook {
+
+    @Override
+    public String hookName() {
+        return "TracerTimeSendMessageHook";
+    }
+
+    @Override
+    public void sendMessageBefore(SendMessageContext context) {
+        tracerTimeIfNecessary(context.getMessage(), 
MessageConst.MESSAGE_SEND_TIME);
+    }
+
+    @Override
+    public void sendMessageAfter(SendMessageContext context) {
+        tracerTimeIfNecessary(context.getMessage(), 
MessageConst.RECEIVE_SEND_ACK_TIME);
+    }
+
+    public void tracerTimeIfNecessary(Message msg, String propertyKey) {
+        if (ClientTracerTimeUtil.isEnableTracerTime()) {
+            try {
+                Method putPropertyMethod = 
msg.getClass().getDeclaredMethod("putProperty", String.class, String.class);
+                putPropertyMethod.setAccessible(true);
+                putPropertyMethod.invoke(msg, propertyKey, 
String.valueOf(System.currentTimeMillis()));
+            } catch (Exception e) {
+            }
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c5abc36bf..60a70c7fb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -45,6 +45,7 @@
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TracerTime;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
@@ -112,6 +113,7 @@
 import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
 import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryTracerTimeRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
@@ -2084,4 +2086,28 @@ public void checkClientInBroker(final String brokerAddr, 
final String consumerGr
             throw new MQClientException(response.getCode(), 
response.getRemark());
         }
     }
+
+    public TracerTime queryTracerTime(final String brokerAddr, final String 
messageTracerTimeId,
+        final long timeoutMillis) throws InterruptedException,
+        RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQClientException {
+
+        QueryTracerTimeRequestHeader requestHeader = new 
QueryTracerTimeRequestHeader();
+        requestHeader.setMessageTracerTimeId(messageTracerTimeId);
+
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_TRACER_TIME, 
requestHeader);
+
+        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 brokerAddr), request, timeoutMillis);
+
+        assert response != null;
+
+        if (ResponseCode.SUCCESS == response.getCode()) {
+            if (response.getBody() != null) {
+                return TracerTime.decode(response.getBody(), TracerTime.class);
+            } else {
+                return null;
+            }
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
 }
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 9a208a3fa..33c4cded0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -63,6 +63,8 @@
     private int clientManageThreadPoolNums = 32;
     private int consumerManageThreadPoolNums = 32;
 
+    private int queryTrackTimeThreadPoolNums = 2;
+
     private int flushConsumerOffsetInterval = 1000 * 5;
 
     private int flushConsumerOffsetHistoryInterval = 1000 * 60;
@@ -128,6 +130,9 @@
     private boolean filterSupportRetry = false;
     private boolean enablePropertyFilter = false;
 
+
+    private volatile boolean enableTracerTime = true;
+
     public boolean isTraceOn() {
         return traceOn;
     }
@@ -561,4 +566,20 @@ public boolean isEnablePropertyFilter() {
     public void setEnablePropertyFilter(boolean enablePropertyFilter) {
         this.enablePropertyFilter = enablePropertyFilter;
     }
+
+    public boolean isEnableTracerTime() {
+        return enableTracerTime;
+    }
+
+    public void setEnableTracerTime(boolean enableTracerTime) {
+        this.enableTracerTime = enableTracerTime;
+    }
+
+    public int getQueryTrackTimeThreadPoolNums() {
+        return queryTrackTimeThreadPoolNums;
+    }
+
+    public void setQueryTrackTimeThreadPoolNums(int 
queryTrackTimeThreadPoolNums) {
+        this.queryTrackTimeThreadPoolNums = queryTrackTimeThreadPoolNums;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/ClientTracerTimeUtil.java b/common/src/main/java/org/apache/rocketmq/common/ClientTracerTimeUtil.java
new file mode 100644
index 000000000..0bb51f151
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/ClientTracerTimeUtil.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+public class ClientTracerTimeUtil {
+    public static final String MESSAGE_TRACER_TIME_ENABLE = 
"MESSAGE_TRACER_TIME_ENABLE";
+
+    private static boolean isEnableMessageTracerTime = 
Boolean.parseBoolean(System.getProperty(MESSAGE_TRACER_TIME_ENABLE, 
Boolean.FALSE.toString()));
+
+    public static boolean isEnableTracerTime() {
+        return isEnableMessageTracerTime;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TracerTime.java b/common/src/main/java/org/apache/rocketmq/common/TracerTime.java
new file mode 100644
index 000000000..e03ff2f87
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/TracerTime.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class TracerTime extends RemotingSerializable {
+
+    private long messageCreateTime;
+    private long messageSendTime;
+    private long messageArriveBrokerTime;
+    private long messageBeginSaveTime;
+    private long messageSaveEndTime;
+    private long brokerSendAckTime;
+    private long receiveSendAckTime;
+
+    public TracerTime() {
+    }
+
+    public long getMessageCreateTime() {
+        return messageCreateTime;
+    }
+
+    public void setMessageCreateTime(long messageCreateTime) {
+        this.messageCreateTime = messageCreateTime;
+    }
+
+    public long getMessageSendTime() {
+        return messageSendTime;
+    }
+
+    public void setMessageSendTime(long messageSendTime) {
+        this.messageSendTime = messageSendTime;
+    }
+
+    public long getMessageArriveBrokerTime() {
+        return messageArriveBrokerTime;
+    }
+
+    public void setMessageArriveBrokerTime(long messageArriveBrokerTime) {
+        this.messageArriveBrokerTime = messageArriveBrokerTime;
+    }
+
+    public long getMessageBeginSaveTime() {
+        return messageBeginSaveTime;
+    }
+
+    public void setMessageBeginSaveTime(long messageBeginSaveTime) {
+        this.messageBeginSaveTime = messageBeginSaveTime;
+    }
+
+    public long getMessageSaveEndTime() {
+        return messageSaveEndTime;
+    }
+
+    public void setMessageSaveEndTime(long messageSaveEndTime) {
+        this.messageSaveEndTime = messageSaveEndTime;
+    }
+
+    public long getBrokerSendAckTime() {
+        return brokerSendAckTime;
+    }
+
+    public void setBrokerSendAckTime(long brokerSendAckTime) {
+        this.brokerSendAckTime = brokerSendAckTime;
+    }
+
+    public long getReceiveSendAckTime() {
+        return receiveSendAckTime;
+    }
+
+    public void setReceiveSendAckTime(long receiveSendAckTime) {
+        this.receiveSendAckTime = receiveSendAckTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        TracerTime that = (TracerTime) o;
+
+        if (messageCreateTime != that.messageCreateTime)
+            return false;
+        if (messageSendTime != that.messageSendTime)
+            return false;
+        if (messageArriveBrokerTime != that.messageArriveBrokerTime)
+            return false;
+        if (messageBeginSaveTime != that.messageBeginSaveTime)
+            return false;
+        if (messageSaveEndTime != that.messageSaveEndTime)
+            return false;
+        if (brokerSendAckTime != that.brokerSendAckTime)
+            return false;
+        return receiveSendAckTime == that.receiveSendAckTime;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (int) (messageCreateTime ^ (messageCreateTime >>> 32));
+        result = 31 * result + (int) (messageSendTime ^ (messageSendTime >>> 
32));
+        result = 31 * result + (int) (messageArriveBrokerTime ^ 
(messageArriveBrokerTime >>> 32));
+        result = 31 * result + (int) (messageBeginSaveTime ^ 
(messageBeginSaveTime >>> 32));
+        result = 31 * result + (int) (messageSaveEndTime ^ (messageSaveEndTime 
>>> 32));
+        result = 31 * result + (int) (brokerSendAckTime ^ (brokerSendAckTime 
>>> 32));
+        result = 31 * result + (int) (receiveSendAckTime ^ (receiveSendAckTime 
>>> 32));
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("TracerTime{");
+        sb.append("messageCreateTime=").append(messageCreateTime);
+        sb.append(", messageSendTime=").append(messageSendTime);
+        sb.append(", 
messageArriveBrokerTime=").append(messageArriveBrokerTime);
+        sb.append(", messageBeginSaveTime=").append(messageBeginSaveTime);
+        sb.append(", messageSaveEndTime=").append(messageSaveEndTime);
+        sb.append(", brokerSendAckTime=").append(brokerSendAckTime);
+        sb.append(", receiveSendAckTime=").append(receiveSendAckTime);
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index 15ba2142c..ba23fd12d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -20,6 +20,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.rocketmq.common.ClientTracerTimeUtil;
 
 public class Message implements Serializable {
     private static final long serialVersionUID = 8445773977080406428L;
@@ -48,6 +49,10 @@ public Message(String topic, String tags, String keys, int 
flag, byte[] body, bo
             this.setKeys(keys);
 
         this.setWaitStoreMsgOK(waitStoreMsgOK);
+
+        if (ClientTracerTimeUtil.isEnableTracerTime()) {
+            putProperty(MessageConst.MESSAGE_CREATE_TIME, 
String.valueOf(System.currentTimeMillis()));
+        }
     }
 
     public Message(String topic, String tags, byte[] body) {
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 1edbbec70..108fec967 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -41,6 +41,13 @@
     public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = 
"UNIQ_KEY";
     public static final String PROPERTY_MAX_RECONSUME_TIMES = 
"MAX_RECONSUME_TIMES";
     public static final String PROPERTY_CONSUME_START_TIMESTAMP = 
"CONSUME_START_TIME";
+    public static final String MESSAGE_CREATE_TIME = "MESSAGE_CREATE_TIME";
+    public static final String MESSAGE_SEND_TIME = "MESSAGE_SEND_TIME";
+    public static final String MESSAGE_ARRIVE_BROKER_TIME = 
"MESSAGE_ARRIVE_BROKER_TIME";
+    public static final String MESSAGE_BEGIN_SAVE_TIME = 
"MESSAGE_BEGIN_SAVE_TIME";
+    public static final String MESSAGE_SAVE_END_TIME = "MESSAGE_SAVE_END_TIME";
+    public static final String BROKER_SEND_ACK_TIME = "BROKER_SEND_ACK_TIME";
+    public static final String RECEIVE_SEND_ACK_TIME = "RECEIVE_SEND_ACK_TIME";
 
     public static final String KEY_SEPARATOR = " ";
 
@@ -69,5 +76,12 @@
         STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
         STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
         STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
+        STRING_HASH_SET.add(MESSAGE_CREATE_TIME);
+        STRING_HASH_SET.add(MESSAGE_SEND_TIME);
+        STRING_HASH_SET.add(MESSAGE_ARRIVE_BROKER_TIME);
+        STRING_HASH_SET.add(MESSAGE_BEGIN_SAVE_TIME);
+        STRING_HASH_SET.add(MESSAGE_SAVE_END_TIME);
+        STRING_HASH_SET.add(BROKER_SEND_ACK_TIME);
+        STRING_HASH_SET.add(RECEIVE_SEND_ACK_TIME);
     }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 5900c0b9d..c30193560 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -165,4 +165,8 @@
     public static final int SEND_BATCH_MESSAGE = 320;
 
     public static final int QUERY_CONSUME_QUEUE = 321;
+
+    public static final int QUERY_TRACER_TIME = 322;
+
+
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryTracerTimeRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryTracerTimeRequestHeader.java
new file mode 100644
index 000000000..565d9c366
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryTracerTimeRequestHeader.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class QueryTracerTimeRequestHeader implements CommandCustomHeader {
+
+    private String messageTracerTimeId;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getMessageTracerTimeId() {
+        return messageTracerTimeId;
+    }
+
+    public void setMessageTracerTimeId(String messageTracerTimeId) {
+        this.messageTracerTimeId = messageTracerTimeId;
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryTracerTimeResponseHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryTracerTimeResponseHeader.java
new file mode 100644
index 000000000..008a06b55
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryTracerTimeResponseHeader.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class QueryTracerTimeResponseHeader implements CommandCustomHeader {
+
+    private String result;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getResult() {
+        return result;
+    }
+
+    public void setResult(String result) {
+        this.result = result;
+    }
+}
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
index 26b77fe84..b9beb9c17 100644
--- 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
+++ 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -164,4 +164,6 @@ public void shutdown() {
         }
         return mqs;
     }
+
+
 }
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java 
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
index bd151d056..ce07fc15c 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
@@ -20,6 +20,7 @@
 import java.util.HashMap;
 import java.util.Set;
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.TracerTime;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
@@ -160,4 +161,24 @@ public void getSubConnection(String nameSrvAddr, String 
clusterName, String cons
         mqAdminExt.shutdown();
     }
 
+    public static TracerTime queryTracerTime(String nameSrvAddr, String 
clusterName, String topic,
+        String messageTracerTimeId) {
+        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.setNamesrvAddr(nameSrvAddr);
+        try {
+            mqAdminExt.start();
+            Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
+                clusterName);
+            for (String brokerAddr : masterSet) {
+                TracerTime tracerTime = mqAdminExt.queryTracerTime(brokerAddr, 
topic, messageTracerTimeId);
+                if (tracerTime != null) {
+                    return tracerTime;
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java 
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 8516779e4..707640b24 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import org.apache.log4j.Logger;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.TracerTime;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
@@ -31,8 +32,6 @@
 import org.apache.rocketmq.test.listener.AbstractListener;
 import org.apache.rocketmq.test.util.MQAdmin;
 import org.apache.rocketmq.test.util.MQRandomUtils;
-import org.apache.rocketmq.test.util.TestUtils;
-import org.junit.Assert;
 
 public class BaseConf {
     protected static String nsAddr;
@@ -144,4 +143,14 @@ public static void shutDown() {
         }
 
     }
+
+    public static BrokerController getBrokerController1() {
+        return brokerController1;
+    }
+
+    public static TracerTime queryTracerTime(String topic,
+        String messageTracerTimeId) {
+       return IntegrationTestBase.queryTracerTime(nsAddr, clusterName,topic, 
messageTracerTimeId);
+
+    }
 }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java 
b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index e1b8c9102..d95ca5401 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -24,7 +24,9 @@
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.mqtrace.TrackerTimeSendMessageHook;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TracerTime;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.namesrv.NamesrvController;
@@ -137,7 +139,12 @@ public static BrokerController createAndStartBroker(String 
nsAddr) {
         storeConfig.setMaxIndexNum(INDEX_NUM);
         storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
         nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
+
         BrokerController brokerController = new BrokerController(brokerConfig, 
nettyServerConfig, nettyClientConfig, storeConfig);
+        if (brokerConfig.isEnableTracerTime()) {
+            brokerController.registerSendMessageHook(new 
TrackerTimeSendMessageHook());
+        }
+
         try {
             Assert.assertTrue(brokerController.initialize());
             logger.info("Broker Start name:{} addr:{}", 
brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
@@ -190,4 +197,9 @@ public static void deleteFile(File file) {
         }
     }
 
+    public static TracerTime queryTracerTime(String nameSrvAddr, String 
clusterName, String topic,
+        String messageTracerTimeId) {
+        return MQAdmin.queryTracerTime(nameSrvAddr, clusterName, topic, 
messageTracerTimeId);
+    }
+
 }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/TracerTimeSendMessageTest.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/TracerTimeSendMessageTest.java
new file mode 100644
index 000000000..ad7f92ee1
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/TracerTimeSendMessageTest.java
@@ -0,0 +1,109 @@
+package org.apache.rocketmq.test.client.producer;
+
+import static com.google.common.truth.Truth.assertThat;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.hook.TracerTimeSendMessageHook;
+import org.apache.rocketmq.common.ClientTracerTimeUtil;
+import org.apache.rocketmq.common.TracerTime;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.producer.order.OrderMsgIT;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.tools.command.stats.QueryTracerTimeCommand;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TracerTimeSendMessageTest extends BaseConf {
+
+    private static Logger logger = Logger.getLogger(OrderMsgIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    @Test
+    public void testOrderMsg_with_tracer() throws Exception {
+
+        int msgSize = 1;
+
+        System.setProperty(ClientTracerTimeUtil.MESSAGE_TRACER_TIME_ENABLE, 
"true");
+        List<Object> messageList = MQMessageFactory.getMsg(topic, msgSize, 
"tag");
+
+        Message message = (Message) messageList.get(0);
+
+        
producer.getProducer().getDefaultMQProducerImpl().registerSendMessageHook(new 
TracerTimeSendMessageHook());
+
+        producer.send(message);
+
+        Map<String, String> properties = message.getProperties();
+        String messageTraceTimeId = 
properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+        
assertThat(properties.containsKey(MessageConst.MESSAGE_CREATE_TIME)).isTrue();
+        
assertThat(properties.containsKey(MessageConst.MESSAGE_SEND_TIME)).isTrue();
+        
assertThat(properties.containsKey(MessageConst.RECEIVE_SEND_ACK_TIME)).isTrue();
+        assertThat(messageTraceTimeId).isNotEmpty();
+
+        TracerTime tracerTime = queryTracerTime(topic, messageTraceTimeId);
+
+        
assertThat(String.valueOf(tracerTime.getMessageCreateTime())).isEqualTo(properties.get(MessageConst.MESSAGE_CREATE_TIME));
+
+    }
+
+    @Test
+    public void test_QueryTracerTimeCommand() throws Exception {
+        int msgSize = 1;
+
+        Field tracerTimeFiled = 
ClientTracerTimeUtil.class.getDeclaredField("isEnableMessageTracerTime");
+
+        tracerTimeFiled.setAccessible(true);
+        tracerTimeFiled.setBoolean(null, true);
+
+        List<Object> messageList = MQMessageFactory.getMsg(topic, msgSize, 
"tag");
+
+        Message message = (Message) messageList.get(0);
+
+        producer.send(message);
+
+        Map<String, String> properties = message.getProperties();
+        String messageTraceTimeId = 
properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+
+        List<String> args = new ArrayList<>();
+        args.add("-b");
+        args.add(BaseConf.getBrokerController1().getBrokerAddr());
+        args.add("-id");
+        args.add(messageTraceTimeId);
+
+        try {
+            Options options = ServerUtil.buildCommandlineOptions(new 
Options());
+
+            QueryTracerTimeCommand queryTracerTimeCommand = new 
QueryTracerTimeCommand();
+
+            CommandLine commandLine =
+                ServerUtil.parseCmdLine(queryTracerTimeCommand.commandName(), 
args.toArray(new String[0]), 
queryTracerTimeCommand.buildCommandlineOptions(options), new PosixParser());
+
+            queryTracerTimeCommand.execute(commandLine, options, null);
+        } catch (Exception e) {
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index eb45de22f..466ada2e6 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -27,6 +27,7 @@
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TracerTime;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.RollbackStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
@@ -513,4 +514,10 @@ public QueryConsumeQueueResponseBody 
queryConsumeQueue(String brokerAddr, String
             brokerAddr, topic, queueId, index, count, consumerGroup
         );
     }
+
+    @Override
+    public TracerTime queryTracerTime(final String brokerAddr,String topic, 
String messageTracerTimeId)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException {
+        return this.defaultMQAdminExtImpl.queryTracerTime(brokerAddr,topic, 
messageTracerTimeId);
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index c93c40067..28312a84c 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -29,6 +29,7 @@
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -40,6 +41,7 @@
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TracerTime;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.OffsetWrapper;
@@ -1000,4 +1002,32 @@ public QueryConsumeQueueResponseBody 
queryConsumeQueue(String brokerAddr, String
             brokerAddr, topic, queueId, index, count, consumerGroup, 
timeoutMillis
         );
     }
+
+    @Override
+    public TracerTime queryTracerTime(final String brokerAddr, final String 
topic, final String messageTracerTimeId)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException {
+
+        if (brokerAddr != null) {
+            return 
this.mqClientInstance.getMQClientAPIImpl().queryTracerTime(brokerAddr, 
messageTracerTimeId, timeoutMillis);
+        }
+
+        this.mqClientInstance.updateTopicRouteInfoFromNameServer();
+
+        ConcurrentMap<String, TopicRouteData> topicRouteTable = 
this.mqClientInstance.getTopicRouteTable();
+        if (topicRouteTable != null) {
+            TopicRouteData topicRouteData = topicRouteTable.get(topic);
+            if (topicRouteData != null) {
+                List<BrokerData> brokerDataList = 
topicRouteData.getBrokerDatas();
+                if (brokerDataList != null) {
+                    for (BrokerData brokerData : brokerDataList) {
+                        TracerTime tracerTime = 
this.mqClientInstance.getMQClientAPIImpl().queryTracerTime(brokerData.selectBrokerAddr(),
 messageTracerTimeId, timeoutMillis);
+                        if (tracerTime != null) {
+                            return tracerTime;
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 16b442757..293b7700d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -25,6 +25,7 @@
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TracerTime;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.RollbackStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
@@ -259,4 +260,7 @@ QueryConsumeQueueResponseBody queryConsumeQueue(final 
String brokerAddr,
         final String topic, final int queueId,
         final long index, final int count, final String consumerGroup)
         throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException;
+
+    TracerTime queryTracerTime(final String brokerAddr,final String topic, 
final String messageTracerTimeId)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException;
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java 
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 6398291a0..670ae22ca 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -60,6 +60,7 @@
 import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
 import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
 import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
+import org.apache.rocketmq.tools.command.stats.QueryTracerTimeCommand;
 import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
 import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
 import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
@@ -192,6 +193,8 @@ public static void initCommand() {
         initCommand(new GetBrokerConfigCommand());
 
         initCommand(new QueryConsumeQueueCommand());
+
+        initCommand(new QueryTracerTimeCommand());
     }
 
     private static void initLogback() throws JoranException {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/QueryTracerTimeCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/QueryTracerTimeCommand.java
new file mode 100644
index 000000000..e041b8854
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/QueryTracerTimeCommand.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.stats;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.TracerTime;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class QueryTracerTimeCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "queryTracerTimeCommand";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "queryTracerTimeCommand";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("id", "tracerTimeId", true, "tracerTimeId");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "clusterName");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "broker", true, "broker addr.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            Set<String> masterSet = new HashSet<>();
+
+            String clusterName = null;
+            String tracerTimeId = commandLine.getOptionValue("id").trim();
+            String brokerAddr = "";
+            String topic = "";
+
+            if (commandLine.hasOption("b")) {
+                brokerAddr = commandLine.getOptionValue("b").trim();
+                masterSet.add(brokerAddr);
+            }
+
+
+            if (commandLine.hasOption("t")) {
+                TopicRouteData topicRouteData = 
defaultMQAdminExt.examineTopicRouteInfo(topic);
+                if (topicRouteData != null && topicRouteData.getBrokerDatas() 
!= null) {
+                    List<BrokerData> brokerDataList = 
topicRouteData.getBrokerDatas();
+                    for (BrokerData brokerData : brokerDataList) {
+                        masterSet.add(brokerData.selectBrokerAddr());
+                    }
+                }
+            }
+
+            if (commandLine.hasOption("c")) {
+                clusterName = commandLine.getOptionValue("c").trim();
+            }
+
+            defaultMQAdminExt.start();
+            if (masterSet.isEmpty()) {
+                masterSet = 
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt,
+                    clusterName);
+            }
+
+            String formatter = "%n%-35s  %-18s  %-23s  %-23s %-23s %-23s %-23s 
%-23s %n";
+
+            System.out.printf(formatter,
+                "#tracerTimeId",
+                "#brokerAddr",
+                "#messageCreateTime",
+                "#messageSendTime",
+                "#messageArriveBrokerTime",
+                "#messageBeginSaveTime",
+                "#messageSaveEndTime",
+                "#brokerSendAckTime");
+
+            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+            for (String broker : masterSet) {
+                TracerTime tracerTime = 
defaultMQAdminExt.queryTracerTime(broker, null, tracerTimeId);
+                if (tracerTime != null) {
+                    System.out.printf(formatter,
+                        tracerTimeId,
+                        broker,
+                        df.format(new Date(tracerTime.getMessageCreateTime())),
+                        df.format(new Date(tracerTime.getMessageSendTime())),
+                        df.format(new 
Date(tracerTime.getMessageArriveBrokerTime())),
+                        df.format(new 
Date(tracerTime.getMessageBeginSaveTime())),
+                        df.format(new 
Date(tracerTime.getMessageSaveEndTime())),
+                        df.format(new 
Date(tracerTime.getBrokerSendAckTime())));
+                }
+            }
+
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to