http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java new file mode 100644 index 0000000..8e4bba8 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.consumer; + +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.command.SubCommand; +import com.alibaba.rocketmq.tools.monitor.DefaultMonitorListener; +import com.alibaba.rocketmq.tools.monitor.MonitorConfig; +import com.alibaba.rocketmq.tools.monitor.MonitorService; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; + + +/** + * @author shijia.wxr + */ +public class StartMonitoringSubCommand implements SubCommand { + private final Logger log = ClientLogger.getLog(); + + + @Override + public String commandName() { + return "startMonitoring"; + } + + + @Override + public String commandDesc() { + return "Start Monitoring"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + return options; + } + + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + try { + MonitorService monitorService = + new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook); + + monitorService.start(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java new file mode 100644 index 0000000..683e0fe --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.consumer; + +import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.CommandUtil; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class UpdateSubGroupSubCommand implements SubCommand { + + @Override + public String commandName() { + return "updateSubGroup"; + } + + + @Override + public String commandDesc() { + return "Update or create subscription group"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("b", "brokerAddr", true, "create subscription group to which broker"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "clusterName", true, "create subscription group to which cluster"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "groupName", true, "consumer group name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("s", "consumeEnable", true, "consume enable"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("m", "consumeFromMinEnable", true, "from min offset"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("d", "consumeBroadcastEnable", true, "broadcast"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("q", "retryQueueNums", true, "retry queue nums"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("r", "retryMaxTimes", true, "retry max times"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("i", "brokerId", true, "consumer from which broker id"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("w", "whichBrokerWhenConsumeSlowly", true, "which broker id when consume slowly"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("a", "notifyConsumerIdsChanged", true, "notify consumerId changed"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setConsumeBroadcastEnable(false); + subscriptionGroupConfig.setConsumeFromMinEnable(false); + + // groupName + subscriptionGroupConfig.setGroupName(commandLine.getOptionValue('g').trim()); + + // consumeEnable + if (commandLine.hasOption('s')) { + subscriptionGroupConfig.setConsumeEnable(Boolean.parseBoolean(commandLine.getOptionValue('s') + .trim())); + } + + // consumeFromMinEnable + if (commandLine.hasOption('m')) { + subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.parseBoolean(commandLine + .getOptionValue('m').trim())); + } + + // consumeBroadcastEnable + if (commandLine.hasOption('d')) { + subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.parseBoolean(commandLine + .getOptionValue('d').trim())); + } + + // retryQueueNums + if (commandLine.hasOption('q')) { + subscriptionGroupConfig.setRetryQueueNums(Integer.parseInt(commandLine.getOptionValue('q') + .trim())); + } + + // retryMaxTimes + if (commandLine.hasOption('r')) { + subscriptionGroupConfig.setRetryMaxTimes(Integer.parseInt(commandLine.getOptionValue('r') + .trim())); + } + + // brokerId + if (commandLine.hasOption('i')) { + subscriptionGroupConfig.setBrokerId(Long.parseLong(commandLine.getOptionValue('i').trim())); + } + + // whichBrokerWhenConsumeSlowly + if (commandLine.hasOption('w')) { + subscriptionGroupConfig.setWhichBrokerWhenConsumeSlowly(Long.parseLong(commandLine + .getOptionValue('w').trim())); + } + + // notifyConsumerIdsChanged + if (commandLine.hasOption('a')) { + subscriptionGroupConfig.setNotifyConsumerIdsChangedEnable(Boolean.parseBoolean(commandLine + .getOptionValue('a').trim())); + } + + if (commandLine.hasOption('b')) { + String addr = commandLine.getOptionValue('b').trim(); + + defaultMQAdminExt.start(); + + defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig); + System.out.printf("create subscription group to %s success.%n", addr); + System.out.printf("%s", subscriptionGroupConfig); + return; + + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + defaultMQAdminExt.start(); + Set<String> masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String addr : masterSet) { + try { + defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig); + System.out.printf("create subscription group to %s success.%n", addr); + } catch (Exception e) { + e.printStackTrace(); + Thread.sleep(1000 * 1); + } + } + System.out.printf("%s", subscriptionGroupConfig); + return; + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/CheckMsgSendRTCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/CheckMsgSendRTCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/CheckMsgSendRTCommand.java new file mode 100644 index 0000000..58c2fbf --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/CheckMsgSendRTCommand.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.message; + +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.MessageQueueSelector; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.List; + +public class CheckMsgSendRTCommand implements SubCommand { + private static String brokerName = ""; + private static int queueId = 0; + + @Override + public String commandName() { + return "checkMsgSendRT"; + } + + @Override + public String commandDesc() { + return "check message send response time"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("a", "amout", true, "message amout | default 100"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "size", true, "message size | default 128 Byte"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQProducer producer = new DefaultMQProducer(rpcHook); + producer.setProducerGroup(Long.toString(System.currentTimeMillis())); + + try { + producer.start(); + long start = 0; + long end = 0; + long timeElapsed = 0; + boolean sendSuccess = false; + String topic = commandLine.getOptionValue('t').trim(); + long amount = !commandLine.hasOption('a') ? 100 : Long.parseLong(commandLine + .getOptionValue('a').trim()); + long msgSize = !commandLine.hasOption('s') ? 128 : Long.parseLong(commandLine + .getOptionValue('s').trim()); + Message msg = new Message(topic, getStringBySize(msgSize).getBytes(MixAll.DEFAULT_CHARSET)); + + System.out.printf("%-32s %-4s %-20s %s%n", + "#Broker Name", + "#QID", + "#Send Result", + "#RT" + ); + for (int i = 0; i < amount; i++) { + start = System.currentTimeMillis(); + try { + producer.send(msg, new MessageQueueSelector() { + @Override + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { + int queueIndex = (Integer) arg % mqs.size(); + MessageQueue queue = mqs.get(queueIndex); + brokerName = queue.getBrokerName(); + queueId = queue.getQueueId(); + return queue; + } + }, i); + sendSuccess = true; + end = System.currentTimeMillis(); + } catch (Exception e) { + sendSuccess = false; + end = System.currentTimeMillis(); + } + + + if (i != 0) { + timeElapsed += end - start; + } + + System.out.printf("%-32s %-4s %-20s %s%n", + brokerName, + queueId, + sendSuccess, + end - start + ); + } + + double rt = (double) timeElapsed / (amount - 1); + System.out.printf("Avg RT: %s%n", String.format("%.2f", rt)); + } catch (Exception e) { + e.printStackTrace(); + } finally { + producer.shutdown(); + } + } + + public String getStringBySize(long size) { + StringBuilder res = new StringBuilder(); + for (int i = 0; i < size; i++) { + res.append('a'); + } + return res.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/DecodeMessageIdCommond.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/DecodeMessageIdCommond.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/DecodeMessageIdCommond.java new file mode 100644 index 0000000..bd5eb7e --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/DecodeMessageIdCommond.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.message; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.MessageClientIDSetter; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +public class DecodeMessageIdCommond implements SubCommand { + @Override + public String commandName() { + return "DecodeMessageId"; + } + + @Override + public String commandDesc() { + return "decode unique message ID"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("i", "messageId", true, "unique message ID"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + String messageId = commandLine.getOptionValue('i').trim(); + + try { + System.out.printf("ip=" + MessageClientIDSetter.getIPStrFromID(messageId)); + } catch (Exception e) { + e.printStackTrace(); + } + + try { + String date = UtilAll.formatDate(MessageClientIDSetter.getNearlyTimeFromID(messageId), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS); + System.out.printf("date=" + date); + } catch (Exception e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/PrintMessageByQueueCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/PrintMessageByQueueCommand.java new file mode 100644 index 0000000..956a360 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/PrintMessageByQueueCommand.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.message; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +public class PrintMessageByQueueCommand implements SubCommand { + + @Override + public String commandName() { + return "printMsgByQueue"; + } + + + @Override + public String commandDesc() { + return "Print Message Detail"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("a", "brokerName ", true, "broker name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("i", "queueId ", true, "queue id"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("c", "charsetName ", true, "CharsetName(eg: UTF-8,GBK)"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "subExpression ", true, "Subscribe Expression(eg: TagA || TagB)"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("b", "beginTimestamp ", true, "Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("e", "endTimestamp ", true, "End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("p", "print msg", true, "print msg. eg: true | false(default)"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("d", "printBody ", true, "print body. eg: true | false(default)"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("f", "calculate", true, "calculate by tag. eg: true | false(default)"); + opt.setRequired(false); + options.addOption(opt); + + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + + try { + String charsetName = + !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim(); + boolean printMsg = + !commandLine.hasOption('p') ? false : Boolean.parseBoolean(commandLine.getOptionValue('p').trim()); + boolean printBody = + !commandLine.hasOption('d') ? false : Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); + boolean calByTag = + !commandLine.hasOption('f') ? false : Boolean.parseBoolean(commandLine.getOptionValue('f').trim()); + String subExpression = + !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); + + String topic = commandLine.getOptionValue('t').trim(); + String brokerName = commandLine.getOptionValue('a').trim(); + int queueId = Integer.parseInt(commandLine.getOptionValue('i').trim()); + consumer.start(); + + MessageQueue mq = new MessageQueue(topic, brokerName, queueId); + long minOffset = consumer.minOffset(mq); + long maxOffset = consumer.maxOffset(mq); + + if (commandLine.hasOption('b')) { + String timestampStr = commandLine.getOptionValue('b').trim(); + long timeValue = timestampFormat(timestampStr); + minOffset = consumer.searchOffset(mq, timeValue); + } + + if (commandLine.hasOption('e')) { + String timestampStr = commandLine.getOptionValue('e').trim(); + long timeValue = timestampFormat(timestampStr); + maxOffset = consumer.searchOffset(mq, timeValue); + } + + final Map<String, AtomicLong> tagCalmap = new HashMap<String, AtomicLong>(); + READQ: + for (long offset = minOffset; offset < maxOffset; ) { + try { + PullResult pullResult = consumer.pull(mq, subExpression, offset, 32); + offset = pullResult.getNextBeginOffset(); + switch (pullResult.getPullStatus()) { + case FOUND: + calculateByTag(pullResult.getMsgFoundList(), tagCalmap, calByTag); + printMessage(pullResult.getMsgFoundList(), charsetName, printMsg, printBody); + break; + case NO_MATCHED_MSG: + case NO_NEW_MSG: + case OFFSET_ILLEGAL: + break READQ; + } + } catch (Exception e) { + e.printStackTrace(); + break; + } + } + + printCalculateByTag(tagCalmap, calByTag); + } catch (Exception e) { + e.printStackTrace(); + } finally { + consumer.shutdown(); + } + } + + public static long timestampFormat(final String value) { + long timestamp = 0; + try { + timestamp = Long.parseLong(value); + } catch (NumberFormatException e) { + + timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime(); + } + + return timestamp; + } + + + private static void calculateByTag(final List<MessageExt> msgs, final Map<String, AtomicLong> tagCalmap, final boolean calByTag) { + if (!calByTag) + return; + + for (MessageExt msg : msgs) { + String tag = msg.getTags(); + if (StringUtils.isNotBlank(tag)) { + AtomicLong count = tagCalmap.get(tag); + if (count == null) { + count = new AtomicLong(); + tagCalmap.put(tag, count); + } + count.incrementAndGet(); + } + } + } + + private static void printCalculateByTag(final Map<String, AtomicLong> tagCalmap, final boolean calByTag) { + if (!calByTag) + return; + + List<TagCountBean> list = new ArrayList<TagCountBean>(); + for (Map.Entry<String, AtomicLong> entry : tagCalmap.entrySet()) { + TagCountBean tagBean = new TagCountBean(entry.getKey(), entry.getValue()); + list.add(tagBean); + } + Collections.sort(list); + + for (TagCountBean tagCountBean : list) { + System.out.printf("Tag: %-30s Count: %s%n", tagCountBean.getTag(), tagCountBean.getCount()); + } + } + + public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printMsg, boolean printBody) { + if (!printMsg) + return; + + for (MessageExt msg : msgs) { + try { + System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(), + printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY"); + } catch (UnsupportedEncodingException e) { + } + } + } + + static class TagCountBean implements Comparable<TagCountBean> { + private String tag; + private AtomicLong count; + + + public TagCountBean(final String tag, final AtomicLong count) { + this.tag = tag; + this.count = count; + } + + public String getTag() { + return tag; + } + + public void setTag(final String tag) { + this.tag = tag; + } + + public AtomicLong getCount() { + return count; + } + + public void setCount(final AtomicLong count) { + this.count = count; + } + + + @Override + public int compareTo(final TagCountBean o) { + return (int) (o.getCount().get() - this.count.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/PrintMessageSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/PrintMessageSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/PrintMessageSubCommand.java new file mode 100644 index 0000000..ab908d4 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/PrintMessageSubCommand.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.message; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class PrintMessageSubCommand implements SubCommand { + + @Override + public String commandName() { + return "printMsg"; + } + + + @Override + public String commandDesc() { + return "Print Message Detail"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("c", "charsetName ", true, "CharsetName(eg: UTF-8,GBK)"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "subExpression ", true, "Subscribe Expression(eg: TagA || TagB)"); + opt.setRequired(false); + options.addOption(opt); + + opt = + new Option("b", "beginTimestamp ", true, + "Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + opt.setRequired(false); + options.addOption(opt); + + opt = + new Option("e", "endTimestamp ", true, + "End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + opt.setRequired(false); + options.addOption(opt); + + opt = + new Option("d", "printBody ", true, + "print body"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + + try { + String topic = commandLine.getOptionValue('t').trim(); + + String charsetName = // + !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim(); + + String subExpression = // + !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); + + boolean printBody = // + !commandLine.hasOption('d') ? true : Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); + + consumer.start(); + + Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic); + for (MessageQueue mq : mqs) { + long minOffset = consumer.minOffset(mq); + long maxOffset = consumer.maxOffset(mq); + + if (commandLine.hasOption('b')) { + String timestampStr = commandLine.getOptionValue('b').trim(); + long timeValue = timestampFormat(timestampStr); + minOffset = consumer.searchOffset(mq, timeValue); + } + + if (commandLine.hasOption('e')) { + String timestampStr = commandLine.getOptionValue('e').trim(); + long timeValue = timestampFormat(timestampStr); + maxOffset = consumer.searchOffset(mq, timeValue); + } + + System.out.printf("minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", " + mq); + + READQ: + for (long offset = minOffset; offset < maxOffset; ) { + try { + PullResult pullResult = consumer.pull(mq, subExpression, offset, 32); + offset = pullResult.getNextBeginOffset(); + switch (pullResult.getPullStatus()) { + case FOUND: + printMessage(pullResult.getMsgFoundList(), charsetName, printBody); + break; + case NO_MATCHED_MSG: + System.out.printf(mq + " no matched msg. status=" + pullResult.getPullStatus() + ", offset=" + offset); + break; + case NO_NEW_MSG: + case OFFSET_ILLEGAL: + System.out.printf(mq + " print msg finished. status=" + pullResult.getPullStatus() + ", offset=" + offset); + break READQ; + } + } catch (Exception e) { + e.printStackTrace(); + break; + } + } + } + + } catch (Exception e) { + e.printStackTrace(); + } finally { + consumer.shutdown(); + } + } + + public static long timestampFormat(final String value) { + long timestamp = 0; + try { + timestamp = Long.parseLong(value); + } catch (NumberFormatException e) { + timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime(); + } + + return timestamp; + } + + public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printBody) { + for (MessageExt msg : msgs) { + try { + System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(), + printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY"); + } catch (UnsupportedEncodingException e) { + // + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java new file mode 100644 index 0000000..a6bf0bd --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.message; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.MessageClientExt; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.admin.api.MessageTrack; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class QueryMsgByIdSubCommand implements SubCommand { + @Override + public String commandName() { + return "queryMsgById"; + } + + @Override + public String commandDesc() { + return "Query Message by Id"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("i", "msgId", true, "Message Id"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("g", "consumerGroup", true, "consumer group name"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("d", "clientId", true, "The consumer's client id"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "sendMessage", true, "resend message"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("u", "unitName", true, "unit name"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById"); + defaultMQProducer.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + if (commandLine.hasOption('s')) { + if (commandLine.hasOption('u')) { + String unitName = commandLine.getOptionValue('u').trim(); + defaultMQProducer.setUnitName(unitName); + } + defaultMQProducer.start(); + } + + final String msgIds = commandLine.getOptionValue('i').trim(); + final String[] msgIdArr = StringUtils.split(msgIds, ","); + + if (commandLine.hasOption('g') && commandLine.hasOption('d')) { + final String consumerGroup = commandLine.getOptionValue('g').trim(); + final String clientId = commandLine.getOptionValue('d').trim(); + for (String msgId : msgIdArr) { + if (StringUtils.isNotBlank(msgId)) { + pushMsg(defaultMQAdminExt, consumerGroup, clientId, msgId.trim()); + } + } + } else if (commandLine.hasOption('s')) { + boolean resend = Boolean.parseBoolean(commandLine.getOptionValue('s', "false").trim()); + if (resend) { + for (String msgId : msgIdArr) { + if (StringUtils.isNotBlank(msgId)) { + sendMsg(defaultMQAdminExt, defaultMQProducer, msgId.trim()); + } + } + } + } else { + for (String msgId : msgIdArr) { + if (StringUtils.isNotBlank(msgId)) { + queryById(defaultMQAdminExt, msgId.trim()); + } + } + + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQProducer.shutdown(); + defaultMQAdminExt.shutdown(); + } + } + + private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String consumerGroup, final String clientId, final String msgId) { + try { + ConsumeMessageDirectlyResult result = + defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId); + System.out.printf("%s", result); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final DefaultMQProducer defaultMQProducer, final String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + try { + MessageExt msg = defaultMQAdminExt.viewMessage(msgId); + if (msg != null) { + // resend msg by id + System.out.printf("prepare resend msg. originalMsgId=" + msgId); + SendResult result = defaultMQProducer.send(msg); + System.out.printf("%s", result); + } else { + System.out.printf("no message. msgId=" + msgId); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void queryById(final DefaultMQAdminExt admin, final String msgId) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException, IOException { + MessageExt msg = admin.viewMessage(msgId); + + printMsg(admin, msg); + } + + public static void printMsg(final DefaultMQAdminExt admin, final MessageExt msg) throws IOException { + if (msg == null) { + System.out.printf("%nMessage not found!"); + return; + } + + String bodyTmpFilePath = createBodyFile(msg); + String msgId = msg.getMsgId(); + if (msg instanceof MessageClientExt) { + msgId = ((MessageClientExt) msg).getOffsetMsgId(); + } + + System.out.printf("%-20s %s%n", + "OffsetID:", + msgId + ); + + System.out.printf("%-20s %s%n", + "OffsetID:", + msgId + ); + + System.out.printf("%-20s %s%n", + "Topic:", + msg.getTopic() + ); + + System.out.printf("%-20s %s%n", + "Tags:", + "[" + msg.getTags() + "]" + ); + + System.out.printf("%-20s %s%n", + "Keys:", + "[" + msg.getKeys() + "]" + ); + + System.out.printf("%-20s %d%n", + "Queue ID:", + msg.getQueueId() + ); + + System.out.printf("%-20s %d%n", + "Queue Offset:", + msg.getQueueOffset() + ); + + System.out.printf("%-20s %d%n", + "CommitLog Offset:", + msg.getCommitLogOffset() + ); + + System.out.printf("%-20s %d%n", + "Reconsume Times:", + msg.getReconsumeTimes() + ); + + System.out.printf("%-20s %s%n", + "Born Timestamp:", + UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()) + ); + + System.out.printf("%-20s %s%n", + "Store Timestamp:", + UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()) + ); + + System.out.printf("%-20s %s%n", + "Born Host:", + RemotingHelper.parseSocketAddressAddr(msg.getBornHost()) + ); + + System.out.printf("%-20s %s%n", + "Store Host:", + RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()) + ); + + System.out.printf("%-20s %d%n", + "System Flag:", + msg.getSysFlag() + ); + + System.out.printf("%-20s %s%n", + "Properties:", + msg.getProperties() != null ? msg.getProperties().toString() : "" + ); + + System.out.printf("%-20s %s%n", + "Message Body Path:", + bodyTmpFilePath + ); + + try { + List<MessageTrack> mtdList = admin.messageTrackDetail(msg); + if (mtdList.isEmpty()) { + System.out.printf("%n%nWARN: No Consumer"); + } else { + System.out.printf("%n%n"); + for (MessageTrack mt : mtdList) { + System.out.printf("%s", mt); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static String createBodyFile(MessageExt msg) throws IOException { + DataOutputStream dos = null; + try { + String bodyTmpFilePath = "/tmp/rocketmq/msgbodys"; + File file = new File(bodyTmpFilePath); + if (!file.exists()) { + file.mkdirs(); + } + bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId(); + dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath)); + dos.write(msg.getBody()); + return bodyTmpFilePath; + } finally { + if (dos != null) + dos.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java new file mode 100644 index 0000000..abc1cb1 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.message; + +import com.alibaba.rocketmq.client.QueryResult; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + + +/** + * + * @author shijia.wxr + * + */ +public class QueryMsgByKeySubCommand implements SubCommand { + + @Override + public String commandName() { + return "queryMsgByKey"; + } + + @Override + public String commandDesc() { + return "Query Message by Key"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("k", "msgKey", true, "Message Key"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + final String topic = commandLine.getOptionValue('t').trim(); + final String key = commandLine.getOptionValue('k').trim(); + + this.queryByKey(defaultMQAdminExt, topic, key); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key) + throws MQClientException, InterruptedException { + admin.start(); + + QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE); + System.out.printf("%-50s %4s %40s%n", + "#Message ID", + "#QID", + "#Offset"); + for (MessageExt msg : queryResult.getMessageList()) { + System.out.printf("%-50s %4d %40d%n", msg.getMsgId(), msg.getQueueId(), msg.getQueueOffset()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java new file mode 100644 index 0000000..8ccf907 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.message; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + + +/** + * @author shijia.wxr + */ +public class QueryMsgByOffsetSubCommand implements SubCommand { + + @Override + public String commandName() { + return "queryMsgByOffset"; + } + + @Override + public String commandDesc() { + return "Query Message by offset"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("b", "brokerName", true, "Broker Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("i", "queueId", true, "Queue Id"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("o", "offset", true, "Queue Offset"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + defaultMQPullConsumer.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + String topic = commandLine.getOptionValue('t').trim(); + String brokerName = commandLine.getOptionValue('b').trim(); + String queueId = commandLine.getOptionValue('i').trim(); + String offset = commandLine.getOptionValue('o').trim(); + + MessageQueue mq = new MessageQueue(); + mq.setTopic(topic); + mq.setBrokerName(brokerName); + mq.setQueueId(Integer.parseInt(queueId)); + + defaultMQPullConsumer.start(); + defaultMQAdminExt.start(); + + PullResult pullResult = defaultMQPullConsumer.pull(mq, "*", Long.parseLong(offset), 1); + if (pullResult != null) { + switch (pullResult.getPullStatus()) { + case FOUND: + QueryMsgByIdSubCommand.printMsg(defaultMQAdminExt, pullResult.getMsgFoundList().get(0)); + break; + case NO_MATCHED_MSG: + case NO_NEW_MSG: + case OFFSET_ILLEGAL: + default: + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQPullConsumer.shutdown(); + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java new file mode 100644 index 0000000..faf49c6 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.message; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.admin.api.MessageTrack; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; + +public class QueryMsgByUniqueKeySubCommand implements SubCommand { + + @Override + public String commandName() { + return "queryMsgByUniqueKey"; + } + + @Override + public String commandDesc() { + return "Query Message by Unique key"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("i", "msgId", true, "Message Id"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("g", "consumerGroup", true, "consumer group name"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("d", "clientId", true, "The consumer's client id"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "The topic of msg"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + + public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException, IOException { + MessageExt msg = admin.viewMessage(topic, msgId); + + + String bodyTmpFilePath = createBodyFile(msg); + + System.out.printf("%-20s %s%n", + "Topic:", + msg.getTopic() + ); + + System.out.printf("%-20s %s%n", + "Tags:", + "[" + msg.getTags() + "]" + ); + + System.out.printf("%-20s %s%n", + "Keys:", + "[" + msg.getKeys() + "]" + ); + + System.out.printf("%-20s %d%n", + "Queue ID:", + msg.getQueueId() + ); + + System.out.printf("%-20s %d%n", + "Queue Offset:", + msg.getQueueOffset() + ); + + System.out.printf("%-20s %d%n", + "CommitLog Offset:", + msg.getCommitLogOffset() + ); + + System.out.printf("%-20s %d%n", + "Reconsume Times:", + msg.getReconsumeTimes() + ); + + System.out.printf("%-20s %s%n", + "Born Timestamp:", + UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()) + ); + + System.out.printf("%-20s %s%n", + "Store Timestamp:", + UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()) + ); + + System.out.printf("%-20s %s%n", + "Born Host:", + RemotingHelper.parseSocketAddressAddr(msg.getBornHost()) + ); + + System.out.printf("%-20s %s%n", + "Store Host:", + RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()) + ); + + System.out.printf("%-20s %d%n", + "System Flag:", + msg.getSysFlag() + ); + + System.out.printf("%-20s %s%n", + "Properties:", + msg.getProperties() != null ? msg.getProperties().toString() : "" + ); + + System.out.printf("%-20s %s%n", + "Message Body Path:", + bodyTmpFilePath + ); + + try { + List<MessageTrack> mtdList = admin.messageTrackDetail(msg); + if (mtdList.isEmpty()) { + System.out.printf("%n%nWARN: No Consumer"); + } else { + System.out.printf("%n%n"); + for (MessageTrack mt : mtdList) { + System.out.printf("%s", mt); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + + final String msgId = commandLine.getOptionValue('i').trim(); + final String topic = commandLine.getOptionValue('t').trim(); + if (commandLine.hasOption('g') && commandLine.hasOption('d')) { + final String consumerGroup = commandLine.getOptionValue('g').trim(); + final String clientId = commandLine.getOptionValue('d').trim(); + ConsumeMessageDirectlyResult result = + defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); + System.out.printf("%s", result); + } else { + queryById(defaultMQAdminExt, topic, msgId); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + + private static String createBodyFile(MessageExt msg) throws IOException { + DataOutputStream dos = null; + try { + String bodyTmpFilePath = "/tmp/rocketmq/msgbodys"; + File file = new File(bodyTmpFilePath); + if (!file.exists()) { + file.mkdirs(); + } + bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId(); + dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath)); + dos.write(msg.getBody()); + return bodyTmpFilePath; + } finally { + if (dos != null) + dos.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/Store.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/Store.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/Store.java new file mode 100644 index 0000000..2ce29a0 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/Store.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.message; + +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.store.ConsumeQueue; +import com.alibaba.rocketmq.store.MappedFile; +import com.alibaba.rocketmq.store.MappedFileQueue; +import com.alibaba.rocketmq.store.SelectMappedBufferResult; +import com.alibaba.rocketmq.store.config.StorePathConfigHelper; + +import java.io.File; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class Store { + public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8; + private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8; + private MappedFileQueue mapedFileQueue; + private ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; + + private String cStorePath; + private int cSize; + private String lStorePath; + private int lSize; + + public Store(String cStorePath, int cSize, String lStorePath, int lSize) { + this.cStorePath = cStorePath; + this.cSize = cSize; + this.lStorePath = lStorePath; + this.lSize = lSize; + mapedFileQueue = new MappedFileQueue(cStorePath, cSize, null); + consumeQueueTable = + new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>(); + } + + public boolean load() { + boolean result = this.mapedFileQueue.load(); + System.out.printf("load commit log " + (result ? "OK" : "Failed")); + if (result) { + result = loadConsumeQueue(); + } + System.out.printf("load logics log " + (result ? "OK" : "Failed")); + return result; + } + + private boolean loadConsumeQueue() { + File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(lStorePath)); + File[] fileTopicList = dirLogic.listFiles(); + if (fileTopicList != null) { + + for (File fileTopic : fileTopicList) { + String topic = fileTopic.getName(); + + File[] fileQueueIdList = fileTopic.listFiles(); + if (fileQueueIdList != null) { + for (File fileQueueId : fileQueueIdList) { + int queueId = Integer.parseInt(fileQueueId.getName()); + ConsumeQueue logic = new ConsumeQueue( + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), + lSize, + null); + this.putConsumeQueue(topic, queueId, logic); + if (!logic.load()) { + return false; + } + } + } + } + } + System.out.printf("load logics queue all over, OK"); + return true; + } + + + private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) { + ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic); + if (null == map) { + map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>(); + map.put(queueId, consumeQueue); + this.consumeQueueTable.put(topic, map); + } else { + map.put(queueId, consumeQueue); + } + } + + public void traval(boolean openAll) { + boolean success = true; + byte[] bytesContent = new byte[1024]; + List<MappedFile> mapedFiles = this.mapedFileQueue.getMappedFiles(); + ALL: + for (MappedFile mapedFile : mapedFiles) { + long startOffset = mapedFile.getFileFromOffset(); + int position = 0; + int msgCount = 0; + int errorCount = 0; + + System.out.printf("start travel " + mapedFile.getFileName()); + long startTime = System.currentTimeMillis(); + ByteBuffer byteBuffer = mapedFile.sliceByteBuffer(); + while (byteBuffer.hasRemaining()) { + // 1 TOTALSIZE + int totalSize = byteBuffer.getInt(); + // 2 MAGICCODE + int magicCode = byteBuffer.getInt(); + if (BLANK_MAGIC_CODE == magicCode) { + position = byteBuffer.limit(); + break; + } + // 3 BODYCRC + int bodyCRC = byteBuffer.getInt(); + + // 4 QUEUEID + int queueId = byteBuffer.getInt(); + + // 5 FLAG + int flag = byteBuffer.getInt(); + flag = flag + 0; + + // 6 QUEUEOFFSET + long queueOffset = byteBuffer.getLong(); + + // 7 PHYSICALOFFSET + long physicOffset = byteBuffer.getLong(); + + // 8 SYSFLAG + int sysFlag = byteBuffer.getInt(); + + // 9 BORNTIMESTAMP + long bornTimeStamp = byteBuffer.getLong(); + bornTimeStamp = bornTimeStamp + 0; + + // 10 BORNHOST(IP+PORT) + byteBuffer.position(byteBuffer.position() + 8); + + // 11 STORETIMESTAMP + long storeTimestamp = byteBuffer.getLong(); + + // 12 STOREHOST(IP+PORT) + byteBuffer.position(byteBuffer.position() + 8); + + // 13 RECONSUMETIMES + int reconsumeTimes = byteBuffer.getInt(); + + // 14 Prepared Transaction Offset + long preparedTransactionOffset = byteBuffer.getLong(); + + // 15 BODY + int bodyLen = byteBuffer.getInt(); + if (bodyLen > 0) { + byteBuffer.position(byteBuffer.position() + bodyLen); + } + + // 16 TOPIC + byte topicLen = byteBuffer.get(); + byteBuffer.get(bytesContent, 0, topicLen); + String topic = null; + try { + topic = new String(bytesContent, 0, topicLen, MixAll.DEFAULT_CHARSET); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + + + Date storeTime = new Date(storeTimestamp); + + + long currentPhyOffset = startOffset + position; + if (physicOffset != currentPhyOffset) { + System.out.printf(storeTime + + " [fetal error] physicOffset != currentPhyOffset. position=" + position + + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset + + ", currentPhyOffset=" + currentPhyOffset); + errorCount++; + if (!openAll) { + success = false; + break ALL; + } + } + + ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); + SelectMappedBufferResult smb = consumeQueue.getIndexBuffer(queueOffset); + try { + long offsetPy = smb.getByteBuffer().getLong(); + int sizePy = smb.getByteBuffer().getInt(); + if (physicOffset != offsetPy) { + System.out.printf(storeTime + " [fetal error] physicOffset != offsetPy. position=" + + position + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset + + ", offsetPy=" + offsetPy); + errorCount++; + if (!openAll) { + success = false; + break ALL; + } + } + if (totalSize != sizePy) { + System.out.printf(storeTime + " [fetal error] totalSize != sizePy. position=" + + position + ", msgCount=" + msgCount + ", totalSize=" + totalSize + + ", sizePy=" + sizePy); + errorCount++; + if (!openAll) { + success = false; + break ALL; + } + } + } finally { + smb.release(); + } + + msgCount++; + position += totalSize; + byteBuffer.position(position); + } + + System.out.printf("end travel " + mapedFile.getFileName() + ", total msg=" + msgCount + + ", error count=" + errorCount + ", cost:" + (System.currentTimeMillis() - startTime)); + } + + System.out.printf("travel " + (success ? "ok" : "fail")); + } + + public ConsumeQueue findConsumeQueue(String topic, int queueId) { + ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); + if (null == map) { + ConcurrentHashMap<Integer, ConsumeQueue> newMap = + new ConcurrentHashMap<Integer, ConsumeQueue>(128); + ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); + if (oldMap != null) { + map = oldMap; + } else { + map = newMap; + } + } + ConsumeQueue logic = map.get(queueId); + if (null == logic) { + ConsumeQueue newLogic = new ConsumeQueue( + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), + lSize, + null); + ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); + if (oldLogic != null) { + logic = oldLogic; + } else { + logic = newLogic; + } + } + return logic; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java new file mode 100644 index 0000000..a03baf4 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.namesrv; + +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + + +/** + * + * @author manhong.yqd + * + */ +public class DeleteKvConfigCommand implements SubCommand { + @Override + public String commandName() { + return "deleteKvConfig"; + } + + + @Override + public String commandDesc() { + return "Delete KV config."; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("s", "namespace", true, "set the namespace"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("k", "key", true, "set the key name"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + // namespace + String namespace = commandLine.getOptionValue('s').trim(); + // key name + String key = commandLine.getOptionValue('k').trim(); + + defaultMQAdminExt.start(); + defaultMQAdminExt.deleteKvConfig(namespace, key); + System.out.printf("delete kv config from namespace success.%n"); + return; + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java new file mode 100644 index 0000000..0a40484 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.command.namesrv; + +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * @author xigu.lx + */ +public class GetNamesrvConfigCommand implements SubCommand { + + @Override + public String commandName() { + return "getNamesrvConfig"; + } + + @Override + public String commandDesc() { + return "Get configs of name server."; + } + + @Override + public Options buildCommandlineOptions(final Options options) { + return options; + } + + @Override + public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + // servers + String servers = commandLine.getOptionValue('n'); + List<String> serverList = null; + if (servers != null && servers.length() > 0) { + String[] serverArray = servers.trim().split(";"); + + if (serverArray != null && serverArray.length > 0) { + serverList = Arrays.asList(serverArray); + } + } + + defaultMQAdminExt.start(); + + Map<String, Properties> nameServerConfigs = defaultMQAdminExt.getNameServerConfig(serverList); + + for (String server : nameServerConfigs.keySet()) { + System.out.printf("============%s============\n", + server); + for (Object key : nameServerConfigs.get(server).keySet()) { + System.out.printf("%-50s= %s\n", key, nameServerConfigs.get(server).get(key)); + } + } + return; + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java new file mode 100644 index 0000000..21a43e7 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.namesrv; + +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + + +/** + * + * @author manhong.yqd + * + */ +public class UpdateKvConfigCommand implements SubCommand { + @Override + public String commandName() { + return "updateKvConfig"; + } + + + @Override + public String commandDesc() { + return "Create or update KV config."; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("s", "namespace", true, "set the namespace"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("k", "key", true, "set the key name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("v", "value", true, "set the key value"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + // namespace + String namespace = commandLine.getOptionValue('s').trim(); + // key name + String key = commandLine.getOptionValue('k').trim(); + // key name + String value = commandLine.getOptionValue('v').trim(); + + defaultMQAdminExt.start(); + defaultMQAdminExt.createAndUpdateKvConfig(namespace, key, value); + System.out.printf("create or update kv config to namespace success.%n"); + return; + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +}
