http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java new file mode 100644 index 0000000..f6ee1f7 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.namesrv; + +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +/** + * @author xigu.lx + */ +public class UpdateNamesrvConfigCommand implements SubCommand { + @Override + public String commandName() { + return "updateNamesrvConfig"; + } + + @Override + public String commandDesc() { + return "Update configs of name server."; + } + + @Override + public Options buildCommandlineOptions(final Options options) { + Option opt = new Option("k", "key", true, "config key"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("v", "value", true, "config value"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + // key name + String key = commandLine.getOptionValue('k').trim(); + // key name + String value = commandLine.getOptionValue('v').trim(); + Properties properties = new Properties(); + properties.put(key, value); + + // servers + String servers = commandLine.getOptionValue('n'); + List<String> serverList = null; + if (servers != null && servers.length() > 0) { + String[] serverArray = servers.trim().split(";"); + + if (serverArray != null && serverArray.length > 0) { + serverList = Arrays.asList(serverArray); + } + } + + defaultMQAdminExt.start(); + + defaultMQAdminExt.updateNameServerConfig(properties, serverList); + + System.out.printf("update name server config success!%s\n%s : %s\n", + serverList == null ? "" : serverList, key, value); + return; + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java new file mode 100644 index 0000000..053ac7e --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.namesrv; + +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.List; + +public class WipeWritePermSubCommand implements SubCommand { + + @Override + public String commandName() { + return "wipeWritePerm"; + } + + + @Override + public String commandDesc() { + return "Wipe write perm of broker in all name server"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("b", "brokerName", true, "broker name"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + String brokerName = commandLine.getOptionValue('b').trim(); + List<String> namesrvList = defaultMQAdminExt.getNameServerAddressList(); + if (namesrvList != null) { + for (String namesrvAddr : namesrvList) { + try { + int wipeTopicCount = defaultMQAdminExt.wipeWritePermOfBroker(namesrvAddr, brokerName); + System.out.printf("wipe write perm of broker[%s] in name server[%s] OK, %d%n", + brokerName, + namesrvAddr, + wipeTopicCount + ); + } catch (Exception e) { + System.out.printf("wipe write perm of broker[%s] in name server[%s] Failed%n", + brokerName, + namesrvAddr + ); + + e.printStackTrace(); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java new file mode 100644 index 0000000..b72aeae --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.offset; + +import com.alibaba.rocketmq.common.admin.ConsumeStats; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Set; + +public class CloneGroupOffsetCommand implements SubCommand { + @Override + public String commandName() { + return "cloneGroupOffset"; + } + + @Override + public String commandDesc() { + return "clone offset from other group."; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("s", "srcGroup", true, "set source consumer group"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("d", "destGroup", true, "set destination consumer group"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "set the topic"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("o", "offline", true, "the group or the topic is offline"); + opt.setRequired(false); + options.addOption(opt); + + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + String srcGroup = commandLine.getOptionValue("s").trim(); + String destGroup = commandLine.getOptionValue("d").trim(); + String topic = commandLine.getOptionValue("t").trim(); + + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(srcGroup); + Set<MessageQueue> mqs = consumeStats.getOffsetTable().keySet(); + if (mqs != null && !mqs.isEmpty()) { + TopicRouteData topicRoute = defaultMQAdminExt.examineTopicRouteInfo(topic); + for (MessageQueue mq : mqs) { + String addr = null; + for (BrokerData brokerData : topicRoute.getBrokerDatas()) { + if (brokerData.getBrokerName().equals(mq.getBrokerName())) { + addr = brokerData.selectBrokerAddr(); + break; + } + } + long offset = consumeStats.getOffsetTable().get(mq).getBrokerOffset(); + if (offset >= 0) { + defaultMQAdminExt.updateConsumeOffset(addr, destGroup, mq, offset); + } + } + } + System.out.printf("clone group offset success. srcGroup[%s], destGroup=[%s], topic[%s]", + srcGroup, destGroup, topic); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/GetConsumerStatusCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/GetConsumerStatusCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/GetConsumerStatusCommand.java new file mode 100644 index 0000000..af79512 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/GetConsumerStatusCommand.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.offset; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Map; + +public class GetConsumerStatusCommand implements SubCommand { + @Override + public String commandName() { + return "getConsumerStatus"; + } + + @Override + public String commandDesc() { + return "get consumer status from client."; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "group", true, "set the consumer group"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "set the topic"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("i", "originClientId", true, "set the consumer clientId"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + String group = commandLine.getOptionValue("g").trim(); + String topic = commandLine.getOptionValue("t").trim(); + String originClientId = ""; + if (commandLine.hasOption("i")) { + originClientId = commandLine.getOptionValue("i").trim(); + } + defaultMQAdminExt.start(); + + Map<String, Map<MessageQueue, Long>> consumerStatusTable = + defaultMQAdminExt.getConsumeStatus(topic, group, originClientId); + System.out.printf("get consumer status from client. group=%s, topic=%s, originClientId=%s%n", + group, topic, originClientId); + + System.out.printf("%-50s %-15s %-15s %-20s%n", + "#clientId", + "#brokerName", + "#queueId", + "#offset"); + + for (Map.Entry<String, Map<MessageQueue, Long>> entry : consumerStatusTable.entrySet()) { + String clientId = entry.getKey(); + Map<MessageQueue, Long> mqTable = entry.getValue(); + for (Map.Entry<MessageQueue, Long> entry1 : mqTable.entrySet()) { + MessageQueue mq = entry1.getKey(); + System.out.printf("%-50s %-15s %-15d %-20d%n", + UtilAll.frontStringAtLeast(clientId, 50), + mq.getBrokerName(), + mq.getQueueId(), + mqTable.get(mq)); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java new file mode 100644 index 0000000..e2bbbff --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.offset; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.ResponseCode; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.util.Iterator; +import java.util.Map; + + +/** + * @author manhong.yqd + */ +public class ResetOffsetByTimeCommand implements SubCommand { + public static void main(String[] args) { + ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[]{"-t Jodie_rest_test", "-g CID_Jodie_rest_test", "-s -1", "-f true"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } + + @Override + public String commandName() { + return "resetOffsetByTime"; + } + + @Override + public String commandDesc() { + return "Reset consumer offset by timestamp(without client restart)."; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "group", true, "set the consumer group"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "set the topic"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("s", "timestamp", true, "set the timestamp[now|currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "cplus", false, "reset c++ client offset"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + String group = commandLine.getOptionValue("g").trim(); + String topic = commandLine.getOptionValue("t").trim(); + String timeStampStr = commandLine.getOptionValue("s").trim(); + long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : 0; + + try { + if (timestamp == 0) { + timestamp = Long.parseLong(timeStampStr); + } + } catch (NumberFormatException e) { + + timestamp = UtilAll.parseDate(timeStampStr, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime(); + } + + boolean force = true; + if (commandLine.hasOption('f')) { + force = Boolean.valueOf(commandLine.getOptionValue("f").trim()); + } + + boolean isC = false; + if (commandLine.hasOption('c')) { + isC = true; + } + + defaultMQAdminExt.start(); + Map<MessageQueue, Long> offsetTable; + try { + offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC); + } catch (MQClientException e) { + if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { + ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, group, topic, timestamp, force, timeStampStr); + return; + } + throw e; + } + + System.out.printf("rollback consumer offset by specified group[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n", + group, topic, force, timeStampStr, timestamp); + + System.out.printf("%-40s %-40s %-40s%n", + "#brokerName", + "#queueId", + "#offset"); + + Iterator<Map.Entry<MessageQueue, Long>> iterator = offsetTable.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<MessageQueue, Long> entry = iterator.next(); + System.out.printf("%-40s %-40d %-40d%n", + UtilAll.frontStringAtLeast(entry.getKey().getBrokerName(), 32), + entry.getKey().getQueueId(), + entry.getValue()); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java new file mode 100644 index 0000000..3420795 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.offset; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.admin.RollbackStats; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Date; +import java.util.List; + + +/** + * + * @author manhong.yqd + * + */ +public class ResetOffsetByTimeOldCommand implements SubCommand { + @Override + public String commandName() { + return "resetOffsetByTimeOld"; + } + + @Override + public String commandDesc() { + return "Reset consumer offset by timestamp(execute this command required client restart)."; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "group", true, "set the consumer group"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "set the topic"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("s", "timestamp", true, "set the timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + String consumerGroup = commandLine.getOptionValue("g").trim(); + String topic = commandLine.getOptionValue("t").trim(); + String timeStampStr = commandLine.getOptionValue("s").trim(); + long timestamp = 0; + try { + timestamp = Long.parseLong(timeStampStr); + } catch (NumberFormatException e) { + + Date date = UtilAll.parseDate(timeStampStr, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS); + if (date != null) { + timestamp = UtilAll.parseDate(timeStampStr, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime(); + } else { + System.out.printf("specified timestamp invalid.%n"); + return; + } + + boolean force = true; + if (commandLine.hasOption('f')) { + force = Boolean.valueOf(commandLine.getOptionValue("f").trim()); + } + + defaultMQAdminExt.start(); + resetOffset(defaultMQAdminExt, consumerGroup, topic, timestamp, force, timeStampStr); + } + + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup, String topic, long timestamp, boolean force, + String timeStampStr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + List<RollbackStats> rollbackStatsList = defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); + System.out.printf( + "rollback consumer offset by specified consumerGroup[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n", + consumerGroup, topic, force, timeStampStr, timestamp); + + System.out.printf("%-20s %-20s %-20s %-20s %-20s %-20s%n", + "#brokerName", + "#queueId", + "#brokerOffset", + "#consumerOffset", + "#timestampOffset", + "#rollbackOffset" + ); + + for (RollbackStats rollbackStats : rollbackStatsList) { + System.out.printf("%-20s %-20d %-20d %-20d %-20d %-20d%n", + UtilAll.frontStringAtLeast(rollbackStats.getBrokerName(), 32), + rollbackStats.getQueueId(), + rollbackStats.getBrokerOffset(), + rollbackStats.getConsumerOffset(), + rollbackStats.getTimestampOffset(), + rollbackStats.getRollbackOffset() + ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java new file mode 100644 index 0000000..a57f04a --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.stats; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.admin.ConsumeStats; +import com.alibaba.rocketmq.common.protocol.body.BrokerStatsData; +import com.alibaba.rocketmq.common.protocol.body.GroupList; +import com.alibaba.rocketmq.common.protocol.body.TopicList; +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.alibaba.rocketmq.store.stats.BrokerStatsManager; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + + +public class StatsAllSubCommand implements SubCommand { + @Override + public String commandName() { + return "statsAll"; + } + + @Override + public String commandDesc() { + return "Topic and Consumer tps stats"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("a", "activeTopic", false, "print active topic only"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "print select topic only"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); + + System.out.printf("%-32s %-32s %12s %11s %11s %14s %14s%n", + "#Topic", + "#Consumer Group", + "#Accumulation", + "#InTPS", + "#OutTPS", + "#InMsg24Hour", + "#OutMsg24Hour" + ); + + boolean activeTopic = commandLine.hasOption('a'); + String selectTopic = commandLine.getOptionValue('t'); + + for (String topic : topicList.getTopicList()) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) { + continue; + } + + if (selectTopic != null && selectTopic != "" && !topic.equals(selectTopic)) { + continue; + } + + try { + printTopicDetail(defaultMQAdminExt, topic, activeTopic); + } catch (Exception e) { + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + public static void printTopicDetail(final DefaultMQAdminExt admin, final String topic, final boolean activeTopic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic); + + GroupList groupList = admin.queryTopicConsumeByWho(topic); + + double inTPS = 0; + + long inMsgCntToday = 0; + + + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); + if (masterAddr != null) { + try { + BrokerStatsData bsd = admin.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic); + inTPS += bsd.getStatsMinute().getTps(); + inMsgCntToday += compute24HourSum(bsd); + } catch (Exception e) { + } + } + } + + if (groupList != null && !groupList.getGroupList().isEmpty()) { + + for (String group : groupList.getGroupList()) { + double outTPS = 0; + long outMsgCntToday = 0; + + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); + if (masterAddr != null) { + try { + String statsKey = String.format("%s@%s", topic, group); + BrokerStatsData bsd = admin.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey); + outTPS += bsd.getStatsMinute().getTps(); + outMsgCntToday += compute24HourSum(bsd); + } catch (Exception e) { + } + } + } + + long accumulate = 0; + try { + ConsumeStats consumeStats = admin.examineConsumeStats(group, topic); + if (consumeStats != null) { + accumulate = consumeStats.computeTotalDiff(); + if (accumulate < 0) { + accumulate = 0; + } + } + } catch (Exception e) { + } + + if (!activeTopic || (inMsgCntToday > 0) || + (outMsgCntToday > 0)) { + + System.out.printf("%-32s %-32s %12d %11.2f %11.2f %14d %14d%n", + UtilAll.frontStringAtLeast(topic, 32), + UtilAll.frontStringAtLeast(group, 32), + accumulate, + inTPS, + outTPS, + inMsgCntToday, + outMsgCntToday + ); + } + } + } else { + if (!activeTopic || (inMsgCntToday > 0)) { + + System.out.printf("%-32s %-32s %12d %11.2f %11s %14d %14s%n", + UtilAll.frontStringAtLeast(topic, 32), + "", + 0, + inTPS, + "", + inMsgCntToday, + "NO_CONSUMER" + ); + } + } + } + + public static long compute24HourSum(BrokerStatsData bsd) { + if (bsd.getStatsDay().getSum() != 0) { + return bsd.getStatsDay().getSum(); + } + + if (bsd.getStatsHour().getSum() != 0) { + return bsd.getStatsHour().getSum(); + } + + if (bsd.getStatsMinute().getSum() != 0) { + return bsd.getStatsMinute().getSum(); + } + + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/AllocateMQSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/AllocateMQSubCommand.java new file mode 100644 index 0000000..35e5d3b --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/AllocateMQSubCommand.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + + +public class AllocateMQSubCommand implements SubCommand { + @Override + public String commandName() { + return "allocateMQ"; + } + + + @Override + public String commandDesc() { + return "Allocate MQ"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("i", "ipList", true, "ipList"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook); + adminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + adminExt.start(); + + String topic = commandLine.getOptionValue('t').trim(); + String ips = commandLine.getOptionValue('i').trim(); + final String[] split = ips.split(","); + final List<String> ipList = new LinkedList<String>(); + for (String ip : split) { + ipList.add(ip); + } + + final TopicRouteData topicRouteData = adminExt.examineTopicRouteInfo(topic); + final Set<MessageQueue> mqs = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData); + + final AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely(); + + + RebalanceResult rr = new RebalanceResult(); + + for (String i : ipList) { + final List<MessageQueue> mqResult = averagely.allocate("aa", i, new ArrayList<MessageQueue>(mqs), ipList); + rr.getResult().put(i, mqResult); + } + + final String json = RemotingSerializable.toJson(rr, false); + System.out.printf("%s%n", json); + } catch (Exception e) { + e.printStackTrace(); + } finally { + adminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/DeleteTopicSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/DeleteTopicSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/DeleteTopicSubCommand.java new file mode 100644 index 0000000..1832de6 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/DeleteTopicSubCommand.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.CommandUtil; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + + +/** + * + * @author lansheng.zj + * + */ +public class DeleteTopicSubCommand implements SubCommand { + @Override + public String commandName() { + return "deleteTopic"; + } + + + @Override + public String commandDesc() { + return "Delete topic from broker and NameServer."; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("c", "clusterName", true, "delete topic from which cluster"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + + public static void deleteTopic(final DefaultMQAdminExt adminExt, + final String clusterName, + final String topic + ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + + Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName); + adminExt.deleteTopicInBroker(masterSet, topic); + System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName); + + + Set<String> nameServerSet = null; + if (adminExt.getNamesrvAddr() != null) { + String[] ns = adminExt.getNamesrvAddr().trim().split(";"); + nameServerSet = new HashSet(Arrays.asList(ns)); + } + + + adminExt.deleteTopicInNameServer(nameServerSet, topic); + System.out.printf("delete topic [%s] from NameServer success.%n", topic); + } + + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook); + adminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + String topic = commandLine.getOptionValue('t').trim(); + + if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + adminExt.start(); + deleteTopic(adminExt, clusterName, topic); + return; + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + e.printStackTrace(); + } finally { + adminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/RebalanceResult.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/RebalanceResult.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/RebalanceResult.java new file mode 100644 index 0000000..478413e --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/RebalanceResult.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.common.message.MessageQueue; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RebalanceResult { + private Map<String/*ip*/, List<MessageQueue>> result = new HashMap<String, List<MessageQueue>>(); + + public Map<String, List<MessageQueue>> getResult() { + return result; + } + + public void setResult(final Map<String, List<MessageQueue>> result) { + this.result = result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicClusterSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicClusterSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicClusterSubCommand.java new file mode 100644 index 0000000..9954305 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicClusterSubCommand.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Set; + + +/** + * + * @author zhouli + * + */ +public class TopicClusterSubCommand implements SubCommand { + + @Override + public String commandName() { + return "topicClusterList"; + } + + + @Override + public String commandDesc() { + return "get cluster info for topic"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + String topic = commandLine.getOptionValue('t').trim(); + try { + defaultMQAdminExt.start(); + Set<String> clusters = defaultMQAdminExt.getTopicClusterList(topic); + for (String value : clusters) { + System.out.printf("%s%n", value); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicListSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicListSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicListSubCommand.java new file mode 100644 index 0000000..9224d65 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicListSubCommand.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; +import com.alibaba.rocketmq.common.protocol.body.GroupList; +import com.alibaba.rocketmq.common.protocol.body.TopicList; +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; + + +/** + * + * @author shijia.wxr + * + */ +public class TopicListSubCommand implements SubCommand { + + @Override + public String commandName() { + return "topicList"; + } + + @Override + public String commandDesc() { + return "Fetch all topic list from name server"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "clusterModel", false, "clusterModel"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + if (commandLine.hasOption('c')) { + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + + System.out.printf("%-20s %-48s %-48s%n", + "#Cluster Name", + "#Topic", + "#Consumer Group" + ); + + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); + for (String topic : topicList.getTopicList()) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) + || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) { + continue; + } + + String clusterName = ""; + GroupList groupList = new GroupList(); + + try { + clusterName = + this.findTopicBelongToWhichCluster(topic, clusterInfo, defaultMQAdminExt); + groupList = defaultMQAdminExt.queryTopicConsumeByWho(topic); + } catch (Exception e) { + } + + if (null == groupList || groupList.getGroupList().isEmpty()) { + groupList = new GroupList(); + groupList.getGroupList().add(""); + } + + for (String group : groupList.getGroupList()) { + System.out.printf("%-20s %-48s %-48s%n", + UtilAll.frontStringAtLeast(clusterName, 20), + UtilAll.frontStringAtLeast(topic, 48), + UtilAll.frontStringAtLeast(group, 48) + ); + } + } + } else { + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); + for (String topic : topicList.getTopicList()) { + System.out.printf("%s%n", topic); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + private String findTopicBelongToWhichCluster(final String topic, final ClusterInfo clusterInfo, + final DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQClientException, + InterruptedException { + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); + + BrokerData brokerData = topicRouteData.getBrokerDatas().get(0); + + String brokerName = brokerData.getBrokerName(); + + Iterator<Entry<String, Set<String>>> it = clusterInfo.getClusterAddrTable().entrySet().iterator(); + while (it.hasNext()) { + Entry<String, Set<String>> next = it.next(); + if (next.getValue().contains(brokerName)) { + return next.getKey(); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicRouteSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicRouteSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicRouteSubCommand.java new file mode 100644 index 0000000..d1d6d28 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicRouteSubCommand.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + + +/** + * + * @author shijia.wxr + * + */ +public class TopicRouteSubCommand implements SubCommand { + + @Override + public String commandName() { + return "topicRoute"; + } + + + @Override + public String commandDesc() { + return "Examine topic route info"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + + String topic = commandLine.getOptionValue('t').trim(); + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); + String json = topicRouteData.toJson(true); + System.out.printf("%s%n", json); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicStatusSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicStatusSubCommand.java new file mode 100644 index 0000000..685dbea --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicStatusSubCommand.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.admin.TopicOffset; +import com.alibaba.rocketmq.common.admin.TopicStatsTable; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + + +/** + * + * @author shijia.wxr + * + */ +public class TopicStatusSubCommand implements SubCommand { + + @Override + public String commandName() { + return "topicStatus"; + } + + + @Override + public String commandDesc() { + return "Examine topic Status info"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + String topic = commandLine.getOptionValue('t').trim(); + TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic); + + List<MessageQueue> mqList = new LinkedList<MessageQueue>(); + mqList.addAll(topicStatsTable.getOffsetTable().keySet()); + Collections.sort(mqList); + + System.out.printf("%-32s %-4s %-20s %-20s %s%n", + "#Broker Name", + "#QID", + "#Min Offset", + "#Max Offset", + "#Last Updated" + ); + + for (MessageQueue mq : mqList) { + TopicOffset topicOffset = topicStatsTable.getOffsetTable().get(mq); + + String humanTimestamp = ""; + if (topicOffset.getLastUpdateTimestamp() > 0) { + humanTimestamp = UtilAll.timeMillisToHumanString2(topicOffset.getLastUpdateTimestamp()); + } + + System.out.printf("%-32s %-4d %-20d %-20d %s%n", + UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), + mq.getQueueId(), + topicOffset.getMinOffset(), + topicOffset.getMaxOffset(), + humanTimestamp + ); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateOrderConfCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateOrderConfCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateOrderConfCommand.java new file mode 100644 index 0000000..164579f --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateOrderConfCommand.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.namesrv.NamesrvUtil; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + + +/** + * + * @author manhong.yqd + * + */ +public class UpdateOrderConfCommand implements SubCommand { + + @Override + public String commandName() { + return "updateOrderConf"; + } + + + @Override + public String commandDesc() { + return "Create or update or delete order conf"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("v", "orderConf", true, "set order conf [eg. brokerName1:num;brokerName2:num]"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("m", "method", true, "option type [eg. put|get|delete"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + String topic = commandLine.getOptionValue('t').trim(); + String type = commandLine.getOptionValue('m').trim(); + + if ("get".equals(type)) { + + defaultMQAdminExt.start(); + String orderConf = + defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic); + System.out.printf("get orderConf success. topic=[%s], orderConf=[%s] ", topic, orderConf); + + return; + } else if ("put".equals(type)) { + + defaultMQAdminExt.start(); + String orderConf = ""; + if (commandLine.hasOption('v')) { + orderConf = commandLine.getOptionValue('v').trim(); + } + if (UtilAll.isBlank(orderConf)) { + throw new Exception("please set orderConf with option -v."); + } + + defaultMQAdminExt.createOrUpdateOrderConf(topic, orderConf, true); + System.out.printf("update orderConf success. topic=[%s], orderConf=[%s]", topic, + orderConf.toString()); + return; + } else if ("delete".equals(type)) { + + defaultMQAdminExt.start(); + defaultMQAdminExt.deleteKvConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic); + System.out.printf("delete orderConf success. topic=[%s]", topic); + + return; + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java new file mode 100644 index 0000000..1938934 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.common.protocol.route.QueueData; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.CommandUtil; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.List; +import java.util.Set; + + +public class UpdateTopicPermSubCommand implements SubCommand { + + @Override + public String commandName() { + return "updateTopicPerm"; + } + + + @Override + public String commandDesc() { + return "Update topic perm"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("b", "brokerAddr", true, "create topic to which broker"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "clusterName", true, "create topic to which cluster"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:R; 4:W; 6:RW]"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + defaultMQAdminExt.start(); + TopicConfig topicConfig = new TopicConfig(); + + String topic = commandLine.getOptionValue('t').trim(); + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); + assert topicRouteData != null; + List<QueueData> queueDatas = topicRouteData.getQueueDatas(); + assert queueDatas != null && queueDatas.size() > 0; + + QueueData queueData = queueDatas.get(0); + topicConfig.setTopicName(topic); + topicConfig.setWriteQueueNums(queueData.getWriteQueueNums()); + topicConfig.setReadQueueNums(queueData.getReadQueueNums()); + topicConfig.setPerm(queueData.getPerm()); + topicConfig.setTopicSysFlag(queueData.getTopicSynFlag()); + + //new perm + int perm = Integer.parseInt(commandLine.getOptionValue('p').trim()); + int oldPerm = topicConfig.getPerm(); + if (perm == oldPerm) { + System.out.printf("new perm equals to the old one!%n"); + return; + } + topicConfig.setPerm(perm); + if (commandLine.hasOption('b')) { + String addr = commandLine.getOptionValue('b').trim(); + defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); + System.out.printf("update topic perm from %s to %s in %s success.%n", oldPerm, perm, addr); + System.out.printf("%s%n", topicConfig); + return; + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + Set<String> masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String addr : masterSet) { + defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); + System.out.printf("update topic perm from %s to %s in %s success.%n", oldPerm, perm, addr); + } + return; + } + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicSubCommand.java new file mode 100644 index 0000000..c33018f --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicSubCommand.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.topic; + +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.common.sysflag.TopicSysFlag; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.CommandUtil; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class UpdateTopicSubCommand implements SubCommand { + + @Override + public String commandName() { + return "updateTopic"; + } + + + @Override + public String commandDesc() { + return "Update or create topic"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("b", "brokerAddr", true, "create topic to which broker"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "clusterName", true, "create topic to which cluster"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("r", "readQueueNums", true, "set read queue nums"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("w", "writeQueueNums", true, "set write queue nums"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("o", "order", true, "set topic's order(true|false"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("u", "unit", true, "is unit topic (true|false"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setReadQueueNums(8); + topicConfig.setWriteQueueNums(8); + topicConfig.setTopicName(commandLine.getOptionValue('t').trim()); + + // readQueueNums + if (commandLine.hasOption('r')) { + topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim())); + } + + // writeQueueNums + if (commandLine.hasOption('w')) { + topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim())); + } + + // perm + if (commandLine.hasOption('p')) { + topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim())); + } + + boolean isUnit = false; + if (commandLine.hasOption('u')) { + isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim()); + } + + boolean isCenterSync = false; + if (commandLine.hasOption('s')) { + isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim()); + } + + int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync); + topicConfig.setTopicSysFlag(topicCenterSync); + + boolean isOrder = false; + if (commandLine.hasOption('o')) { + isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim()); + } + topicConfig.setOrder(isOrder); + + if (commandLine.hasOption('b')) { + String addr = commandLine.getOptionValue('b').trim(); + + defaultMQAdminExt.start(); + defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); + + if (isOrder) { + String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr); + String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums(); + defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false); + System.out.printf(String.format("set broker orderConf. isOrder=%s, orderConf=[%s]", + isOrder, orderConf.toString())); + } + System.out.printf("create topic to %s success.%n", addr); + System.out.printf("%s", topicConfig); + return; + + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + defaultMQAdminExt.start(); + + Set<String> masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String addr : masterSet) { + defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); + System.out.printf("create topic to %s success.%n", addr); + } + + if (isOrder) { + Set<String> brokerNameSet = + CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName); + StringBuilder orderConf = new StringBuilder(); + String splitor = ""; + for (String s : brokerNameSet) { + orderConf.append(splitor).append(s).append(":") + .append(topicConfig.getWriteQueueNums()); + splitor = ";"; + } + defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), + orderConf.toString(), true); + System.out.printf(String.format("set cluster orderConf. isOrder=%s, orderConf=[%s]", + isOrder, orderConf.toString())); + } + + System.out.printf("%s", topicConfig); + return; + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +}
