http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java index 66f7159..86e7848 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java @@ -6,24 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.namesrv; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.SubCommand; +import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; - -import java.util.List; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; public class WipeWritePermSubCommand implements SubCommand { @@ -32,13 +31,11 @@ public class WipeWritePermSubCommand implements SubCommand { return "wipeWritePerm"; } - @Override public String commandDesc() { return "Wipe write perm of broker in all name server"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerName", true, "broker name"); @@ -47,7 +44,6 @@ public class WipeWritePermSubCommand implements SubCommand { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); @@ -63,14 +59,14 @@ public class WipeWritePermSubCommand implements SubCommand { try { int wipeTopicCount = defaultMQAdminExt.wipeWritePermOfBroker(namesrvAddr, brokerName); System.out.printf("wipe write perm of broker[%s] in name server[%s] OK, %d%n", - brokerName, - namesrvAddr, - wipeTopicCount + brokerName, + namesrvAddr, + wipeTopicCount ); } catch (Exception e) { System.out.printf("wipe write perm of broker[%s] in name server[%s] Failed%n", - brokerName, - namesrvAddr + brokerName, + namesrvAddr ); e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java index 3cb7e3f..fe239aa 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java @@ -6,17 +6,21 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.offset; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.route.BrokerData; @@ -24,11 +28,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Set; public class CloneGroupOffsetCommand implements SubCommand { @Override @@ -94,7 +93,7 @@ public class CloneGroupOffsetCommand implements SubCommand { } } System.out.printf("clone group offset success. srcGroup[%s], destGroup=[%s], topic[%s]", - srcGroup, destGroup, topic); + srcGroup, destGroup, topic); } catch (Exception e) { e.printStackTrace(); } finally { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java index 1623f52..68b62e1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java @@ -17,16 +17,15 @@ package org.apache.rocketmq.tools.command.offset; +import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Map; public class GetConsumerStatusCommand implements SubCommand { @Override @@ -70,15 +69,15 @@ public class GetConsumerStatusCommand implements SubCommand { defaultMQAdminExt.start(); Map<String, Map<MessageQueue, Long>> consumerStatusTable = - defaultMQAdminExt.getConsumeStatus(topic, group, originClientId); + defaultMQAdminExt.getConsumeStatus(topic, group, originClientId); System.out.printf("get consumer status from client. group=%s, topic=%s, originClientId=%s%n", - group, topic, originClientId); + group, topic, originClientId); System.out.printf("%-50s %-15s %-15s %-20s%n", - "#clientId", - "#brokerName", - "#queueId", - "#offset"); + "#clientId", + "#brokerName", + "#queueId", + "#offset"); for (Map.Entry<String, Map<MessageQueue, Long>> entry : consumerStatusTable.entrySet()) { String clientId = entry.getKey(); @@ -86,10 +85,10 @@ public class GetConsumerStatusCommand implements SubCommand { for (Map.Entry<MessageQueue, Long> entry1 : mqTable.entrySet()) { MessageQueue mq = entry1.getKey(); System.out.printf("%-50s %-15s %-15d %-20d%n", - UtilAll.frontStringAtLeast(clientId, 50), - mq.getBrokerName(), - mq.getQueueId(), - mqTable.get(mq)); + UtilAll.frontStringAtLeast(clientId, 50), + mq.getBrokerName(), + mq.getQueueId(), + mqTable.get(mq)); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java index 5eb30b5..e07a7c8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java @@ -17,6 +17,12 @@ package org.apache.rocketmq.tools.command.offset; +import java.util.Iterator; +import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageQueue; @@ -25,22 +31,14 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; - -import java.util.Iterator; -import java.util.Map; - public class ResetOffsetByTimeCommand implements SubCommand { public static void main(String[] args) { ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand(); Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] subargs = new String[]{"-t Jodie_rest_test", "-g CID_Jodie_rest_test", "-s -1", "-f true"}; + String[] subargs = new String[] {"-t Jodie_rest_test", "-g CID_Jodie_rest_test", "-s -1", "-f true"}; final CommandLine commandLine = - ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); cmd.execute(commandLine, options, null); } @@ -120,20 +118,20 @@ public class ResetOffsetByTimeCommand implements SubCommand { } System.out.printf("rollback consumer offset by specified group[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n", - group, topic, force, timeStampStr, timestamp); + group, topic, force, timeStampStr, timestamp); System.out.printf("%-40s %-40s %-40s%n", - "#brokerName", - "#queueId", - "#offset"); + "#brokerName", + "#queueId", + "#offset"); Iterator<Map.Entry<MessageQueue, Long>> iterator = offsetTable.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<MessageQueue, Long> entry = iterator.next(); System.out.printf("%-40s %-40d %-40d%n", - UtilAll.frontStringAtLeast(entry.getKey().getBrokerName(), 32), - entry.getKey().getQueueId(), - entry.getValue()); + UtilAll.frontStringAtLeast(entry.getKey().getBrokerName(), 32), + entry.getKey().getQueueId(), + entry.getValue()); } } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java index 0f15f69..9b30474 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java @@ -6,17 +6,22 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.offset; +import java.util.Date; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; @@ -25,19 +30,40 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Date; -import java.util.List; - /** * * */ public class ResetOffsetByTimeOldCommand implements SubCommand { + public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup, String topic, long timestamp, boolean force, + String timeStampStr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + List<RollbackStats> rollbackStatsList = defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); + System.out.printf( + "rollback consumer offset by specified consumerGroup[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n", + consumerGroup, topic, force, timeStampStr, timestamp); + + System.out.printf("%-20s %-20s %-20s %-20s %-20s %-20s%n", + "#brokerName", + "#queueId", + "#brokerOffset", + "#consumerOffset", + "#timestampOffset", + "#rollbackOffset" + ); + + for (RollbackStats rollbackStats : rollbackStatsList) { + System.out.printf("%-20s %-20d %-20d %-20d %-20d %-20d%n", + UtilAll.frontStringAtLeast(rollbackStats.getBrokerName(), 32), + rollbackStats.getQueueId(), + rollbackStats.getBrokerOffset(), + rollbackStats.getConsumerOffset(), + rollbackStats.getTimestampOffset(), + rollbackStats.getRollbackOffset() + ); + } + } + @Override public String commandName() { return "resetOffsetByTimeOld"; @@ -104,32 +130,4 @@ public class ResetOffsetByTimeOldCommand implements SubCommand { defaultMQAdminExt.shutdown(); } } - - public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup, String topic, long timestamp, boolean force, - String timeStampStr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - List<RollbackStats> rollbackStatsList = defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); - System.out.printf( - "rollback consumer offset by specified consumerGroup[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n", - consumerGroup, topic, force, timeStampStr, timestamp); - - System.out.printf("%-20s %-20s %-20s %-20s %-20s %-20s%n", - "#brokerName", - "#queueId", - "#brokerOffset", - "#consumerOffset", - "#timestampOffset", - "#rollbackOffset" - ); - - for (RollbackStats rollbackStats : rollbackStatsList) { - System.out.printf("%-20s %-20d %-20d %-20d %-20d %-20d%n", - UtilAll.frontStringAtLeast(rollbackStats.getBrokerName(), 32), - rollbackStats.getQueueId(), - rollbackStats.getBrokerOffset(), - rollbackStats.getConsumerOffset(), - rollbackStats.getTimestampOffset(), - rollbackStats.getRollbackOffset() - ); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java index 90a361c..81a7f78 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.tools.command.stats; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; @@ -32,82 +35,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - public class StatsAllSubCommand implements SubCommand { - @Override - public String commandName() { - return "statsAll"; - } - - @Override - public String commandDesc() { - return "Topic and Consumer tps stats"; - } - - @Override - public Options buildCommandlineOptions(Options options) { - Option opt = new Option("a", "activeTopic", false, "print active topic only"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("t", "topic", true, "print select topic only"); - opt.setRequired(false); - options.addOption(opt); - - return options; - } - - @Override - public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { - DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); - - defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - - try { - defaultMQAdminExt.start(); - - TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); - - System.out.printf("%-32s %-32s %12s %11s %11s %14s %14s%n", - "#Topic", - "#Consumer Group", - "#Accumulation", - "#InTPS", - "#OutTPS", - "#InMsg24Hour", - "#OutMsg24Hour" - ); - - boolean activeTopic = commandLine.hasOption('a'); - String selectTopic = commandLine.getOptionValue('t'); - - for (String topic : topicList.getTopicList()) { - if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) { - continue; - } - - if (selectTopic != null && selectTopic != "" && !topic.equals(selectTopic)) { - continue; - } - - try { - printTopicDetail(defaultMQAdminExt, topic, activeTopic); - } catch (Exception e) { - } - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - defaultMQAdminExt.shutdown(); - } - } - public static void printTopicDetail(final DefaultMQAdminExt admin, final String topic, final boolean activeTopic) - throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic); GroupList groupList = admin.queryTopicConsumeByWho(topic); @@ -116,7 +47,6 @@ public class StatsAllSubCommand implements SubCommand { long inMsgCntToday = 0; - for (BrokerData bd : topicRouteData.getBrokerDatas()) { String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { @@ -161,16 +91,16 @@ public class StatsAllSubCommand implements SubCommand { } if (!activeTopic || (inMsgCntToday > 0) || - (outMsgCntToday > 0)) { + (outMsgCntToday > 0)) { System.out.printf("%-32s %-32s %12d %11.2f %11.2f %14d %14d%n", - UtilAll.frontStringAtLeast(topic, 32), - UtilAll.frontStringAtLeast(group, 32), - accumulate, - inTPS, - outTPS, - inMsgCntToday, - outMsgCntToday + UtilAll.frontStringAtLeast(topic, 32), + UtilAll.frontStringAtLeast(group, 32), + accumulate, + inTPS, + outTPS, + inMsgCntToday, + outMsgCntToday ); } } @@ -178,13 +108,13 @@ public class StatsAllSubCommand implements SubCommand { if (!activeTopic || (inMsgCntToday > 0)) { System.out.printf("%-32s %-32s %12d %11.2f %11s %14d %14s%n", - UtilAll.frontStringAtLeast(topic, 32), - "", - 0, - inTPS, - "", - inMsgCntToday, - "NO_CONSUMER" + UtilAll.frontStringAtLeast(topic, 32), + "", + 0, + inTPS, + "", + inMsgCntToday, + "NO_CONSUMER" ); } } @@ -205,4 +135,72 @@ public class StatsAllSubCommand implements SubCommand { return 0; } + + @Override + public String commandName() { + return "statsAll"; + } + + @Override + public String commandDesc() { + return "Topic and Consumer tps stats"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("a", "activeTopic", false, "print active topic only"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "print select topic only"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); + + System.out.printf("%-32s %-32s %12s %11s %11s %14s %14s%n", + "#Topic", + "#Consumer Group", + "#Accumulation", + "#InTPS", + "#OutTPS", + "#InMsg24Hour", + "#OutMsg24Hour" + ); + + boolean activeTopic = commandLine.hasOption('a'); + String selectTopic = commandLine.getOptionValue('t'); + + for (String topic : topicList.getTopicList()) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) { + continue; + } + + if (selectTopic != null && selectTopic != "" && !topic.equals(selectTopic)) { + continue; + } + + try { + printTopicDetail(defaultMQAdminExt, topic, activeTopic); + } catch (Exception e) { + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java index 09d8011..709aada 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.tools.command.topic; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageQueue; @@ -24,15 +31,6 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - public class AllocateMQSubCommand implements SubCommand { @Override @@ -40,13 +38,11 @@ public class AllocateMQSubCommand implements SubCommand { return "allocateMQ"; } - @Override public String commandDesc() { return "Allocate MQ"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("t", "topic", true, "topic name"); @@ -60,7 +56,6 @@ public class AllocateMQSubCommand implements SubCommand { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook); @@ -81,7 +76,6 @@ public class AllocateMQSubCommand implements SubCommand { final AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely(); - RebalanceResult rr = new RebalanceResult(); for (String i : ipList) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java index 0749e36..69cbc99 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java @@ -6,16 +6,22 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.topic; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.remoting.RPCHook; @@ -24,32 +30,41 @@ import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - /** * * */ public class DeleteTopicSubCommand implements SubCommand { + public static void deleteTopic(final DefaultMQAdminExt adminExt, + final String clusterName, + final String topic + ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + + Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName); + adminExt.deleteTopicInBroker(masterSet, topic); + System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName); + + Set<String> nameServerSet = null; + if (adminExt.getNamesrvAddr() != null) { + String[] ns = adminExt.getNamesrvAddr().trim().split(";"); + nameServerSet = new HashSet(Arrays.asList(ns)); + } + + adminExt.deleteTopicInNameServer(nameServerSet, topic); + System.out.printf("delete topic [%s] from NameServer success.%n", topic); + } + @Override public String commandName() { return "deleteTopic"; } - @Override public String commandDesc() { return "Delete topic from broker and NameServer."; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("t", "topic", true, "topic name"); @@ -63,29 +78,6 @@ public class DeleteTopicSubCommand implements SubCommand { return options; } - - public static void deleteTopic(final DefaultMQAdminExt adminExt, - final String clusterName, - final String topic - ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { - - Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName); - adminExt.deleteTopicInBroker(masterSet, topic); - System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName); - - - Set<String> nameServerSet = null; - if (adminExt.getNamesrvAddr() != null) { - String[] ns = adminExt.getNamesrvAddr().trim().split(";"); - nameServerSet = new HashSet(Arrays.asList(ns)); - } - - - adminExt.deleteTopicInNameServer(nameServerSet, topic); - System.out.printf("delete topic [%s] from NameServer success.%n", topic); - } - - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java index 9f6c0b0..6e6e4ff 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java @@ -6,22 +6,21 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.topic; -import org.apache.rocketmq.common.message.MessageQueue; - import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.rocketmq.common.message.MessageQueue; public class RebalanceResult { private Map<String/*ip*/, List<MessageQueue>> result = new HashMap<String, List<MessageQueue>>(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java index 2bdedd6..6a267a5 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java @@ -16,15 +16,13 @@ */ package org.apache.rocketmq.tools.command.topic; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.SubCommand; +import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; - -import java.util.Set; - +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; /** * @@ -37,13 +35,11 @@ public class TopicClusterSubCommand implements SubCommand { return "topicClusterList"; } - @Override public String commandDesc() { return "get cluster info for topic"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("t", "topic", true, "topic name"); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java index 42184fb..5e23a96 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.tools.command.topic; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; @@ -28,14 +34,6 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; - /** * @@ -72,15 +70,15 @@ public class TopicListSubCommand implements SubCommand { ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); System.out.printf("%-20s %-48s %-48s%n", - "#Cluster Name", - "#Topic", - "#Consumer Group" + "#Cluster Name", + "#Topic", + "#Consumer Group" ); TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); for (String topic : topicList.getTopicList()) { if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) - || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) { + || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) { continue; } @@ -89,7 +87,7 @@ public class TopicListSubCommand implements SubCommand { try { clusterName = - this.findTopicBelongToWhichCluster(topic, clusterInfo, defaultMQAdminExt); + this.findTopicBelongToWhichCluster(topic, clusterInfo, defaultMQAdminExt); groupList = defaultMQAdminExt.queryTopicConsumeByWho(topic); } catch (Exception e) { } @@ -101,9 +99,9 @@ public class TopicListSubCommand implements SubCommand { for (String group : groupList.getGroupList()) { System.out.printf("%-20s %-48s %-48s%n", - UtilAll.frontStringAtLeast(clusterName, 20), - UtilAll.frontStringAtLeast(topic, 48), - UtilAll.frontStringAtLeast(group, 48) + UtilAll.frontStringAtLeast(clusterName, 20), + UtilAll.frontStringAtLeast(topic, 48), + UtilAll.frontStringAtLeast(group, 48) ); } } @@ -121,8 +119,8 @@ public class TopicListSubCommand implements SubCommand { } private String findTopicBelongToWhichCluster(final String topic, final ClusterInfo clusterInfo, - final DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQClientException, - InterruptedException { + final DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQClientException, + InterruptedException { TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); BrokerData brokerData = topicRouteData.getBrokerDatas().get(0); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java index 6f89b22..b7a180f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java @@ -16,14 +16,13 @@ */ package org.apache.rocketmq.tools.command.topic; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - /** * @@ -36,13 +35,11 @@ public class TopicRouteSubCommand implements SubCommand { return "topicRoute"; } - @Override public String commandDesc() { return "Examine topic route info"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("t", "topic", true, "topic name"); @@ -52,7 +49,6 @@ public class TopicRouteSubCommand implements SubCommand { return options; } - @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java index 73b98c9..76d9cbc 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java @@ -6,16 +6,22 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.topic; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; @@ -23,14 +29,6 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; - /** * @@ -43,13 +41,11 @@ public class TopicStatusSubCommand implements SubCommand { return "topicStatus"; } - @Override public String commandDesc() { return "Examine topic Status info"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("t", "topic", true, "topic name"); @@ -58,7 +54,6 @@ public class TopicStatusSubCommand implements SubCommand { return options; } - @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); @@ -75,11 +70,11 @@ public class TopicStatusSubCommand implements SubCommand { Collections.sort(mqList); System.out.printf("%-32s %-4s %-20s %-20s %s%n", - "#Broker Name", - "#QID", - "#Min Offset", - "#Max Offset", - "#Last Updated" + "#Broker Name", + "#QID", + "#Min Offset", + "#Max Offset", + "#Last Updated" ); for (MessageQueue mq : mqList) { @@ -91,11 +86,11 @@ public class TopicStatusSubCommand implements SubCommand { } System.out.printf("%-32s %-4d %-20d %-20d %s%n", - UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), - mq.getQueueId(), - topicOffset.getMinOffset(), - topicOffset.getMaxOffset(), - humanTimestamp + UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), + mq.getQueueId(), + topicOffset.getMinOffset(), + topicOffset.getMaxOffset(), + humanTimestamp ); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java index 4cc88eb..f9f4f1f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java @@ -6,26 +6,25 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.topic; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.namesrv.NamesrvUtil; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - /** * @@ -38,13 +37,11 @@ public class UpdateOrderConfCommand implements SubCommand { return "updateOrderConf"; } - @Override public String commandDesc() { return "Create or update or delete order conf"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("t", "topic", true, "topic name"); @@ -62,7 +59,6 @@ public class UpdateOrderConfCommand implements SubCommand { return options; } - @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); @@ -76,7 +72,7 @@ public class UpdateOrderConfCommand implements SubCommand { defaultMQAdminExt.start(); String orderConf = - defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic); + defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic); System.out.printf("get orderConf success. topic=[%s], orderConf=[%s] ", topic, orderConf); return; @@ -93,7 +89,7 @@ public class UpdateOrderConfCommand implements SubCommand { defaultMQAdminExt.createOrUpdateOrderConf(topic, orderConf, true); System.out.printf("update orderConf success. topic=[%s], orderConf=[%s]", topic, - orderConf.toString()); + orderConf.toString()); return; } else if ("delete".equals(type)) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java index cd119a0..fb7ab21 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java @@ -16,6 +16,11 @@ */ package org.apache.rocketmq.tools.command.topic; +import java.util.List; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; @@ -24,13 +29,6 @@ import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.List; -import java.util.Set; - public class UpdateTopicPermSubCommand implements SubCommand { @@ -39,13 +37,11 @@ public class UpdateTopicPermSubCommand implements SubCommand { return "updateTopicPerm"; } - @Override public String commandDesc() { return "Update topic perm"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerAddr", true, "create topic to which broker"); @@ -67,7 +63,6 @@ public class UpdateTopicPermSubCommand implements SubCommand { return options; } - @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); @@ -106,7 +101,7 @@ public class UpdateTopicPermSubCommand implements SubCommand { } else if (commandLine.hasOption('c')) { String clusterName = commandLine.getOptionValue('c').trim(); Set<String> masterSet = - CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : masterSet) { defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); System.out.printf("update topic perm from %s to %s in %s success.%n", oldPerm, perm, addr); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java index 25dd1f3..d4437b1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java @@ -16,6 +16,10 @@ */ package org.apache.rocketmq.tools.command.topic; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.RPCHook; @@ -23,12 +27,6 @@ import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Set; - public class UpdateTopicSubCommand implements SubCommand { @@ -37,13 +35,11 @@ public class UpdateTopicSubCommand implements SubCommand { return "updateTopic"; } - @Override public String commandDesc() { return "Update or create topic"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerAddr", true, "create topic to which broker"); @@ -85,7 +81,6 @@ public class UpdateTopicSubCommand implements SubCommand { return options; } - @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); @@ -142,7 +137,7 @@ public class UpdateTopicSubCommand implements SubCommand { String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums(); defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false); System.out.printf(String.format("set broker orderConf. isOrder=%s, orderConf=[%s]", - isOrder, orderConf.toString())); + isOrder, orderConf.toString())); } System.out.printf("create topic to %s success.%n", addr); System.out.printf("%s", topicConfig); @@ -154,7 +149,7 @@ public class UpdateTopicSubCommand implements SubCommand { defaultMQAdminExt.start(); Set<String> masterSet = - CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : masterSet) { defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); System.out.printf("create topic to %s success.%n", addr); @@ -162,18 +157,18 @@ public class UpdateTopicSubCommand implements SubCommand { if (isOrder) { Set<String> brokerNameSet = - CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName); + CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName); StringBuilder orderConf = new StringBuilder(); String splitor = ""; for (String s : brokerNameSet) { orderConf.append(splitor).append(s).append(":") - .append(topicConfig.getWriteQueueNums()); + .append(topicConfig.getWriteQueueNums()); splitor = ";"; } defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), - orderConf.toString(), true); + orderConf.toString(), true); System.out.printf(String.format("set cluster orderConf. isOrder=%s, orderConf=[%s]", - isOrder, orderConf.toString())); + isOrder, orderConf.toString())); } System.out.printf("%s", topicConfig); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java index 9bcb2df..63b81f8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java @@ -6,60 +6,52 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.monitor; -import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.slf4j.Logger; - import java.util.Iterator; import java.util.Map.Entry; import java.util.TreeMap; - +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.slf4j.Logger; public class DefaultMonitorListener implements MonitorListener { private final static String LOG_PREFIX = "[MONITOR] "; private final static String LOG_NOTIFY = LOG_PREFIX + " [NOTIFY] "; private final Logger log = ClientLogger.getLog(); - public DefaultMonitorListener() { } - @Override public void beginRound() { log.info(LOG_PREFIX + "=========================================beginRound"); } - @Override public void reportUndoneMsgs(UndoneMsgs undoneMsgs) { log.info(String.format(LOG_PREFIX + "reportUndoneMsgs: %s", undoneMsgs)); } - @Override public void reportFailedMsgs(FailedMsgs failedMsgs) { log.info(String.format(LOG_PREFIX + "reportFailedMsgs: %s", failedMsgs)); } - @Override public void reportDeleteMsgsEvent(DeleteMsgsEvent deleteMsgsEvent) { log.info(String.format(LOG_PREFIX + "reportDeleteMsgsEvent: %s", deleteMsgsEvent)); } - @Override public void reportConsumerRunningInfo(TreeMap<String, ConsumerRunningInfo> criTable) { @@ -67,12 +59,11 @@ public class DefaultMonitorListener implements MonitorListener { boolean result = ConsumerRunningInfo.analyzeSubscription(criTable); if (!result) { log.info(String.format(LOG_NOTIFY - + "reportConsumerRunningInfo: ConsumerGroup: %s, Subscription different", criTable - .firstEntry().getValue().getProperties().getProperty("consumerGroup"))); + + "reportConsumerRunningInfo: ConsumerGroup: %s, Subscription different", criTable + .firstEntry().getValue().getProperties().getProperty("consumerGroup"))); } } - { Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator(); while (it.hasNext()) { @@ -80,16 +71,15 @@ public class DefaultMonitorListener implements MonitorListener { String result = ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue()); if (!result.isEmpty()) { log.info(String.format(LOG_NOTIFY - + "reportConsumerRunningInfo: ConsumerGroup: %s, ClientId: %s, %s", - criTable.firstEntry().getValue().getProperties().getProperty("consumerGroup"), - next.getKey(), - result)); + + "reportConsumerRunningInfo: ConsumerGroup: %s, ClientId: %s, %s", + criTable.firstEntry().getValue().getProperties().getProperty("consumerGroup"), + next.getKey(), + result)); } } } } - @Override public void endRound() { log.info(LOG_PREFIX + "=========================================endRound"); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java index 5db446f..3270286 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java @@ -6,48 +6,42 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.monitor; import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent; - public class DeleteMsgsEvent { private OffsetMovedEvent offsetMovedEvent; private long eventTimestamp; - public OffsetMovedEvent getOffsetMovedEvent() { return offsetMovedEvent; } - public void setOffsetMovedEvent(OffsetMovedEvent offsetMovedEvent) { this.offsetMovedEvent = offsetMovedEvent; } - public long getEventTimestamp() { return eventTimestamp; } - public void setEventTimestamp(long eventTimestamp) { this.eventTimestamp = eventTimestamp; } - @Override public String toString() { return "DeleteMsgsEvent [offsetMovedEvent=" + offsetMovedEvent + ", eventTimestamp=" + eventTimestamp - + "]"; + + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java index 4c4e91c..bf63984 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.monitor; @@ -22,40 +22,33 @@ public class FailedMsgs { private String topic; private long failedMsgsTotalRecently; - public String getConsumerGroup() { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public String getTopic() { return topic; } - public void setTopic(String topic) { this.topic = topic; } - public long getFailedMsgsTotalRecently() { return failedMsgsTotalRecently; } - public void setFailedMsgsTotalRecently(long failedMsgsTotalRecently) { this.failedMsgsTotalRecently = failedMsgsTotalRecently; } - @Override public String toString() { return "FailedMsgs [consumerGroup=" + consumerGroup + ", topic=" + topic - + ", failedMsgsTotalRecently=" + failedMsgsTotalRecently + "]"; + + ", failedMsgsTotalRecently=" + failedMsgsTotalRecently + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java index fbe6c3c..e60d317 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java @@ -6,42 +6,37 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.monitor; import org.apache.rocketmq.common.MixAll; - public class MonitorConfig { private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, - System.getenv(MixAll.NAMESRV_ADDR_ENV)); + System.getenv(MixAll.NAMESRV_ADDR_ENV)); private int roundInterval = 1000 * 60; - public String getNamesrvAddr() { return namesrvAddr; } - public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } - public int getRoundInterval() { return roundInterval; } - public void setRoundInterval(int roundInterval) { this.roundInterval = roundInterval; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java index a60a273..17f85ce 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java @@ -17,9 +17,8 @@ package org.apache.rocketmq.tools.monitor; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; - import java.util.TreeMap; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; public interface MonitorListener { void beginRound(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java index 8c368fe..d9c4cf8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java @@ -6,17 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.monitor; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Random; +import java.util.TreeMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.PullResult; @@ -43,17 +52,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - public class MonitorService { private final Logger log = ClientLogger.getLog(); private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorService")); + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorService")); private final MonitorConfig monitorConfig; @@ -61,10 +63,9 @@ public class MonitorService { private final DefaultMQAdminExt defaultMQAdminExt; private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( - MixAll.TOOLS_CONSUMER_GROUP); + MixAll.TOOLS_CONSUMER_GROUP); private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer( - MixAll.MONITOR_CONSUMER_GROUP); - + MixAll.MONITOR_CONSUMER_GROUP); public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook rpcHook) { this.monitorConfig = monitorConfig; @@ -87,10 +88,10 @@ public class MonitorService { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { try { OffsetMovedEvent ome = - OffsetMovedEvent.decode(msgs.get(0).getBody(), OffsetMovedEvent.class); + OffsetMovedEvent.decode(msgs.get(0).getBody(), OffsetMovedEvent.class); DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent(); deleteMsgsEvent.setOffsetMovedEvent(ome); @@ -107,27 +108,18 @@ public class MonitorService { } } - - private String instanceName() { - String name = - System.currentTimeMillis() + new Random().nextInt() + this.monitorConfig.getNamesrvAddr(); - - return "MonitorService_" + name.hashCode(); - } - public static void main(String[] args) throws MQClientException { main0(args, null); } public static void main0(String[] args, RPCHook rpcHook) throws MQClientException { final MonitorService monitorService = - new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook); + new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook); monitorService.start(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; - @Override public void run() { synchronized (this) { @@ -140,6 +132,13 @@ public class MonitorService { }, "ShutdownHook")); } + private String instanceName() { + String name = + System.currentTimeMillis() + new Random().nextInt() + this.monitorConfig.getNamesrvAddr(); + + return "MonitorService_" + name.hashCode(); + } + public void start() throws MQClientException { this.defaultMQPullConsumer.start(); this.defaultMQAdminExt.start(); @@ -181,7 +180,6 @@ public class MonitorService { // log.error("reportUndoneMsgs Exception", e); } - try { this.reportConsumerRunningInfo(consumerGroup); } catch (Exception e) { @@ -228,7 +226,6 @@ public class MonitorService { } } - { Iterator<Entry<String, ConsumeStats>> it = csByTopic.entrySet().iterator(); while (it.hasNext()) { @@ -245,7 +242,7 @@ public class MonitorService { } public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException, - MQBrokerException, RemotingException, MQClientException { + MQBrokerException, RemotingException, MQClientException { ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup); TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>(); for (Connection c : cc.getConnectionSet()) { @@ -257,7 +254,7 @@ public class MonitorService { try { ConsumerRunningInfo info = - defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false); + defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false); infoMap.put(clientId, info); } catch (Exception e) { } @@ -296,7 +293,7 @@ public class MonitorService { switch (pull.getPullStatus()) { case FOUND: long delay = - pull.getMsgFoundList().get(0).getStoreTimestamp() - ow.getLastTimestamp(); + pull.getMsgFoundList().get(0).getStoreTimestamp() - ow.getLastTimestamp(); if (delay > delayMax) { delayMax = delay; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java index ac549af..abc0cb9 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.monitor; @@ -27,61 +27,50 @@ public class UndoneMsgs { private long undoneMsgsDelayTimeMills; - public String getConsumerGroup() { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public String getTopic() { return topic; } - public void setTopic(String topic) { this.topic = topic; } - public long getUndoneMsgsTotal() { return undoneMsgsTotal; } - public void setUndoneMsgsTotal(long undoneMsgsTotal) { this.undoneMsgsTotal = undoneMsgsTotal; } - public long getUndoneMsgsSingleMQ() { return undoneMsgsSingleMQ; } - public void setUndoneMsgsSingleMQ(long undoneMsgsSingleMQ) { this.undoneMsgsSingleMQ = undoneMsgsSingleMQ; } - public long getUndoneMsgsDelayTimeMills() { return undoneMsgsDelayTimeMills; } - public void setUndoneMsgsDelayTimeMills(long undoneMsgsDelayTimeMills) { this.undoneMsgsDelayTimeMills = undoneMsgsDelayTimeMills; } - @Override public String toString() { return "UndoneMsgs [consumerGroup=" + consumerGroup + ", topic=" + topic + ", undoneMsgsTotal=" - + undoneMsgsTotal + ", undoneMsgsSingleMQ=" + undoneMsgsSingleMQ - + ", undoneMsgsDelayTimeMills=" + undoneMsgsDelayTimeMills + "]"; + + undoneMsgsTotal + ", undoneMsgsSingleMQ=" + undoneMsgsSingleMQ + + ", undoneMsgsDelayTimeMills=" + undoneMsgsDelayTimeMills + "]"; } }