http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java new file mode 100644 index 0000000..5f5409b --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+
+/**
+ * Admin sub-command {@code brokerStatus}: prints the runtime statistics table fetched from
+ * one broker ({@code -b}) or from each address that
+ * {@link CommandUtil#fetchMasterAndSlaveAddrByClusterName} returns for a cluster ({@code -c}).
+ * When neither option is given the command prints nothing.
+ *
+ * @author shijia.wxr
+ */
+public class BrokerStatusSubCommand implements SubCommand {
+
+    /** @return the name this sub-command is invoked by */
+    @Override
+    public String commandName() {
+        return "brokerStatus";
+    }
+
+
+    /** @return a one-line description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "Fetch broker runtime status data";
+    }
+
+
+    /**
+     * Registers the optional {@code -b/--brokerAddr} and {@code -c/--clusterName} options.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option brokerOpt = new Option("b", "brokerAddr", true, "Broker address");
+        brokerOpt.setRequired(false);
+        options.addOption(brokerOpt);
+
+        Option clusterOpt = new Option("c", "clusterName", true, "which cluster");
+        clusterOpt.setRequired(false);
+        options.addOption(clusterOpt);
+
+        return options;
+    }
+
+    /**
+     * Starts an admin client, prints the requested statistics and always shuts the client down.
+     * Failures are reported with {@code printStackTrace()} and never propagate to the caller.
+     *
+     * @param commandLine the parsed command line
+     * @param options the option set (not used here)
+     * @param rpcHook hook handed to the admin client
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            adminExt.start();
+
+            String brokerAddr = null;
+            if (commandLine.hasOption('b')) {
+                brokerAddr = commandLine.getOptionValue('b').trim();
+            }
+            String clusterName = null;
+            if (commandLine.hasOption('c')) {
+                clusterName = commandLine.getOptionValue('c').trim();
+            }
+
+            if (brokerAddr != null) {
+                // An explicit broker address takes precedence over the cluster option.
+                printBrokerRuntimeStats(adminExt, brokerAddr, false);
+            } else if (clusterName != null) {
+                Set<String> brokerAddrs =
+                    CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName);
+                for (String addr : brokerAddrs) {
+                    try {
+                        printBrokerRuntimeStats(adminExt, addr, true);
+                    } catch (Exception e) {
+                        // A failing broker must not stop the remaining ones from being reported.
+                        e.printStackTrace();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+
+    /**
+     * Fetches the runtime statistics of one broker and prints them, one {@code key: value} row
+     * per entry, ordered by key.
+     *
+     * @param defaultMQAdminExt a started admin client
+     * @param brokerAddr address of the broker to query
+     * @param printBroker when {@code true} every row is prefixed with {@code brokerAddr}
+     */
+    public void printBrokerRuntimeStats(final DefaultMQAdminExt defaultMQAdminExt, final String brokerAddr,
+        final boolean printBroker) throws InterruptedException, MQBrokerException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException {
+        KVTable stats = defaultMQAdminExt.fetchBrokerRuntimeStats(brokerAddr);
+
+        // Copy into a TreeMap so the rows come out sorted by key.
+        TreeMap<String, String> sorted = new TreeMap<String, String>();
+        sorted.putAll(stats.getTable());
+
+        for (Iterator<Entry<String, String>> rows = sorted.entrySet().iterator(); rows.hasNext();) {
+            Entry<String, String> row = rows.next();
+            if (!printBroker) {
+                System.out.printf("%-32s: %s%n", row.getKey(), row.getValue());
+                continue;
+            }
+            System.out.printf("%-24s %-32s: %s%n", brokerAddr, row.getKey(), row.getValue());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java new file mode 100644 index 0000000..c2918c1 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * Admin sub-command {@code cleanExpiredCQ}: asks a single broker ({@code -b}) or a cluster
+ * ({@code -c}) to clean expired consume queues and prints {@code success} or {@code false}.
+ *
+ * @author lansheng.zj
+ */
+public class CleanExpiredCQSubCommand implements SubCommand {
+
+    /** @return the name this sub-command is invoked by */
+    @Override
+    public String commandName() {
+        return "cleanExpiredCQ";
+    }
+
+
+    /** @return a one-line description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "Clean expired ConsumeQueue on broker.";
+    }
+
+
+    /**
+     * Registers the optional {@code -b/--brokerAddr} and {@code -c/--cluster} options.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option brokerOpt = new Option("b", "brokerAddr", true, "Broker address");
+        brokerOpt.setRequired(false);
+        options.addOption(brokerOpt);
+
+        Option clusterOpt = new Option("c", "cluster", true, "clustername");
+        clusterOpt.setRequired(false);
+        options.addOption(clusterOpt);
+
+        return options;
+    }
+
+
+    /**
+     * Runs the clean-up. With {@code -b} the request goes to that broker address; otherwise the
+     * (trimmed, possibly {@code null}) value of {@code -c} is passed to the cluster-wide call.
+     * Failures are reported with {@code printStackTrace()}; the admin client is always shut down.
+     *
+     * @param commandLine the parsed command line
+     * @param options the option set (not used here)
+     * @param rpcHook hook handed to the admin client
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            adminExt.start();
+
+            final boolean cleaned;
+            if (!commandLine.hasOption('b')) {
+                String cluster = commandLine.getOptionValue('c');
+                // NOTE(review): a null cluster is forwarded as-is; its meaning is defined by
+                // DefaultMQAdminExt, which is not visible here.
+                cleaned = adminExt.cleanExpiredConsumerQueue(cluster == null ? null : cluster.trim());
+            } else {
+                String addr = commandLine.getOptionValue('b').trim();
+                cleaned = adminExt.cleanExpiredConsumerQueueByAddr(addr);
+            }
+            System.out.printf(cleaned ? "success" : "false");
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java new file mode 100644 index 0000000..f7b543f --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * Admin sub-command {@code cleanUnusedTopic}: asks a single broker ({@code -b}) or a cluster
+ * ({@code -c}) to clean unused topics and prints {@code success} or {@code false}.
+ *
+ * @author lansheng.zj
+ */
+public class CleanUnusedTopicCommand implements SubCommand {
+
+    /** @return the name this sub-command is invoked by */
+    @Override
+    public String commandName() {
+        return "cleanUnusedTopic";
+    }
+
+
+    /** @return a one-line description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "Clean unused topic on broker.";
+    }
+
+
+    /**
+     * Registers the optional {@code -b/--brokerAddr} and {@code -c/--cluster} options.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "Broker address");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "cluster name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    /**
+     * Runs the clean-up. With {@code -b} the request goes to that broker address; otherwise the
+     * (trimmed, possibly {@code null}) value of {@code -c} is used for the cluster-wide call.
+     * Failures are reported with {@code printStackTrace()}; the admin client is always shut down.
+     *
+     * @param commandLine the parsed command line
+     * @param options the option set (not used here)
+     * @param rpcHook hook handed to the admin client
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            boolean result = false;
+            defaultMQAdminExt.start();
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+                result = defaultMQAdminExt.cleanUnusedTopicByAddr(addr);
+            } else {
+                String cluster = commandLine.getOptionValue('c');
+                if (null != cluster) {
+                    cluster = cluster.trim();
+                }
+                // Fixed: this branch used to call cleanUnusedTopicByAddr(cluster), handing a
+                // cluster name to the by-address API. The cluster-wide variant is the
+                // counterpart of cleanExpiredConsumerQueue(cluster) in CleanExpiredCQSubCommand.
+                // NOTE(review): assumes DefaultMQAdminExt exposes cleanUnusedTopic(String) -- confirm.
+                result = defaultMQAdminExt.cleanUnusedTopic(cluster);
+            }
+            System.out.printf(result ? "success" : "false");
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java new file mode 100644 index 0000000..703d69b --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.admin.MQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Admin sub-command {@code getBrokerConfig}: prints the configuration properties of one broker
+ * ({@code -b}) or of every master and slave that
+ * {@link CommandUtil#fetchMasterAndSlaveDistinguish} returns for a cluster ({@code -c}).
+ * When neither option is given the command prints nothing.
+ *
+ * @author xigu.lx
+ */
+public class GetBrokerConfigCommand implements SubCommand {
+    /** @return the name this sub-command is invoked by */
+    @Override
+    public String commandName() {
+        return "getBrokerConfig";
+    }
+
+    /** @return a one-line description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "Get broker config by cluster or special broker!";
+    }
+
+    /**
+     * Registers the optional {@code -b/--brokerAddr} and {@code -c/--clusterName} options.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(final Options options) {
+        Option brokerOpt = new Option("b", "brokerAddr", true, "update which broker");
+        brokerOpt.setRequired(false);
+        options.addOption(brokerOpt);
+
+        Option clusterOpt = new Option("c", "clusterName", true, "update which cluster");
+        clusterOpt.setRequired(false);
+        options.addOption(clusterOpt);
+
+        return options;
+    }
+
+    /**
+     * Prints the configuration of the selected broker(s). The admin client is started only once
+     * a target option was found and is always shut down afterwards; failures are reported with
+     * {@code printStackTrace()}.
+     *
+     * @param commandLine the parsed command line
+     * @param options the option set (not used here)
+     * @param rpcHook hook handed to the admin client
+     */
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) {
+        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+                adminExt.start();
+
+                String banner = String.format("============%s============\n", brokerAddr);
+                getAndPrint(adminExt, banner, brokerAddr);
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                adminExt.start();
+
+                Map<String, List<String>> slavesByMaster =
+                    CommandUtil.fetchMasterAndSlaveDistinguish(adminExt, clusterName);
+
+                for (Map.Entry<String, List<String>> group : slavesByMaster.entrySet()) {
+                    String masterAddr = group.getKey();
+                    String masterBanner = String.format("============Master: %s============\n", masterAddr);
+                    getAndPrint(adminExt, masterBanner, masterAddr);
+
+                    // Each master is followed directly by its own slaves.
+                    for (String slaveAddr : group.getValue()) {
+                        String slaveBanner = String.format(
+                            "============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr);
+                        getAndPrint(adminExt, slaveBanner, slaveAddr);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+
+    /**
+     * Prints {@code printPrefix} followed by one {@code key= value} row per configuration
+     * property of the broker at {@code addr}, then an empty line. If the broker returns no
+     * properties object a notice is printed instead.
+     *
+     * @param defaultMQAdminExt a started admin client
+     * @param printPrefix banner printed verbatim before the properties
+     * @param addr address of the broker to query
+     */
+    protected void getAndPrint(final MQAdminExt defaultMQAdminExt, final String printPrefix, final String addr)
+        throws InterruptedException, RemotingConnectException,
+        UnsupportedEncodingException, RemotingTimeoutException,
+        MQBrokerException, RemotingSendRequestException {
+
+        System.out.print(printPrefix);
+
+        Properties config = defaultMQAdminExt.getBrokerConfig(addr);
+        if (config == null) {
+            System.out.printf("Broker[%s] has no config property!\n", addr);
+            return;
+        }
+
+        for (Map.Entry<Object, Object> item : config.entrySet()) {
+            System.out.printf("%-50s= %s\n", item.getKey(), item.getValue());
+        }
+
+        System.out.printf("%n");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java new file mode 100644 index 0000000..165e397 --- /dev/null +++ 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.io.UnsupportedEncodingException;
+
+
+/**
+ * Admin sub-command {@code sendMsgStatus}: sends a series of test messages to the topic named
+ * after the given broker ({@code -b}) and prints the round-trip time and {@link SendResult} of
+ * each send.
+ *
+ * @author lansheng.zj
+ */
+public class SendMsgStatusCommand implements SubCommand {
+
+    /** @return the name this sub-command is invoked by */
+    @Override
+    public String commandName() {
+        return "sendMsgStatus";
+    }
+
+
+    /** @return a one-line description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "send msg to broker.";
+    }
+
+
+    /**
+     * Registers the required {@code -b/--brokerName} option and the optional
+     * {@code -s/--messageSize} (default 128) and {@code -c/--count} (default 50) options.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerName", true, "Broker Name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("s", "messageSize", true, "Message Size, Default: 128");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "count", true, "send message count, Default: 50");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    /**
+     * Starts a producer, sends one untimed 16-byte message, then sends {@code count} timed
+     * messages and prints one result line per send. Failures are reported with
+     * {@code printStackTrace()}; the producer is always shut down.
+     *
+     * @param commandLine the parsed command line
+     * @param options the option set (not used here)
+     * @param rpcHook hook handed to the producer
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook);
+        producer.setInstanceName("PID_SMSC_" + System.currentTimeMillis());
+
+        try {
+            producer.start();
+            String brokerName = commandLine.getOptionValue('b').trim();
+            // Trim numeric arguments like every other option so stray whitespace does not
+            // fail Integer.parseInt.
+            int messageSize = commandLine.hasOption('s')
+                ? Integer.parseInt(commandLine.getOptionValue('s').trim()) : 128;
+            int count = commandLine.hasOption('c')
+                ? Integer.parseInt(commandLine.getOptionValue('c').trim()) : 50;
+
+            // First send is not timed.
+            // NOTE(review): presumably a warm-up (route lookup / connection setup) -- confirm.
+            producer.send(buildMessage(brokerName, 16));
+
+            for (int i = 0; i < count; i++) {
+                long begin = System.currentTimeMillis();
+                SendResult result = producer.send(buildMessage(brokerName, messageSize));
+                // Fixed: the result used to be concatenated into the format string itself, so
+                // any '%' in it made printf throw, and lines were never terminated.
+                System.out.printf("rt:%dms, SendResult=%s%n", System.currentTimeMillis() - begin, result);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            producer.shutdown();
+        }
+    }
+
+
+    /**
+     * Builds a message for {@code topic} whose body repeats the 11-character text
+     * {@code "hello jodie"} until it is at least {@code messageSize} characters long, so the
+     * body can exceed {@code messageSize} by up to 10 characters and is empty when
+     * {@code messageSize <= 0}.
+     *
+     * @param topic topic of the message
+     * @param messageSize minimum body length in characters
+     * @return the message
+     * @throws UnsupportedEncodingException if {@code MixAll.DEFAULT_CHARSET} is not supported
+     */
+    private static Message buildMessage(final String topic, final int messageSize) throws UnsupportedEncodingException {
+        Message msg = new Message();
+        msg.setTopic(topic);
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < messageSize; i += 11) {
+            sb.append("hello jodie");
+        }
+        msg.setBody(sb.toString().getBytes(MixAll.DEFAULT_CHARSET));
+        return msg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java new file mode 100644 index 0000000..86938a7 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Admin sub-command {@code updateBrokerConfig}: sets one configuration key ({@code -k}) to a
+ * value ({@code -v}) on a single broker ({@code -b}) or on every address that
+ * {@link CommandUtil#fetchMasterAddrByClusterName} returns for a cluster ({@code -c}).
+ * When neither target option is given the command-line help is printed.
+ *
+ * @author shijia.wxr
+ */
+public class UpdateBrokerConfigSubCommand implements SubCommand {
+
+    /** @return the name this sub-command is invoked by */
+    @Override
+    public String commandName() {
+        return "updateBrokerConfig";
+    }
+
+
+    /** @return a one-line description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "Update broker's config";
+    }
+
+
+    /**
+     * Registers the optional target options {@code -b/--brokerAddr} and {@code -c/--clusterName}
+     * and the required {@code -k/--key} and {@code -v/--value} options.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option brokerOpt = new Option("b", "brokerAddr", true, "update which broker");
+        brokerOpt.setRequired(false);
+        options.addOption(brokerOpt);
+
+        Option clusterOpt = new Option("c", "clusterName", true, "update which cluster");
+        clusterOpt.setRequired(false);
+        options.addOption(clusterOpt);
+
+        Option keyOpt = new Option("k", "key", true, "config key");
+        keyOpt.setRequired(true);
+        options.addOption(keyOpt);
+
+        Option valueOpt = new Option("v", "value", true, "config value");
+        valueOpt.setRequired(true);
+        options.addOption(valueOpt);
+
+        return options;
+    }
+
+
+    /**
+     * Applies the key/value pair to the selected broker(s) and prints one confirmation line per
+     * updated broker. In cluster mode a failure on one broker is reported and the remaining
+     * brokers are still updated. The admin client is always shut down.
+     *
+     * @param commandLine the parsed command line
+     * @param options the option set, used to print the help text
+     * @param rpcHook hook handed to the admin client
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            String key = commandLine.getOptionValue('k').trim();
+            String value = commandLine.getOptionValue('v').trim();
+            Properties update = new Properties();
+            update.put(key, value);
+
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+
+                adminExt.start();
+
+                adminExt.updateBrokerConfig(brokerAddr, update);
+                System.out.printf("update broker config success, %s\n", brokerAddr);
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+
+                adminExt.start();
+
+                Set<String> masterAddrs = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
+                for (String masterAddr : masterAddrs) {
+                    try {
+                        adminExt.updateBrokerConfig(masterAddr, update);
+                        System.out.printf("update broker config success, %s\n", masterAddr);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            } else {
+                // No target selected: show usage instead of silently doing nothing.
+                ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java new file mode 100644 index 0000000..3a28522 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package com.alibaba.rocketmq.tools.command.cluster;
+
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Admin sub-command {@code clusterRT}: repeatedly sends test messages to the topic named after
+ * each entry of the cluster address table and prints the average send round-trip time per
+ * entry, either as an aligned table or as {@code |}-separated log lines ({@code -p true}).
+ * The command loops until it fails or is interrupted.
+ *
+ * @author fengliang.hfl
+ */
+public class CLusterSendMsgRTCommand implements SubCommand {
+
+    /** Empty entry point kept for backward compatibility; it does nothing. */
+    public static void main(String args[]) {
+    }
+
+    /** @return the name this sub-command is invoked by */
+    @Override
+    public String commandName() {
+        return "clusterRT";
+    }
+
+    /** @return a one-line description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "List All clusters Message Send RT";
+    }
+
+    /**
+     * Registers the options of this command; all of them are optional and fall back to the
+     * defaults applied in {@link #execute}.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        // The long name "amout" is misspelled but kept so existing invocations keep working.
+        // Fixed: the help text claimed a default of 100 while execute() uses 50.
+        Option opt = new Option("a", "amout", true, "message amount | default 50");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        // Fixed: this option was marked required although execute() has a default for it.
+        opt = new Option("s", "size", true, "message size | default 128 Byte");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "cluster name | default display all cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("p", "print log", true, "print as tlog | default false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("m", "machine room", true, "machine room name | default noname");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("i", "interval", true, "print interval | default 10 seconds");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Measures and prints send round-trip times in an endless loop, sleeping {@code -i} seconds
+     * between rounds. For each target {@code amount} messages are sent; the first send is
+     * excluded from the average. Any exception ends the loop and is reported with
+     * {@code printStackTrace()}; the admin client and the producer are always shut down.
+     *
+     * @param commandLine the parsed command line
+     * @param options the option set (not used here)
+     * @param rpcHook hook handed to the admin client and the producer
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
+        producer.setProducerGroup(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            producer.start();
+
+            ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
+            HashMap<String, Set<String>> clusterAddr = clusterInfoSerializeWrapper
+                .getClusterAddrTable();
+
+            Set<String> clusterNames = null;
+
+            long amount = !commandLine.hasOption('a') ? 50 : Long.parseLong(commandLine
+                .getOptionValue('a').trim());
+
+            long size = !commandLine.hasOption('s') ? 128 : Long.parseLong(commandLine
+                .getOptionValue('s').trim());
+
+            long interval = !commandLine.hasOption('i') ? 10 : Long.parseLong(commandLine
+                .getOptionValue('i').trim());
+
+            boolean printAsTlog = !commandLine.hasOption('p') ? false : Boolean
+                .parseBoolean(commandLine.getOptionValue('p').trim());
+
+            String machineRoom = !commandLine.hasOption('m') ? "noname" : commandLine
+                .getOptionValue('m').trim();
+
+            if (commandLine.hasOption('c')) {
+                clusterNames = new TreeSet<String>();
+                clusterNames.add(commandLine.getOptionValue('c').trim());
+            } else {
+                clusterNames = clusterAddr.keySet();
+            }
+
+            if (!printAsTlog) {
+                // Fixed: header widths now match the row format below so the columns line up.
+                System.out.printf("%-24s %-24s %-8s %-16s %-16s%n",
+                    "#Cluster Name",
+                    "#Broker Name",
+                    "#RT",
+                    "#successCount",
+                    "#failCount"
+                );
+            }
+
+            while (true) {
+                for (String clusterName : clusterNames) {
+                    Set<String> brokerNames = clusterAddr.get(clusterName);
+                    if (brokerNames == null) {
+                        // Fixed: terminate the line so repeated rounds do not run together.
+                        System.out.printf("cluster [%s] not exist%n", clusterName);
+                        break;
+                    }
+
+                    for (String brokerName : brokerNames) {
+                        Message msg = new Message(brokerName, getStringBySize(size).getBytes(MixAll.DEFAULT_CHARSET));
+                        long elapsed = 0;
+                        int successCount = 0;
+                        int failCount = 0;
+
+                        for (int i = 0; i < amount; i++) {
+                            long start = System.currentTimeMillis();
+                            try {
+                                producer.send(msg);
+                                successCount++;
+                            } catch (Exception e) {
+                                // A failed send is only counted; its duration still goes into the average.
+                                failCount++;
+                            }
+                            long end = System.currentTimeMillis();
+
+                            // The first send is excluded from the timing.
+                            if (i != 0) {
+                                elapsed += end - start;
+                            }
+                        }
+
+                        // Fixed: with amount <= 1 there are no timed samples; dividing by
+                        // (amount - 1) produced NaN, which new BigDecimal(double) rejects.
+                        long timedSends = amount - 1;
+                        double rt = timedSends > 0 ? (double) elapsed / timedSends : 0.0d;
+                        if (!printAsTlog) {
+                            System.out.printf("%-24s %-24s %-8s %-16s %-16s%n",
+                                clusterName,
+                                brokerName,
+                                String.format("%.2f", rt),
+                                successCount,
+                                failCount
+                            );
+                        } else {
+                            // Fixed: the line used to be formatted twice (String.format, then
+                            // printf), so a '%' in any name broke the second pass.
+                            System.out.printf("%s|%s|%s|%s|%s%n", getCurTime(),
+                                machineRoom, clusterName, brokerName,
+                                new BigDecimal(rt).setScale(0, BigDecimal.ROUND_HALF_UP));
+                        }
+
+                    }
+
+                }
+
+                Thread.sleep(interval * 1000);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+            producer.shutdown();
+        }
+    }
+
+    /**
+     * Builds a string of {@code size} {@code 'a'} characters.
+     *
+     * @param size number of characters; values {@code <= 0} give an empty string
+     * @return the generated string
+     */
+    public String getStringBySize(long size) {
+        StringBuilder res = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+            res.append('a');
+        }
+        return res.toString();
+    }
+
+    /**
+     * Returns the current time formatted as {@code yyyy-MM-dd HH:mm:ss} in the GMT+8 time zone.
+     *
+     * @return the formatted timestamp
+     */
+    public String getCurTime() {
+        String fromTimeZone = "GMT+8";
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date date = new Date();
+        format.setTimeZone(TimeZone.getTimeZone(fromTimeZone));
+        String chinaDate = format.format(date);
+        return chinaDate;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java new file mode 100644 index 0000000..baf4f3c --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.tools.command.cluster; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; +import com.alibaba.rocketmq.common.protocol.body.KVTable; +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + + +/** + * @author shijia.wxr + */ +public class ClusterListSubCommand implements SubCommand { + + @Override + public String commandName() { + return "clusterList"; + } + + + @Override + public String commandDesc() { + return "List all of clusters"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("m", "moreStats", false, "Print more stats"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("i", "interval", true, "specify intervals numbers, it is in seconds"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + long printInterval = 1; + boolean enableInterval = commandLine.hasOption('i'); + + if (enableInterval) { + printInterval = Long.parseLong(commandLine.getOptionValue('i')) * 1000; + } + 
+ try { + defaultMQAdminExt.start(); + long i = 0; + + do { + if (i++ > 0) { + Thread.sleep(printInterval); + } + if (commandLine.hasOption('m')) { + this.printClusterMoreStats(defaultMQAdminExt); + } else { + this.printClusterBaseInfo(defaultMQAdminExt); + } + } while (enableInterval); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, + RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException { + + ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); + + System.out.printf("%-16s %-32s %14s %14s %14s %14s%n", + "#Cluster Name", + "#Broker Name", + "#InTotalYest", + "#OutTotalYest", + "#InTotalToday", + "#OutTotalToday" + ); + + Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator(); + while (itCluster.hasNext()) { + Map.Entry<String, Set<String>> next = itCluster.next(); + String clusterName = next.getKey(); + TreeSet<String> brokerNameSet = new TreeSet<String>(); + brokerNameSet.addAll(next.getValue()); + + for (String brokerName : brokerNameSet) { + BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); + if (brokerData != null) { + + Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator(); + while (itAddr.hasNext()) { + Map.Entry<Long, String> next1 = itAddr.next(); + long inTotalYest = 0; + long outTotalYest = 0; + long inTotalToday = 0; + long outTotalToday = 0; + + try { + KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(next1.getValue()); + String msgPutTotalYesterdayMorning = kvTable.getTable().get("msgPutTotalYesterdayMorning"); + String msgPutTotalTodayMorning = kvTable.getTable().get("msgPutTotalTodayMorning"); + String msgPutTotalTodayNow = 
kvTable.getTable().get("msgPutTotalTodayNow"); + String msgGetTotalYesterdayMorning = kvTable.getTable().get("msgGetTotalYesterdayMorning"); + String msgGetTotalTodayMorning = kvTable.getTable().get("msgGetTotalTodayMorning"); + String msgGetTotalTodayNow = kvTable.getTable().get("msgGetTotalTodayNow"); + + inTotalYest = Long.parseLong(msgPutTotalTodayMorning) - Long.parseLong(msgPutTotalYesterdayMorning); + outTotalYest = Long.parseLong(msgGetTotalTodayMorning) - Long.parseLong(msgGetTotalYesterdayMorning); + + inTotalToday = Long.parseLong(msgPutTotalTodayNow) - Long.parseLong(msgPutTotalTodayMorning); + outTotalToday = Long.parseLong(msgGetTotalTodayNow) - Long.parseLong(msgGetTotalTodayMorning); + + } catch (Exception e) { + } + + System.out.printf("%-16s %-32s %14d %14d %14d %14d%n", + clusterName, + brokerName, + inTotalYest, + outTotalYest, + inTotalToday, + outTotalToday + ); + } + } + } + + if (itCluster.hasNext()) { + System.out.printf(""); + } + } + } + + private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException, + RemotingSendRequestException, InterruptedException, MQBrokerException { + + ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); + + System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", + "#Cluster Name", + "#Broker Name", + "#BID", + "#Addr", + "#Version", + "#InTPS(LOAD)", + "#OutTPS(LOAD)", + "#PCWait(ms)", + "#Hour", + "#SPACE" + ); + + Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator(); + while (itCluster.hasNext()) { + Map.Entry<String, Set<String>> next = itCluster.next(); + String clusterName = next.getKey(); + TreeSet<String> brokerNameSet = new TreeSet<String>(); + brokerNameSet.addAll(next.getValue()); + + for (String brokerName : brokerNameSet) { + BrokerData brokerData = 
clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); + if (brokerData != null) { + + Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator(); + while (itAddr.hasNext()) { + Map.Entry<Long, String> next1 = itAddr.next(); + double in = 0; + double out = 0; + String version = ""; + String sendThreadPoolQueueSize = ""; + String pullThreadPoolQueueSize = ""; + String sendThreadPoolQueueHeadWaitTimeMills = ""; + String pullThreadPoolQueueHeadWaitTimeMills = ""; + String pageCacheLockTimeMills = ""; + String earliestMessageTimeStamp = ""; + String commitLogDiskRatio = ""; + try { + KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(next1.getValue()); + String putTps = kvTable.getTable().get("putTps"); + String getTransferedTps = kvTable.getTable().get("getTransferedTps"); + sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize"); + pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize"); + + sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize"); + pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize"); + + sendThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills"); + pullThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills"); + pageCacheLockTimeMills = kvTable.getTable().get("pageCacheLockTimeMills"); + earliestMessageTimeStamp = kvTable.getTable().get("earliestMessageTimeStamp"); + commitLogDiskRatio = kvTable.getTable().get("commitLogDiskRatio"); + + version = kvTable.getTable().get("brokerVersionDesc"); + { + String[] tpss = putTps.split(" "); + if (tpss != null && tpss.length > 0) { + in = Double.parseDouble(tpss[0]); + } + } + + { + String[] tpss = getTransferedTps.split(" "); + if (tpss != null && tpss.length > 0) { + out = Double.parseDouble(tpss[0]); + } + } + } catch (Exception e) { + } + + double hour = 0.0; + double space = 0.0; + + if 
(earliestMessageTimeStamp != null && earliestMessageTimeStamp.length() > 0) { + long mills = System.currentTimeMillis() - Long.valueOf(earliestMessageTimeStamp); + hour = mills / 1000.0 / 60.0 / 60.0; + } + + if (commitLogDiskRatio != null && commitLogDiskRatio.length() > 0) { + space = Double.valueOf(commitLogDiskRatio); + } + + System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", + clusterName, + brokerName, + next1.getKey().longValue(), + next1.getValue(), + version, + String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills), + String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills), + pageCacheLockTimeMills, + String.format("%2.2f", hour), + String.format("%.4f", space) + ); + } + } + } + + if (itCluster.hasNext()) { + System.out.printf(""); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java new file mode 100644 index 0000000..aa0598e --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.connection; + +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.protocol.body.Connection; +import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Iterator; +import java.util.Map.Entry; + + +/** + * @author shijia.wxr + */ +public class ConsumerConnectionSubCommand implements SubCommand { + + @Override + public String commandName() { + return "consumerConnection"; + } + + @Override + public String commandDesc() { + return "Query consumer's socket connection, client version and subscription"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "consumerGroup", true, "consumer group name"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + + String group = commandLine.getOptionValue('g').trim(); + + ConsumerConnection cc = 
defaultMQAdminExt.examineConsumerConnectionInfo(group); + + + int i = 1; + for (Connection conn : cc.getConnectionSet()) { + System.out.printf("%03d %-32s %-22s %-8s %s%n", + i++, + conn.getClientId(), + conn.getClientAddr(), + conn.getLanguage(), + MQVersion.getVersionDesc(conn.getVersion()) + ); + } + + System.out.printf("%nBelow is subscription:"); + Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator(); + i = 1; + while (it.hasNext()) { + Entry<String, SubscriptionData> entry = it.next(); + SubscriptionData sd = entry.getValue(); + System.out.printf("%03d Topic: %-40s SubExpression: %s%n", + i++, + sd.getTopic(), + sd.getSubString() + ); + } + + System.out.printf(""); + System.out.printf("ConsumeType: %s%n", cc.getConsumeType()); + System.out.printf("MessageModel: %s%n", cc.getMessageModel()); + System.out.printf("ConsumeFromWhere: %s%n", cc.getConsumeFromWhere()); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java new file mode 100644 index 0000000..97ba792 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.connection; + +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.protocol.body.Connection; +import com.alibaba.rocketmq.common.protocol.body.ProducerConnection; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + + +/** + * @author shijia.wxr + */ +public class ProducerConnectionSubCommand implements SubCommand { + + @Override + public String commandName() { + return "producerConnection"; + } + + @Override + public String commandDesc() { + return "Query producer's socket connection and client version"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "producerGroup", true, "producer group name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); 
+ + try { + defaultMQAdminExt.start(); + + String group = commandLine.getOptionValue('g').trim(); + String topic = commandLine.getOptionValue('t').trim(); + + ProducerConnection pc = defaultMQAdminExt.examineProducerConnectionInfo(group, topic); + + int i = 1; + for (Connection conn : pc.getConnectionSet()) { + System.out.printf("%04d %-32s %-22s %-8s %s%n", + i++, + conn.getClientId(), + conn.getClientAddr(), + conn.getLanguage(), + MQVersion.getVersionDesc(conn.getVersion()) + ); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java new file mode 100644 index 0000000..d09b74a --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.consumer; + +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.admin.ConsumeStats; +import com.alibaba.rocketmq.common.admin.OffsetWrapper; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; +import com.alibaba.rocketmq.common.protocol.body.TopicList; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; + +import java.util.Collections; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class ConsumerProgressSubCommand implements SubCommand { + private final Logger log = ClientLogger.getLog(); + + @Override + public String commandName() { + return "consumerProgress"; + } + + @Override + public String commandDesc() { + return "Query consumers's progress, speed"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "groupName", true, "consumer group name"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + 
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + if (commandLine.hasOption('g')) { + String consumerGroup = commandLine.getOptionValue('g').trim(); + ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup); + List<MessageQueue> mqList = new LinkedList<MessageQueue>(); + mqList.addAll(consumeStats.getOffsetTable().keySet()); + Collections.sort(mqList); + + System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n", + "#Topic", + "#Broker Name", + "#QID", + "#Broker Offset", + "#Consumer Offset", + "#Diff", + "#LastTime"); + + long diffTotal = 0L; + for (MessageQueue mq : mqList) { + OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq); + long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset(); + diffTotal += diff; + String lastTime = ""; + try { + lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS); + } catch (Exception e) { + } + System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n", + UtilAll.frontStringAtLeast(mq.getTopic(), 32), + UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), + mq.getQueueId(), + offsetWrapper.getBrokerOffset(), + offsetWrapper.getConsumerOffset(), + diff, + lastTime + ); + } + + System.out.printf("%n"); + System.out.printf("Consume TPS: %s%n", consumeStats.getConsumeTps()); + System.out.printf("Diff Total: %d%n", diffTotal); + } else { + System.out.printf("%-32s %-6s %-24s %-5s %-14s %-7s %s%n", + "#Group", + "#Count", + "#Version", + "#Type", + "#Model", + "#TPS", + "#Diff Total" + ); + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); + for (String topic : topicList.getTopicList()) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + try { + ConsumeStats consumeStats = null; + try { + consumeStats = 
defaultMQAdminExt.examineConsumeStats(consumerGroup); + } catch (Exception e) { + log.warn("examineConsumeStats exception, " + consumerGroup, e); + } + + ConsumerConnection cc = null; + try { + cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup); + } catch (Exception e) { + log.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e); + } + + GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); + groupConsumeInfo.setGroup(consumerGroup); + + if (consumeStats != null) { + groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps()); + groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); + } + + if (cc != null) { + groupConsumeInfo.setCount(cc.getConnectionSet().size()); + groupConsumeInfo.setMessageModel(cc.getMessageModel()); + groupConsumeInfo.setConsumeType(cc.getConsumeType()); + groupConsumeInfo.setVersion(cc.computeMinVersion()); + } + + System.out.printf("%-32s %-6d %-24s %-5s %-14s %-7d %d%n", + UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 32), + groupConsumeInfo.getCount(), + groupConsumeInfo.getCount() > 0 ? 
groupConsumeInfo.versionDesc() : "OFFLINE", + groupConsumeInfo.consumeTypeDesc(), + groupConsumeInfo.messageModelDesc(), + groupConsumeInfo.getConsumeTps(), + groupConsumeInfo.getDiffTotal() + ); + } catch (Exception e) { + log.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e); + } + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} + + +class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { + private String group; + private int version; + private int count; + private ConsumeType consumeType; + private MessageModel messageModel; + private int consumeTps; + private long diffTotal; + + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public String consumeTypeDesc() { + if (this.count != 0) { + return this.getConsumeType() == ConsumeType.CONSUME_ACTIVELY ? "PULL" : "PUSH"; + } + return ""; + } + + public ConsumeType getConsumeType() { + return consumeType; + } + + public void setConsumeType(ConsumeType consumeType) { + this.consumeType = consumeType; + } + + public String messageModelDesc() { + if (this.count != 0 && this.getConsumeType() == ConsumeType.CONSUME_PASSIVELY) { + return this.getMessageModel().toString(); + } + return ""; + } + + public MessageModel getMessageModel() { + return messageModel; + } + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + public String versionDesc() { + if (this.count != 0) { + return MQVersion.getVersionDesc(this.version); + } + return ""; + } + + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + + public long getDiffTotal() { + return diffTotal; + } + + + public void setDiffTotal(long diffTotal) { + this.diffTotal = diffTotal; + } + + + @Override + public int compareTo(GroupConsumeInfo o) { + if (this.count != o.count) { + return 
o.count - this.count; + } + + return (int) (o.diffTotal - diffTotal); + } + + + public int getConsumeTps() { + return consumeTps; + } + + + public void setConsumeTps(int consumeTps) { + this.consumeTps = consumeTps; + } + + + public int getVersion() { + return version; + } + + + public void setVersion(int version) { + this.version = version; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java new file mode 100644 index 0000000..cf796d8 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.tools.command.consumer; + +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.protocol.body.Connection; +import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; +import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.MQAdminStartup; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.TreeMap; + + +/** + * @author shijia.wxr + */ +public class ConsumerStatusSubCommand implements SubCommand { + + public static void main(String[] args) { + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); + MQAdminStartup.main(new String[]{new ConsumerStatusSubCommand().commandName(), "-g", "benchmark_consumer"}); + } + + @Override + public String commandName() { + return "consumerStatus"; + } + + @Override + public String commandDesc() { + return "Query consumer's internal data structure"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "consumerGroup", true, "consumer group name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("i", "clientId", true, "The consumer's client id"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "jstack", false, "Run jstack command in the consumer progress"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + 
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + String group = commandLine.getOptionValue('g').trim(); + ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group); + boolean jstack = commandLine.hasOption('s'); + if (!commandLine.hasOption('i')) { + int i = 1; + long now = System.currentTimeMillis(); + final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable = new TreeMap<String, ConsumerRunningInfo>(); + for (Connection conn : cc.getConnectionSet()) { + try { + ConsumerRunningInfo consumerRunningInfo = + defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack); + if (consumerRunningInfo != null) { + criTable.put(conn.getClientId(), consumerRunningInfo); + String filePath = now + "/" + conn.getClientId(); + MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath); + System.out.printf("%03d %-40s %-20s %s%n", + i++, + conn.getClientId(), + MQVersion.getVersionDesc(conn.getVersion()), + filePath); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + if (!criTable.isEmpty()) { + boolean subSame = ConsumerRunningInfo.analyzeSubscription(criTable); + + boolean rebalanceOK = subSame && ConsumerRunningInfo.analyzeRebalance(criTable); + + if (subSame) { + System.out.printf("%n%nSame subscription in the same group of consumer"); + System.out.printf("%n%nRebalance %s%n", rebalanceOK ? 
"OK" : "Failed"); + Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConsumerRunningInfo> next = it.next(); + String result = + ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue()); + if (result.length() > 0) { + System.out.printf(result); + } + } + } else { + System.out.printf("%n%nWARN: Different subscription in the same group of consumer!!!"); + } + } + } else { + String clientId = commandLine.getOptionValue('i').trim(); + ConsumerRunningInfo consumerRunningInfo = + defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack); + if (consumerRunningInfo != null) { + System.out.printf("%s", consumerRunningInfo.formatString()); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java new file mode 100644 index 0000000..373da1e --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.consumer; + +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.protocol.body.Connection; +import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; +import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.MQAdminStartup; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.TreeMap; + + +/** + * @author shijia.wxr + */ +public class ConsumerSubCommand implements SubCommand { + + public static void main(String[] args) { + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); + MQAdminStartup.main(new String[]{new ConsumerSubCommand().commandName(), "-g", "benchmark_consumer"}); + } + + @Override + public String commandName() { + return "consumer"; + } + + @Override + public String commandDesc() { + return "Query consumer's connection, status, etc."; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "consumerGroup", true, "consumer group name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("s", "jstack", false, "Run jstack command in the consumer progress"); + 
opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + String group = commandLine.getOptionValue('g').trim(); + ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group); + boolean jstack = commandLine.hasOption('s'); + + if (!commandLine.hasOption('i')) { + + int i = 1; + long now = System.currentTimeMillis(); + final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable = + new TreeMap<String, ConsumerRunningInfo>(); + for (Connection conn : cc.getConnectionSet()) { + try { + ConsumerRunningInfo consumerRunningInfo = + defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack); + if (consumerRunningInfo != null) { + criTable.put(conn.getClientId(), consumerRunningInfo); + String filePath = now + "/" + conn.getClientId(); + MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath); + System.out.printf("%03d %-40s %-20s %s%n", + i++, + conn.getClientId(), + MQVersion.getVersionDesc(conn.getVersion()), + filePath); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + if (!criTable.isEmpty()) { + boolean subSame = ConsumerRunningInfo.analyzeSubscription(criTable); + boolean rebalanceOK = subSame && ConsumerRunningInfo.analyzeRebalance(criTable); + + if (subSame) { + System.out.printf("%n%nSame subscription in the same group of consumer"); + System.out.printf("%n%nRebalance %s%n", rebalanceOK ? 
"OK" : "Failed"); + + Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConsumerRunningInfo> next = it.next(); + String result = + ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue()); + if (result.length() > 0) { + System.out.printf(result); + } + } + } else { + System.out.printf("%n%nWARN: Different subscription in the same group of consumer!!!"); + } + } + } else { + String clientId = commandLine.getOptionValue('i').trim(); + ConsumerRunningInfo consumerRunningInfo = + defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack); + if (consumerRunningInfo != null) { + System.out.printf(consumerRunningInfo.formatString()); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java new file mode 100644 index 0000000..712a0d0 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.tools.command.consumer;

import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.srvutil.ServerUtil;
import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
import com.alibaba.rocketmq.tools.command.CommandUtil;
import com.alibaba.rocketmq.tools.command.SubCommand;
import com.alibaba.rocketmq.tools.command.topic.DeleteTopicSubCommand;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import java.util.Set;


/**
 * The {@code deleteSubGroup} admin sub-command: removes a subscription group
 * either from one broker ({@code -b}) or from every master address resolved
 * for a cluster ({@code -c}).
 *
 * @author lansheng.zj
 */
public class DeleteSubscriptionGroupCommand implements SubCommand {

    /**
     * @return the name under which this sub-command is invoked
     */
    @Override
    public String commandName() {
        return "deleteSubGroup";
    }


    /**
     * @return a one-line description of this sub-command
     */
    @Override
    public String commandDesc() {
        return "Delete subscription group from broker.";
    }


    /**
     * Registers the options understood by {@link #execute}.
     *
     * @param options the option set to extend
     * @return the same {@code options} instance, with {@code -b} and
     *         {@code -c} (optional) and {@code -g} (required) added
     */
    @Override
    public Options buildCommandlineOptions(Options options) {
        Option opt = new Option("b", "brokerAddr", true, "delete subscription group from which broker");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("c", "clusterName", true, "delete subscription group from which cluster");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("g", "groupName", true, "subscription group name");
        opt.setRequired(true);
        options.addOption(opt);

        return options;
    }


    /**
     * Runs the command.
     *
     * <ul>
     *   <li>{@code -b addr}: deletes the group from that broker address.</li>
     *   <li>{@code -c cluster} (only consulted when {@code -b} is absent):
     *       deletes the group from every address returned by
     *       {@link CommandUtil#fetchMasterAddrByClusterName}, then makes a
     *       best-effort attempt to delete the topics named
     *       {@code RETRY_GROUP_TOPIC_PREFIX + group} and
     *       {@code DLQ_GROUP_TOPIC_PREFIX + group} in that cluster.</li>
     *   <li>neither: prints the command-line help.</li>
     * </ul>
     *
     * Any exception is printed via {@link Throwable#printStackTrace()} and the
     * admin client is always shut down afterwards.
     *
     * @param commandLine parsed command line; {@code -g} must be present
     * @param options     the option set, used for the help output
     * @param rpcHook     hook handed to the {@link DefaultMQAdminExt} constructor
     */
    @Override
    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
        try {
            // groupName
            String groupName = commandLine.getOptionValue('g').trim();

            if (commandLine.hasOption('b')) {
                String addr = commandLine.getOptionValue('b').trim();
                adminExt.start();

                adminExt.deleteSubscriptionGroup(addr, groupName);
                System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName,
                    addr);

                return;
            } else if (commandLine.hasOption('c')) {
                String clusterName = commandLine.getOptionValue('c').trim();
                adminExt.start();

                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
                for (String master : masterSet) {
                    adminExt.deleteSubscriptionGroup(master, groupName);
                    System.out.printf(
                        "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
                        groupName, master, clusterName);
                }

                // Each topic is attempted on its own so that a failure on the
                // retry topic does not prevent the DLQ topic from being removed.
                deleteTopicBestEffort(adminExt, clusterName, MixAll.RETRY_GROUP_TOPIC_PREFIX + groupName);
                deleteTopicBestEffort(adminExt, clusterName, MixAll.DLQ_GROUP_TOPIC_PREFIX + groupName);
                return;
            }

            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            adminExt.shutdown();
        }
    }


    /**
     * Deletes one topic through {@link DeleteTopicSubCommand#deleteTopic} and
     * never propagates a failure: the group itself has already been removed,
     * so a failed topic cleanup is reported as a warning only.
     *
     * @param adminExt    started admin client
     * @param clusterName cluster to delete the topic from
     * @param topic       full topic name
     */
    private static void deleteTopicBestEffort(DefaultMQAdminExt adminExt, String clusterName, String topic) {
        try {
            DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, topic);
        } catch (Exception e) {
            System.err.printf("WARN: failed to delete topic [%s] in cluster [%s]: %s%n",
                topic, clusterName, e);
        }
    }
}
