This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 483a48a130 PrintMessageSubCommand support lmq (#8785)
483a48a130 is described below

commit 483a48a130658e748d0640bd581fb9c925a5ddea
Author: rongtong <[email protected]>
AuthorDate: Wed Oct 9 09:58:05 2024 +0800

    PrintMessageSubCommand support lmq (#8785)
---
 .../command/message/PrintMessageSubCommand.java    | 32 +++++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)

diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
index bb82f5079e..97e101d813 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -97,6 +98,12 @@ public class PrintMessageSubCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt =
+            new Option("l", "lmqParentTopic", true,
+                "Lmq parent topic, lmq is used to find the route.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -113,11 +120,20 @@ public class PrintMessageSubCommand implements SubCommand 
{
             String subExpression =
                 !commandLine.hasOption('s') ? "*" : 
commandLine.getOptionValue('s').trim();
 
+            String lmqParentTopic =
+                !commandLine.hasOption('l') ? null : 
commandLine.getOptionValue('l').trim();
+
             boolean printBody = !commandLine.hasOption('d') || 
Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
 
             consumer.start();
 
-            Set<MessageQueue> mqs = 
consumer.fetchSubscribeMessageQueues(topic);
+            Set<MessageQueue> mqs;
+            if (lmqParentTopic != null) {
+                mqs = consumer.fetchSubscribeMessageQueues(lmqParentTopic);
+                mqs.forEach(mq -> mq.setTopic(topic));
+            } else {
+                mqs = consumer.fetchSubscribeMessageQueues(topic);
+            }
             for (MessageQueue mq : mqs) {
                 long minOffset = consumer.minOffset(mq);
                 long maxOffset = consumer.maxOffset(mq);
@@ -139,6 +155,7 @@ public class PrintMessageSubCommand implements SubCommand {
                 READQ:
                 for (long offset = minOffset; offset < maxOffset; ) {
                     try {
+                        fillBrokerAddrIfNotExist(consumer, mq, lmqParentTopic);
                         PullResult pullResult = consumer.pull(mq, 
subExpression, offset, 32);
                         offset = pullResult.getNextBeginOffset();
                         switch (pullResult.getPullStatus()) {
@@ -167,4 +184,17 @@ public class PrintMessageSubCommand implements SubCommand {
             consumer.shutdown();
         }
     }
+
+    public void fillBrokerAddrIfNotExist(DefaultMQPullConsumer 
defaultMQPullConsumer, MessageQueue messageQueue,
+        String routeTopic) {
+
+        FindBrokerResult findBrokerResult = 
defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+            .findBrokerAddressInSubscribe(messageQueue.getBrokerName(), 0, 
false);
+        if (findBrokerResult == null) {
+            // use lmq parent topic to fill up broker addr table
+            
defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+                .updateTopicRouteInfoFromNameServer(routeTopic);
+        }
+
+    }
 }

Reply via email to