This is an automated email from the ASF dual-hosted git repository. vongosling pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f39c8f5e47baa82f94427d60de4b70265d99c0c1 Author: duhenglucky <[email protected]> AuthorDate: Fri May 17 01:38:28 2019 +0800 (1) Polish message trace target channel (2) Fix the issue that consume message with namespace trace cannot found --- .../TraceDispatcher.java => AccessChannel.java} | 37 +++------------------- .../org/apache/rocketmq/client/ClientConfig.java | 10 ++++++ .../client/consumer/DefaultMQPushConsumer.java | 2 +- .../client/producer/DefaultMQProducer.java | 13 +++++--- .../client/trace/AsyncTraceDispatcher.java | 27 +++++++++++----- .../rocketmq/client/trace/TraceDispatcher.java | 3 +- .../trace/hook/ConsumeMessageTraceHookImpl.java | 7 ++-- .../trace/hook/SendMessageTraceHookImpl.java | 9 +++--- 8 files changed, 53 insertions(+), 55 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java similarity index 54% copy from client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java copy to client/src/main/java/org/apache/rocketmq/client/AccessChannel.java index 275e6a3..82978b0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java @@ -14,37 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace; - -import org.apache.rocketmq.client.exception.MQClientException; -import java.io.IOException; - -/** - * Interface of asynchronous transfer data - */ -public interface TraceDispatcher { - - /** - * Initialize asynchronous transfer data module - */ - void start(String nameSrvAddr) throws MQClientException; - - /** - * Append the transfering data - * @param ctx data infomation - * @return - */ - boolean append(Object ctx); - - /** - * Write flush action - * - * @throws IOException - */ - void flush() throws IOException; - - /** - * Close the trace Hook - */ - void shutdown(); +package org.apache.rocketmq.client; +public enum AccessChannel { + local, + cloud, } diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 6493f2d..53ac353 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -37,6 +37,8 @@ public class ClientConfig { private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); protected String namespace; + protected AccessChannel accessChannel = AccessChannel.local; + /** * Pulling topic information interval from the named server */ @@ -263,6 +265,14 @@ public class ClientConfig { this.namespace = namespace; } + public AccessChannel getAccessChannel() { + return this.accessChannel; + } + + public void setAccessChannel(AccessChannel accessChannel) { + this.accessChannel = accessChannel; + } + @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 44edfb6..339f799 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -693,7 +693,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { - traceDispatcher.start(this.getNamesrvAddr()); + traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index d5fbde0..b4acf8f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -238,15 +238,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } /** - * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic name. + * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic + * name. * * @param namespace Namespace for this MQ Producer instance. * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. */ - public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { + public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, + boolean enableMsgTrace, final String customizedTraceTopic) { this.namespace = namespace; this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); @@ -282,7 +285,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.start(); if (null != traceDispatcher) { try { - traceDispatcher.start(this.getNamesrvAddr()); + traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } @@ -331,7 +334,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - Validators.checkMessage(msg,this); + Validators.checkMessage(msg, this); msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 0aaadb1..3b5fc1d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; @@ -57,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private final int batchSize; private final int maxMsgSize; private final DefaultMQProducer traceProducer; - private final ThreadPoolExecutor traceExecuter; + private final ThreadPoolExecutor traceExecutor; // The last discard number of log private AtomicLong discardCount; private Thread worker; @@ -71,8 +72,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; private AtomicBoolean isStarted = new AtomicBoolean(false); + private AccessChannel accessChannel = AccessChannel.local; - public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { + public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; @@ -85,7 +87,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { } else { this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; } - this.traceExecuter = new ThreadPoolExecutor(// + this.traceExecutor = new ThreadPoolExecutor(// 10, // 20, // 1000 * 60, // @@ -95,6 +97,14 @@ public class AsyncTraceDispatcher implements TraceDispatcher { traceProducer = getAndCreateTraceProducer(rpcHook); } + public AccessChannel getAccessChannel() { + return accessChannel; + } + + public void setAccessChannel(AccessChannel accessChannel) { + this.accessChannel = accessChannel; + } + public String getTraceTopicName() { return traceTopicName; } @@ -123,12 +133,13 @@ public class AsyncTraceDispatcher implements TraceDispatcher { this.hostConsumer = hostConsumer; } - public void start(String nameSrvAddr) throws MQClientException { + public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException { if (isStarted.compareAndSet(false, true)) { traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.start(); } + this.accessChannel = accessChannel; this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker.setDaemon(true); this.worker.start(); @@ -174,7 +185,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { @Override public void shutdown() { this.stopped = true; - this.traceExecuter.shutdown(); + this.traceExecutor.shutdown(); if (isStarted.get()) { traceProducer.shutdown(); } @@ -231,7 +242,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { } if (contexts.size() > 0) { AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); - traceExecuter.submit(request); + traceExecutor.submit(request); } else if (AsyncTraceDispatcher.this.stopped) { this.stopped = true; } @@ -330,11 +341,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher { */ private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) { String traceTopic = traceTopicName; - if (StringUtils.isNotEmpty(regionId) && MixAll.RMQ_SYS_TRACE_TOPIC.equals(traceTopic)) { + if (AccessChannel.cloud == accessChannel){ traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId; } final Message message = new Message(traceTopic, data.getBytes()); - // Keyset of message trace includes msgId of or original message message.setKeys(keySet); try { @@ -342,6 +352,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { + } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java index 275e6a3..51cc0de 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.trace; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import java.io.IOException; @@ -27,7 +28,7 @@ public interface TraceDispatcher { /** * Initialize asynchronous transfer data module */ - void start(String nameSrvAddr) throws MQClientException; + void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException; /** * Append the transfering data diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java index 38ec8b9..f30b121 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt; import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.common.protocol.NamespaceUtil; public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { @@ -51,7 +52,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { TraceContext traceContext = new TraceContext(); context.setMqTraceContext(traceContext); traceContext.setTraceType(TraceType.SubBefore);// - traceContext.setGroupName(context.getConsumerGroup());// + traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));// List<TraceBean> beans = new ArrayList<TraceBean>(); for (MessageExt msg : context.getMsgList()) { if (msg == null) { @@ -65,7 +66,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { continue; } TraceBean traceBean = new TraceBean(); - traceBean.setTopic(msg.getTopic());// + traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));// traceBean.setMsgId(msg.getMsgId());// traceBean.setTags(msg.getTags());// traceBean.setKeys(msg.getKeys());// @@ -96,7 +97,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { TraceContext subAfterContext = new TraceContext(); subAfterContext.setTraceType(TraceType.SubAfter);// subAfterContext.setRegionId(subBeforeContext.getRegionId());// - subAfterContext.setGroupName(subBeforeContext.getGroupName());// + subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));// subAfterContext.setRequestId(subBeforeContext.getRequestId());// subAfterContext.setSuccess(context.isSuccess());// diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java index 20396c6..80c7bab 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java @@ -16,15 +16,16 @@ */ package org.apache.rocketmq.client.trace.hook; +import java.util.ArrayList; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceBean; import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceDispatcher; -import org.apache.rocketmq.client.trace.TraceBean; import org.apache.rocketmq.client.trace.TraceType; -import java.util.ArrayList; +import org.apache.rocketmq.common.protocol.NamespaceUtil; public class SendMessageTraceHookImpl implements SendMessageHook { @@ -50,10 +51,10 @@ public class SendMessageTraceHookImpl implements SendMessageHook { tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1)); context.setMqTraceContext(tuxeContext); tuxeContext.setTraceType(TraceType.Pub); - tuxeContext.setGroupName(context.getProducerGroup()); + tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup())); //build the data bean object of message trace TraceBean traceBean = new TraceBean(); - traceBean.setTopic(context.getMessage().getTopic()); + traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic())); traceBean.setTags(context.getMessage().getTags()); traceBean.setKeys(context.getMessage().getKeys()); traceBean.setStoreHost(context.getBrokerAddr());
