dongeforever closed pull request #645: [ISSUE#525]restructure and optimize
codes for message track
URL: https://github.com/apache/rocketmq/pull/645
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index bd5eaeeab..f1ae2612a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -126,7 +126,7 @@ public TopicConfigManager(BrokerController
brokerController) {
}
{
if
(this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
- String topic =
this.brokerController.getBrokerConfig().getMsgTrackTopicName();
+ String topic =
this.brokerController.getBrokerConfig().getMsgTraceTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index cbe22fdb3..edb8cb5ab 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -18,7 +18,6 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
@@ -32,9 +31,7 @@
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceDispatcherType;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
@@ -288,8 +285,8 @@ public DefaultMQPushConsumer(final String consumerGroup,
RPCHook rpcHook,
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
- * @param msgTraceSwitch switch flag instance for message track trace.
- * @param traceTopicName the name value of message track trace topic.If
you don't config,you can use the default trace topic name.
+ * @param msgTraceSwitch switch flag instance for message trace.
+ * @param traceTopicName the name value of message trace topic.If you
don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean
msgTraceSwitch, final String traceTopicName) {
@@ -298,21 +295,9 @@ public DefaultMQPushConsumer(final String consumerGroup,
RPCHook rpcHook,
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,
rpcHook);
if (msgTraceSwitch) {
try {
- Properties tempProperties = new Properties();
- tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000");
- tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048");
- tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100");
- tempProperties.put(TraceConstants.INSTANCE_NAME,
"PID_CLIENT_INNER_TRACE_PRODUCER");
- tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE,
TraceDispatcherType.CONSUMER.name());
- if (!UtilAll.isBlank(traceTopicName)) {
- tempProperties.put(TraceConstants.TRACE_TOPIC,
traceTopicName);
- } else {
- tempProperties.put(TraceConstants.TRACE_TOPIC,
MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
- }
- AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(tempProperties, rpcHook);
+ AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(traceTopicName, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
-
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
@@ -334,8 +319,8 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
- * @param msgTraceSwitch switch flag instance for message track trace.
- * @param traceTopicName the name value of message track trace topic.If
you don't config,you can use the default trace topic name.
+ * @param msgTraceSwitch switch flag instance for message trace.
+ * @param traceTopicName the name value of message trace topic.If you
don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean
msgTraceSwitch, final String traceTopicName) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(),
msgTraceSwitch, traceTopicName);
@@ -585,9 +570,7 @@ public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
- Properties tempProperties = new Properties();
- tempProperties.put(TraceConstants.NAMESRV_ADDR,
this.getNamesrvAddr());
- traceDispatcher.start(tempProperties);
+ traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 1592d22b2..23cb53ca1 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -18,7 +18,6 @@
import java.util.Collection;
import java.util.List;
-import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
@@ -28,12 +27,9 @@
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceConstants;
-import org.apache.rocketmq.client.trace.TraceDispatcherType;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -158,31 +154,18 @@ public DefaultMQProducer(final String producerGroup,
RPCHook rpcHook) {
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
- * @param msgTraceSwitch switch flag instance for message track trace.
- * @param traceTopicName the name value of message track trace topic.If
you don't config,you can use the default trace topic name.
+ * @param msgTraceSwitch switch flag instance for message trace.
+ * @param traceTopicName the name value of message trace topic.If you
don't config,you can use the default trace topic name.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook,
boolean msgTraceSwitch,final String traceTopicName) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
- //if client open the message track trace feature
- //TODO wrap this code to TraceDispatcherFactory
+ //if client open the message trace feature
if (msgTraceSwitch) {
try {
- Properties tempProperties = new Properties();
- tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000");
- tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048");
- tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100");
- tempProperties.put(TraceConstants.INSTANCE_NAME,
"PID_CLIENT_INNER_TRACE_PRODUCER");
- tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE,
TraceDispatcherType.PRODUCER.name());
- if (!UtilAll.isBlank(traceTopicName)) {
- tempProperties.put(TraceConstants.TRACE_TOPIC,
traceTopicName);
- } else {
- tempProperties.put(TraceConstants.TRACE_TOPIC,
MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
- }
- AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(tempProperties, rpcHook);
+ AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(traceTopicName, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
-
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
@@ -204,8 +187,8 @@ public DefaultMQProducer(final String producerGroup) {
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
- * @param msgTraceSwitch switch flag instance for message track trace.
- * @param traceTopicName the name value of message track trace topic.If
you don't config,you can use the default trace topic name.
+ * @param msgTraceSwitch switch flag instance for message trace.
+ * @param traceTopicName the name value of message trace topic.If you
don't config,you can use the default trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean
msgTraceSwitch, final String traceTopicName) {
this(producerGroup, null, msgTraceSwitch, traceTopicName);
@@ -235,12 +218,9 @@ public DefaultMQProducer(RPCHook rpcHook) {
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
- //TODO wrap this code to TraceDispatcherFactory
if (null != traceDispatcher) {
try {
- Properties tempProperties = new Properties();
- tempProperties.put(TraceConstants.NAMESRV_ADDR,
this.getNamesrvAddr());
- traceDispatcher.start(tempProperties);
+ traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index ce82d1bb1..04ef8e09a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -26,7 +27,9 @@
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
@@ -35,7 +38,6 @@
import java.util.List;
import java.util.HashMap;
import java.util.UUID;
-import java.util.Properties;
import java.util.Set;
import java.util.HashSet;
import java.util.ArrayList;
@@ -46,11 +48,14 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.remoting.RPCHook;
+import static
org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
+
public class AsyncTraceDispatcher implements TraceDispatcher {
private final static InternalLogger log = ClientLogger.getLog();
private final int queueSize;
private final int batchSize;
+ private final int maxMsgSize;
private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecuter;
// the last discard number of log
@@ -60,24 +65,27 @@
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
- private String dispatcherType;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
+ private static AtomicBoolean isStarted = new AtomicBoolean(false);
+
- public AsyncTraceDispatcher(Properties properties, RPCHook rpcHook) throws
MQClientException {
- dispatcherType =
properties.getProperty(TraceConstants.TRACE_DISPATCHER_TYPE);
- int queueSize =
Integer.parseInt(properties.getProperty(TraceConstants.ASYNC_BUFFER_SIZE,
"2048"));
+ public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws
MQClientException {
// queueSize is greater than or equal to the n power of 2 of value
- queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
- this.queueSize = queueSize;
- batchSize =
Integer.parseInt(properties.getProperty(TraceConstants.MAX_BATCH_NUM, "1"));
+ this.queueSize = 2048;
+ this.batchSize = 100;
+ this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L);
- traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
- appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
- traceTopicName = properties.getProperty(TraceConstants.TRACE_TOPIC);
+ this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
+ this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
+ if (!UtilAll.isBlank(traceTopicName)) {
+ this.traceTopicName = traceTopicName;
+ } else {
+ this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
+ }
this.traceExecuter = new ThreadPoolExecutor(//
10, //
20, //
@@ -85,7 +93,7 @@ public AsyncTraceDispatcher(Properties properties, RPCHook
rpcHook) throws MQCli
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
- traceProducer =
TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
+ traceProducer = getAndCreateTraceProducer(rpcHook);
}
public String getTraceTopicName() {
@@ -116,14 +124,31 @@ public void setHostConsumer(DefaultMQPushConsumerImpl
hostConsumer) {
this.hostConsumer = hostConsumer;
}
- public void start(Properties properties) throws MQClientException {
- TraceProducerFactory.registerTraceDispatcher(dispatcherId,
properties.getProperty(TraceConstants.NAMESRV_ADDR));
+ public void start(String nameSrvAddr) throws MQClientException {
+ if (isStarted.compareAndSet(false, true)) {
+ traceProducer.setNamesrvAddr(nameSrvAddr);
+ traceProducer.start();
+ }
this.worker = new Thread(new AsyncRunnable(),
"MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
+ private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
+ DefaultMQProducer traceProducerInstance = this.traceProducer;
+ if (traceProducerInstance == null) {
+ traceProducerInstance = new DefaultMQProducer(rpcHook);
+ traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
+ traceProducerInstance.setSendMsgTimeout(5000);
+ traceProducerInstance.setInstanceName(TRACE_INSTANCE_NAME);
+ traceProducerInstance.setVipChannelEnabled(false);
+ //the max size of message is 128K
+ traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
+ }
+ return traceProducerInstance;
+ }
+
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx);
@@ -151,7 +176,9 @@ public void flush() throws IOException {
public void shutdown() {
this.stopped = true;
this.traceExecuter.shutdown();
- TraceProducerFactory.unregisterTraceDispatcher(dispatcherId);
+ if (isStarted.get()) {
+ traceProducer.shutdown();
+ }
this.removeShutdownHook();
}
@@ -193,7 +220,7 @@ public void run() {
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
- //get track trace data element from blocking Queue —
traceContextQueue
+ //get trace data element from blocking Queue —
traceContextQueue
context = traceContextQueue.poll(5,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
@@ -266,7 +293,7 @@ private void flushData(List<TraceTransferBean>
transBeanList) {
Set<String> keySet = new HashSet<String>();
for (TraceTransferBean bean : transBeanList) {
- // keyset of message track trace includes msgId of or original
message
+ // keyset of message trace includes msgId of or original
message
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
count++;
@@ -286,16 +313,16 @@ private void flushData(List<TraceTransferBean>
transBeanList) {
}
/**
- * send message track trace data
+ * send message trace data
*
* @param keySet the keyset in this batch(including msgId in original
message not offsetMsgId)
- * @param data the message track trace data in this batch
+ * @param data the message trace data in this batch
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data) {
String topic = traceTopicName;
final Message message = new Message(topic, data.getBytes());
- //keyset of message track trace includes msgId of or original
message
+ //keyset of message trace includes msgId of or original message
message.setKeys(keySet);
try {
Set<String> traceBrokerSet =
tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
index 970b55635..b9fd8778e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
@@ -17,16 +17,9 @@
package org.apache.rocketmq.client.trace;
public class TraceConstants {
- public static final String NAMESRV_ADDR = "NAMESRV_ADDR";
- public static final String ADDRSRV_URL = "ADDRSRV_URL";
- public static final String INSTANCE_NAME = "InstanceName";
- public static final String ASYNC_BUFFER_SIZE = "AsyncBufferSize";
- public static final String MAX_BATCH_NUM = "MaxBatchNum";
- public static final String WAKE_UP_NUM = "WakeUpNum";
- public static final String MAX_MSG_SIZE = "MaxMsgSize";
+
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
- public static final String TRACE_TOPIC = "TRACK_TRACE_TOPIC_NAME";
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
- public static final String TRACE_DISPATCHER_TYPE = "DispatcherType";
+ public static final String TRACE_INSTANCE_NAME =
"PID_CLIENT_INNER_TRACE_PRODUCER";
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
index 2370db955..f61ba888c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
@@ -21,7 +21,7 @@
import java.util.List;
/**
- * The context of Track Trace
+ * The context of Trace
*/
public class TraceContext implements Comparable<TraceContext> {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index 6015e27e6..2ed894024 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -22,12 +22,12 @@
import java.util.List;
/**
- * encode/decode for Track Trace Data
+ * encode/decode for Trace Data
*/
public class TraceDataEncoder {
/**
- * resolving traceContext list From track trace data String
+ * resolving traceContext list From trace data String
*
* @param traceData
* @return
@@ -101,7 +101,7 @@
}
/**
- * Encoding the trace context into track data strings and keyset sets
+ * Encoding the trace context into data strings and keyset sets
*
* @param ctx
* @return
@@ -110,7 +110,7 @@ public static TraceTransferBean
encoderFromContextBean(TraceContext ctx) {
if (ctx == null) {
return null;
}
- //build message track trace of the transfering entity content bean
+ //build message trace of the transfering entity content bean
TraceTransferBean transferBean = new TraceTransferBean();
StringBuilder sb = new StringBuilder(256);
switch (ctx.getTraceType()) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
index 3efef7c46..2b0f45309 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.trace;
-import java.util.Properties;
import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException;
@@ -28,7 +27,7 @@
/**
* Initialize asynchronous transfer data module
*/
- void start(Properties properties) throws MQClientException;
+ void start(String nameSrvAddr) throws MQClientException;
/**
* append the transfering data
@@ -45,7 +44,7 @@
void flush() throws IOException;
/**
- * close the track trace Hook
+ * close the trace Hook
*/
void shutdown();
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java
deleted file mode 100644
index 6e4ed363a..000000000
---
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.namesrv.TopAddressing;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rocketmq.remoting.RPCHook;
-
-@Deprecated
-public class TraceProducerFactory {
-
- private static Map<String, Object> dispatcherTable = new
ConcurrentHashMap<String, Object>();
- private static AtomicBoolean isStarted = new AtomicBoolean(false);
- private static DefaultMQProducer traceProducer;
-
-
- public static DefaultMQProducer getTraceDispatcherProducer(Properties
properties, RPCHook rpcHook) {
- if (traceProducer == null) {
-
- traceProducer = new DefaultMQProducer(rpcHook);
- traceProducer.setProducerGroup(TraceConstants.GROUP_NAME);
- traceProducer.setSendMsgTimeout(5000);
-
traceProducer.setInstanceName(properties.getProperty(TraceConstants.INSTANCE_NAME,
String.valueOf(System.currentTimeMillis())));
- String nameSrv =
properties.getProperty(TraceConstants.NAMESRV_ADDR);
- if (nameSrv == null) {
- TopAddressing topAddressing = new
TopAddressing(properties.getProperty(TraceConstants.ADDRSRV_URL));
- nameSrv = topAddressing.fetchNSAddr();
- }
- traceProducer.setNamesrvAddr(nameSrv);
- traceProducer.setVipChannelEnabled(false);
- //the max size of message is 128K
- int maxSize =
Integer.parseInt(properties.getProperty(TraceConstants.MAX_MSG_SIZE, "128000"));
- traceProducer.setMaxMessageSize(maxSize - 10 * 1000);
- }
- return traceProducer;
- }
-
- public static void registerTraceDispatcher(String dispatcherId, String
nameSrvAddr) throws MQClientException {
- dispatcherTable.put(dispatcherId, new Object());
- if (traceProducer != null && isStarted.compareAndSet(false, true)) {
- traceProducer.setNamesrvAddr(nameSrvAddr);
- traceProducer.start();
- }
- }
-
- public static void unregisterTraceDispatcher(String dispatcherId) {
- dispatcherTable.remove(dispatcherId);
- if (dispatcherTable.isEmpty() && traceProducer != null &&
isStarted.get()) {
- traceProducer.shutdown();
- }
- }
-}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java
index d3d25c4d4..2e054ee1e 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java
@@ -20,7 +20,7 @@
import java.util.Set;
/**
- * track trace transfering bean
+ * trace transfering bean
*/
public class TraceTransferBean {
private String transData;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
index bfe5d7aa3..20396c6dd 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
@@ -36,12 +36,12 @@ public SendMessageTraceHookImpl(TraceDispatcher
localDispatcher) {
@Override
public String hookName() {
- return "SendMessageTrackHook";
+ return "SendMessageTraceHook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
- //if it is message track trace data,then it doesn't recorded
+ //if it is message trace data,then it doesn't recorded
if (context == null ||
context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher)
localDispatcher).getTraceTopicName())) {
return;
}
@@ -51,8 +51,7 @@ public void sendMessageBefore(SendMessageContext context) {
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(context.getProducerGroup());
-
- //build the data bean object of message track trace
+ //build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(context.getMessage().getTopic());
traceBean.setTags(context.getMessage().getTags());
@@ -65,7 +64,7 @@ public void sendMessageBefore(SendMessageContext context) {
@Override
public void sendMessageAfter(SendMessageContext context) {
- //if it is message track trace data,then it doesn't recorded
+ //if it is message trace data,then it doesn't recorded
if (context == null ||
context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher)
localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) {
return;
diff --git
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index b34784055..b45ad0281 100644
---
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -87,7 +87,7 @@
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
- private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC +
System.currentTimeMillis();
+ private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC +
System.currentTimeMillis();
private String topic = "FooBar";
private String brokerName = "BrokerA";
@@ -107,7 +107,7 @@
@Mock
private MQClientAPIImpl mQClientTraceAPIImpl;
private DefaultMQProducer traceProducer;
- private String customerTraceTopic = "rmq_track_trace_topic_12345";
+ private String customerTraceTopic = "rmq_trace_topic_12345";
@Before
public void init() throws Exception {
diff --git
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index 905efb983..6dcceeb5c 100644
---
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -83,8 +83,8 @@
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix +
System.currentTimeMillis();
- private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC +
System.currentTimeMillis();
- private String customerTraceTopic = "rmq_track_trace_topic_12345";
+ private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC +
System.currentTimeMillis();
+ private String customerTraceTopic = "rmq_trace_topic_12345";
@Before
public void init() throws Exception {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 9823ca047..eb1a684df 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -54,7 +54,7 @@
@ImportantField
private boolean autoTraceBrokerEnable = false;
@ImportantField
- private String msgTrackTopicName = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
+ private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
/**
* thread numbers for send message thread pool, since spin lock will be
used by default since 4.0.x, the default
* value is 1.
@@ -759,11 +759,12 @@ public void setAutoTraceBrokerEnable(boolean
autoTraceBrokerEnable) {
this.autoTraceBrokerEnable = autoTraceBrokerEnable;
}
- public String getMsgTrackTopicName() {
- return msgTrackTopicName;
+ public String getMsgTraceTopicName() {
+ return msgTraceTopicName;
}
- public void setMsgTrackTopicName(String msgTrackTopicName) {
- this.msgTrackTopicName = msgTrackTopicName;
+ public void setMsgTraceTopicName(String msgTraceTopicName) {
+ this.msgTraceTopicName = msgTraceTopicName;
}
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 0573c762a..5fdb0120f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -91,7 +91,7 @@
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
public static final String RMQ_SYS_TRANS_HALF_TOPIC =
"RMQ_SYS_TRANS_HALF_TOPIC";
- public static final String RMQ_SYS_TRACK_TRACE_TOPIC =
"RMQ_SYS_TRACK_TRACE_TOPIC";
+ public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC =
"RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
diff --git
a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
index 8b10eef2a..7e197522f 100644
--- a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
@@ -37,10 +37,10 @@ public void testBrokerConfigAttribute() {
brokerConfig.setBrokerName("broker-a");
brokerConfig.setBrokerId(0);
brokerConfig.setBrokerClusterName("DefaultCluster");
- brokerConfig.setMsgTrackTopicName("RMQ_SYS_TRACK_TRACE_TOPIC4");
+ brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
-
assertThat(brokerConfig.getMsgTrackTopicName()).isEqualTo("RMQ_SYS_TRACK_TRACE_TOPIC4");
+
assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services