Include client IP per message queue in consumer progress command output

Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8c793c09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8c793c09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8c793c09

Branch: refs/heads/master
Commit: 8c793c09077455bf44e961517514f3551d3d2436
Parents: 3401296
Author: Zhanhui Li <[email protected]>
Authored: Wed Mar 29 21:50:59 2017 +0800
Committer: dongeforever <[email protected]>
Committed: Tue Jun 6 11:37:29 2017 +0800

----------------------------------------------------------------------
 .../consumer/ConsumerProgressSubCommand.java    | 41 ++++++++++++++++----
 1 file changed, 34 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c793c09/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 35fd260..f341362 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -16,10 +16,6 @@
  */
 package org.apache.rocketmq.tools.command.consumer;
 
-import java.util.Collections;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -30,7 +26,9 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.Connection;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -40,6 +38,13 @@ import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 public class ConsumerProgressSubCommand implements SubCommand {
     private final Logger log = ClientLogger.getLog();
 
@@ -62,6 +67,24 @@ public class ConsumerProgressSubCommand implements 
SubCommand {
         return options;
     }
 
+    private Map<MessageQueue, String> 
getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
+        String groupName) {
+        Map<MessageQueue, String> results = new HashMap<>();
+        try {
+            ConsumerConnection consumerConnection = 
defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
+            for (Connection connection : 
consumerConnection.getConnectionSet()) {
+                String clientId = connection.getClientId();
+                ConsumerRunningInfo consumerRunningInfo = 
defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId,
+                    false);
+                for (MessageQueue messageQueue : 
consumerRunningInfo.getMqTable().keySet()) {
+                    results.put(messageQueue, clientId.split("@")[0]);
+                }
+            }
+        } catch (Exception ignore) {
+        }
+        return results;
+    }
+
     @Override
     public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook) throws SubCommandException {
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -75,13 +98,14 @@ public class ConsumerProgressSubCommand implements 
SubCommand {
                 List<MessageQueue> mqList = new LinkedList<MessageQueue>();
                 mqList.addAll(consumeStats.getOffsetTable().keySet());
                 Collections.sort(mqList);
-
-                System.out.printf("%-32s  %-32s  %-4s  %-20s  %-20s  %-20s  
%s%n",
+                Map<MessageQueue, String> messageQueueAllocationResult = 
getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
+                System.out.printf("%-32s  %-32s  %-4s  %-20s  %-20s  %-20s 
%-20s  %s%n",
                     "#Topic",
                     "#Broker Name",
                     "#QID",
                     "#Broker Offset",
                     "#Consumer Offset",
+                    "#Client IP",
                     "#Diff",
                     "#LastTime");
 
@@ -95,12 +119,15 @@ public class ConsumerProgressSubCommand implements 
SubCommand {
                         lastTime = UtilAll.formatDate(new 
Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
                     } catch (Exception e) {
                     }
-                    System.out.printf("%-32s  %-32s  %-4d  %-20d  %-20d  %-20d 
 %s%n",
+
+                    String clientIP = messageQueueAllocationResult.get(mq);
+                    System.out.printf("%-32s  %-32s  %-4d  %-20d  %-20d  %-20s 
%-20d  %s%n",
                         UtilAll.frontStringAtLeast(mq.getTopic(), 32),
                         UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
                         mq.getQueueId(),
                         offsetWrapper.getBrokerOffset(),
                         offsetWrapper.getConsumerOffset(),
+                        null != clientIP ? clientIP : "NA",
                         diff,
                         lastTime
                     );

Reply via email to