This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a7d493b2fb transactionProducer get the topic route before sending the
message (#7569)
a7d493b2fb is described below
commit a7d493b2fbc153cc6cbdf2b2ffcbf19cf7cba803
Author: panzhi <[email protected]>
AuthorDate: Tue Nov 21 20:55:35 2023 +0800
transactionProducer get the topic route before sending the message (#7569)
---
.../impl/producer/DefaultMQProducerImpl.java | 15 ++++++
.../client/producer/DefaultMQProducer.java | 63 ++++++++++++++++++++++
.../client/producer/TransactionMQProducer.java | 23 ++++++--
.../example/transaction/TransactionProducer.java | 3 +-
4 files changed, 98 insertions(+), 6 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 545f17d931..088bff0891 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -262,6 +262,8 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
mQClientFactory.start();
}
+ this.initTopicRoute();
+
this.mqFaultStrategy.startDetector();
log.info("the producer [{}] start OK.
sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
@@ -1740,6 +1742,19 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
}
}
+ private void initTopicRoute() {
+ List<String> topics = this.defaultMQProducer.getTopics();
+ if (topics != null && topics.size() > 0) {
+ topics.forEach(topic -> {
+ String newTopic =
NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic);
+ TopicPublishInfo topicPublishInfo =
tryToFindTopicPublishInfo(newTopic);
+ if (topicPublishInfo == null || !topicPublishInfo.ok()) {
+ log.warn("No route info of this topic: " + newTopic +
FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO));
+ }
+ });
+ }
+ }
+
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 7bd3876f5a..700e00aac1 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
*/
private String producerGroup;
+ /**
+ * Topics that need to be initialized for transaction producer
+ */
+ private List<String> topics;
+
/**
* Just for testing or demo program
*/
@@ -235,6 +240,22 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
+ /**
+ * Constructor specifying namespace, producer group, topics and RPC hook.
+ *
+ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param topics Topics that need to be initialized for routing
+ * @param rpcHook RPC hook to execute for each remoting command
execution.
+ */
+ public DefaultMQProducer(final String namespace, final String
producerGroup, final List<String> topics, RPCHook rpcHook) {
+ this.namespace = namespace;
+ this.producerGroup = producerGroup;
+ this.topics = topics;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ }
+
/**
* Constructor specifying producer group and enabled msg trace flag.
*
@@ -290,6 +311,41 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
}
}
+ /**
+ * Constructor specifying namespace, producer group, topics, RPC hook,
enabled msgTrace flag and customized trace topic
+ * name.
+ *
+ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param topics Topics that need to be initialized for
routing
+ * @param rpcHook RPC hook to execute for each remoting
command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
+ * @param customizedTraceTopic The name of the message trace topic. If
you don't configure one, you can use the default
+ * trace topic name.
+ */
+ public DefaultMQProducer(final String namespace, final String
producerGroup, final List<String> topics,
+ RPCHook rpcHook, boolean enableMsgTrace, final
String customizedTraceTopic) {
+ this.namespace = namespace;
+ this.producerGroup = producerGroup;
+ this.topics = topics;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ //if client open the message trace feature
+ if (enableMsgTrace) {
+ try {
+ AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE,
customizedTraceTopic, rpcHook);
+ dispatcher.setHostProducer(this.defaultMQProducerImpl);
+ traceDispatcher = dispatcher;
+ this.defaultMQProducerImpl.registerSendMessageHook(
+ new SendMessageTraceHookImpl(traceDispatcher));
+ this.defaultMQProducerImpl.registerEndTransactionHook(
+ new EndTransactionTraceHookImpl(traceDispatcher));
+ } catch (Throwable e) {
+ logger.error("system mqtrace hook init failed ,maybe can't
send msg trace data");
+ }
+ }
+ }
+
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
@@ -1316,4 +1372,11 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
}
+ public List<String> getTopics() {
+ return topics;
+ }
+
+ public void setTopics(List<String> topics) {
+ this.topics = topics;
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
index d529f3e778..2c3b479f77 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.producer;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
@@ -36,19 +37,31 @@ public class TransactionMQProducer extends
DefaultMQProducer {
}
public TransactionMQProducer(final String producerGroup) {
- this(null, producerGroup, null);
+ this(null, producerGroup, null, null);
+ }
+
+ public TransactionMQProducer(final String producerGroup, final
List<String> topics) {
+ this(null, producerGroup, topics, null);
}
public TransactionMQProducer(final String namespace, final String
producerGroup) {
- this(namespace, producerGroup, null);
+ this(namespace, producerGroup, null, null);
+ }
+
+ public TransactionMQProducer(final String namespace, final String
producerGroup, final List<String> topics) {
+ this(namespace, producerGroup, topics, null);
}
public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
- this(null, producerGroup, rpcHook);
+ this(null, producerGroup, null, rpcHook);
+ }
+
+ public TransactionMQProducer(final String producerGroup, final
List<String> topics, RPCHook rpcHook) {
+ this(null, producerGroup, topics, rpcHook);
}
- public TransactionMQProducer(final String namespace, final String
producerGroup, RPCHook rpcHook) {
- super(namespace, producerGroup, rpcHook);
+ public TransactionMQProducer(final String namespace, final String
producerGroup, final List<String> topics, RPCHook rpcHook) {
+ super(namespace, producerGroup, topics, rpcHook);
}
public TransactionMQProducer(final String namespace, final String
producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String
customizedTraceTopic) {
diff --git
a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
index 5973c3c306..d1d57c55ef 100644
---
a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -39,7 +40,7 @@ public class TransactionProducer {
public static void main(String[] args) throws MQClientException,
InterruptedException {
TransactionListener transactionListener = new
TransactionListenerImpl();
- TransactionMQProducer producer = new
TransactionMQProducer(PRODUCER_GROUP);
+ TransactionMQProducer producer = new
TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));
// Uncomment the following line while debugging, namesrvAddr should be
set to your local address
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);