This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 483a48a130 PrintMessageSubCommand support lmq (#8785)
483a48a130 is described below
commit 483a48a130658e748d0640bd581fb9c925a5ddea
Author: rongtong <[email protected]>
AuthorDate: Wed Oct 9 09:58:05 2024 +0800
PrintMessageSubCommand support lmq (#8785)
---
.../command/message/PrintMessageSubCommand.java | 32 +++++++++++++++++++++-
1 file changed, 31 insertions(+), 1 deletion(-)
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
index bb82f5079e..97e101d813 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageExt;
@@ -97,6 +98,12 @@ public class PrintMessageSubCommand implements SubCommand {
opt.setRequired(false);
options.addOption(opt);
+ opt =
+ new Option("l", "lmqParentTopic", true,
+ "Lmq parent topic, lmq is used to find the route.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -113,11 +120,20 @@ public class PrintMessageSubCommand implements SubCommand
{
String subExpression =
!commandLine.hasOption('s') ? "*" :
commandLine.getOptionValue('s').trim();
+ String lmqParentTopic =
+ !commandLine.hasOption('l') ? null :
commandLine.getOptionValue('l').trim();
+
boolean printBody = !commandLine.hasOption('d') ||
Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
consumer.start();
- Set<MessageQueue> mqs =
consumer.fetchSubscribeMessageQueues(topic);
+ Set<MessageQueue> mqs;
+ if (lmqParentTopic != null) {
+ mqs = consumer.fetchSubscribeMessageQueues(lmqParentTopic);
+ mqs.forEach(mq -> mq.setTopic(topic));
+ } else {
+ mqs = consumer.fetchSubscribeMessageQueues(topic);
+ }
for (MessageQueue mq : mqs) {
long minOffset = consumer.minOffset(mq);
long maxOffset = consumer.maxOffset(mq);
@@ -139,6 +155,7 @@ public class PrintMessageSubCommand implements SubCommand {
READQ:
for (long offset = minOffset; offset < maxOffset; ) {
try {
+ fillBrokerAddrIfNotExist(consumer, mq, lmqParentTopic);
PullResult pullResult = consumer.pull(mq,
subExpression, offset, 32);
offset = pullResult.getNextBeginOffset();
switch (pullResult.getPullStatus()) {
@@ -167,4 +184,17 @@ public class PrintMessageSubCommand implements SubCommand {
consumer.shutdown();
}
}
+
+ public void fillBrokerAddrIfNotExist(DefaultMQPullConsumer
defaultMQPullConsumer, MessageQueue messageQueue,
+ String routeTopic) {
+
+ FindBrokerResult findBrokerResult =
defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+ .findBrokerAddressInSubscribe(messageQueue.getBrokerName(), 0,
false);
+ if (findBrokerResult == null) {
+ // use lmq parent topic to fill up broker addr table
+
defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+ .updateTopicRouteInfoFromNameServer(routeTopic);
+ }
+
+ }
}