Include client IP per message queue in consumer progress command output
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8c793c09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8c793c09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8c793c09 Branch: refs/heads/master Commit: 8c793c09077455bf44e961517514f3551d3d2436 Parents: 3401296 Author: Zhanhui Li <[email protected]> Authored: Wed Mar 29 21:50:59 2017 +0800 Committer: dongeforever <[email protected]> Committed: Tue Jun 6 11:37:29 2017 +0800 ---------------------------------------------------------------------- .../consumer/ConsumerProgressSubCommand.java | 41 ++++++++++++++++---- 1 file changed, 34 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c793c09/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java index 35fd260..f341362 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java @@ -16,10 +16,6 @@ */ package org.apache.rocketmq.tools.command.consumer; -import java.util.Collections; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -30,7 +26,9 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import 
org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -40,6 +38,13 @@ import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; import org.slf4j.Logger; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + public class ConsumerProgressSubCommand implements SubCommand { private final Logger log = ClientLogger.getLog(); @@ -62,6 +67,24 @@ public class ConsumerProgressSubCommand implements SubCommand { return options; } + private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt, + String groupName) { + Map<MessageQueue, String> results = new HashMap<>(); + try { + ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName); + for (Connection connection : consumerConnection.getConnectionSet()) { + String clientId = connection.getClientId(); + ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, + false); + for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) { + results.put(messageQueue, clientId.split("@")[0]); + } + } + } catch (Exception ignore) { + } + return results; + } + @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); @@ -75,13 +98,14 @@ public class ConsumerProgressSubCommand implements SubCommand { List<MessageQueue> mqList 
= new LinkedList<MessageQueue>(); mqList.addAll(consumeStats.getOffsetTable().keySet()); Collections.sort(mqList); - - System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n", + Map<MessageQueue, String> messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup); + System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %-20s %s%n", "#Topic", "#Broker Name", "#QID", "#Broker Offset", "#Consumer Offset", + "#Client IP", "#Diff", "#LastTime"); @@ -95,12 +119,15 @@ public class ConsumerProgressSubCommand implements SubCommand { lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS); } catch (Exception e) { } - System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n", + + String clientIP = messageQueueAllocationResult.get(mq); + System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20s %-20d %s%n", UtilAll.frontStringAtLeast(mq.getTopic(), 32), UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), mq.getQueueId(), offsetWrapper.getBrokerOffset(), offsetWrapper.getConsumerOffset(), + null != clientIP ? clientIP : "NA", diff, lastTime );
