http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java index e15ce1f..b3f4377 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.tools.command.consumer; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.TreeMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.body.Connection; @@ -25,20 +31,12 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.MQAdminStartup; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.TreeMap; - public class ConsumerStatusSubCommand implements SubCommand { public static void main(String[] args) { System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); - MQAdminStartup.main(new String[]{new ConsumerStatusSubCommand().commandName(), "-g", "benchmark_consumer"}); + MQAdminStartup.main(new String[] {new ConsumerStatusSubCommand().commandName(), "-g", "benchmark_consumer"}); } @Override @@ -86,16 +84,16 @@ public class ConsumerStatusSubCommand implements SubCommand { for (Connection conn : cc.getConnectionSet()) { try { ConsumerRunningInfo consumerRunningInfo = - defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack); + defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack); if (consumerRunningInfo != null) { criTable.put(conn.getClientId(), consumerRunningInfo); String filePath = now + "/" + conn.getClientId(); MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath); System.out.printf("%03d %-40s %-20s %s%n", - i++, - conn.getClientId(), - MQVersion.getVersionDesc(conn.getVersion()), - filePath); + i++, + conn.getClientId(), + MQVersion.getVersionDesc(conn.getVersion()), + filePath); } } catch (Exception e) { e.printStackTrace(); @@ -114,7 +112,7 @@ public class ConsumerStatusSubCommand implements SubCommand { while (it.hasNext()) { Entry<String, ConsumerRunningInfo> next = it.next(); String result = - ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue()); + ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue()); if (result.length() > 0) { System.out.printf(result); } @@ -126,7 +124,7 @@ public class ConsumerStatusSubCommand implements SubCommand { } else { String clientId = commandLine.getOptionValue('i').trim(); ConsumerRunningInfo consumerRunningInfo = - defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack); + defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack); if (consumerRunningInfo != null) { System.out.printf("%s", consumerRunningInfo.formatString()); }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java index 6e7cc27..699625d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.tools.command.consumer; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.TreeMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.body.Connection; @@ -25,20 +31,12 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.MQAdminStartup; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.TreeMap; - public class ConsumerSubCommand implements SubCommand { public static void main(String[] args) { System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); - MQAdminStartup.main(new String[]{new ConsumerSubCommand().commandName(), "-g", "benchmark_consumer"}); + MQAdminStartup.main(new String[] {new ConsumerSubCommand().commandName(), "-g", "benchmark_consumer"}); } @Override @@ -81,20 +79,20 @@ public class ConsumerSubCommand implements SubCommand { int i = 1; long now = System.currentTimeMillis(); final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable = - new TreeMap<String, ConsumerRunningInfo>(); + new TreeMap<String, ConsumerRunningInfo>(); for (Connection conn : cc.getConnectionSet()) { try { ConsumerRunningInfo consumerRunningInfo = - defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack); + defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack); if (consumerRunningInfo != null) { criTable.put(conn.getClientId(), consumerRunningInfo); String filePath = now + "/" + conn.getClientId(); MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath); System.out.printf("%03d %-40s %-20s %s%n", - i++, - conn.getClientId(), - MQVersion.getVersionDesc(conn.getVersion()), - filePath); + i++, + conn.getClientId(), + MQVersion.getVersionDesc(conn.getVersion()), + filePath); } } catch (Exception e) { e.printStackTrace(); @@ -113,7 +111,7 @@ public class ConsumerSubCommand implements SubCommand { while (it.hasNext()) { Entry<String, ConsumerRunningInfo> next = it.next(); String result = - ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue()); + ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue()); if (result.length() > 0) { System.out.printf(result); } @@ -125,7 +123,7 @@ public class ConsumerSubCommand implements SubCommand { } else { String clientId = commandLine.getOptionValue('i').trim(); ConsumerRunningInfo consumerRunningInfo = - defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack); + defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack); if (consumerRunningInfo != null) { System.out.printf(consumerRunningInfo.formatString()); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java index 0cc5879..75e6b65 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java @@ -6,16 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.consumer; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; @@ -23,12 +27,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Set; - public class DeleteSubscriptionGroupCommand implements SubCommand { @Override @@ -36,13 +34,11 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { return "deleteSubGroup"; } - @Override public String commandDesc() { return "Delete subscription group from broker."; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerAddr", true, "delete subscription group from which broker"); @@ -60,7 +56,6 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook); @@ -75,7 +70,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { adminExt.deleteSubscriptionGroup(addr, groupName); System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName, - addr); + addr); return; } else if (commandLine.hasOption('c')) { @@ -86,15 +81,15 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { for (String master : masterSet) { adminExt.deleteSubscriptionGroup(master, groupName); System.out.printf( - "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n", - groupName, master, clusterName); + "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n", + groupName, master, clusterName); } try { DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.RETRY_GROUP_TOPIC_PREFIX - + groupName); + + groupName); DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.DLQ_GROUP_TOPIC_PREFIX - + groupName); + + groupName); } catch (Exception e) { } return; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java index 4d5315b..8bb7c0d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java @@ -6,54 +6,49 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.consumer; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.monitor.DefaultMonitorListener; import org.apache.rocketmq.tools.monitor.MonitorConfig; import org.apache.rocketmq.tools.monitor.MonitorService; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; import org.slf4j.Logger; - public class StartMonitoringSubCommand implements SubCommand { private final Logger log = ClientLogger.getLog(); - @Override public String commandName() { return "startMonitoring"; } - @Override public String commandDesc() { return "Start Monitoring"; } - @Override public Options buildCommandlineOptions(Options options) { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { try { MonitorService monitorService = - new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook); + new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook); monitorService.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java index 93eb8ec..4ff032e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java @@ -16,18 +16,16 @@ */ package org.apache.rocketmq.tools.command.consumer; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Set; - public class UpdateSubGroupSubCommand implements SubCommand { @@ -36,13 +34,11 @@ public class UpdateSubGroupSubCommand implements SubCommand { return "updateSubGroup"; } - @Override public String commandDesc() { return "Update or create subscription group"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerAddr", true, "create subscription group to which broker"); @@ -92,7 +88,6 @@ public class UpdateSubGroupSubCommand implements SubCommand { return options; } - @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); @@ -110,31 +105,31 @@ public class UpdateSubGroupSubCommand implements SubCommand { // consumeEnable if (commandLine.hasOption('s')) { subscriptionGroupConfig.setConsumeEnable(Boolean.parseBoolean(commandLine.getOptionValue('s') - .trim())); + .trim())); } // consumeFromMinEnable if (commandLine.hasOption('m')) { subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.parseBoolean(commandLine - .getOptionValue('m').trim())); + .getOptionValue('m').trim())); } // consumeBroadcastEnable if (commandLine.hasOption('d')) { subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.parseBoolean(commandLine - .getOptionValue('d').trim())); + .getOptionValue('d').trim())); } // retryQueueNums if (commandLine.hasOption('q')) { subscriptionGroupConfig.setRetryQueueNums(Integer.parseInt(commandLine.getOptionValue('q') - .trim())); + .trim())); } // retryMaxTimes if (commandLine.hasOption('r')) { subscriptionGroupConfig.setRetryMaxTimes(Integer.parseInt(commandLine.getOptionValue('r') - .trim())); + .trim())); } // brokerId @@ -145,13 +140,13 @@ public class UpdateSubGroupSubCommand implements SubCommand { // whichBrokerWhenConsumeSlowly if (commandLine.hasOption('w')) { subscriptionGroupConfig.setWhichBrokerWhenConsumeSlowly(Long.parseLong(commandLine - .getOptionValue('w').trim())); + .getOptionValue('w').trim())); } // notifyConsumerIdsChanged if (commandLine.hasOption('a')) { subscriptionGroupConfig.setNotifyConsumerIdsChangedEnable(Boolean.parseBoolean(commandLine - .getOptionValue('a').trim())); + .getOptionValue('a').trim())); } if (commandLine.hasOption('b')) { @@ -169,7 +164,7 @@ public class UpdateSubGroupSubCommand implements SubCommand { defaultMQAdminExt.start(); Set<String> masterSet = - CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : masterSet) { try { defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java index fb0061e..cade0e0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java @@ -16,6 +16,10 @@ */ package org.apache.rocketmq.tools.command.message; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.MixAll; @@ -23,11 +27,6 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.List; public class CheckMsgSendRTCommand implements SubCommand { private static String brokerName = ""; @@ -72,16 +71,16 @@ public class CheckMsgSendRTCommand implements SubCommand { boolean sendSuccess = false; String topic = commandLine.getOptionValue('t').trim(); long amount = !commandLine.hasOption('a') ? 100 : Long.parseLong(commandLine - .getOptionValue('a').trim()); + .getOptionValue('a').trim()); long msgSize = !commandLine.hasOption('s') ? 128 : Long.parseLong(commandLine - .getOptionValue('s').trim()); + .getOptionValue('s').trim()); Message msg = new Message(topic, getStringBySize(msgSize).getBytes(MixAll.DEFAULT_CHARSET)); System.out.printf("%-32s %-4s %-20s %s%n", - "#Broker Name", - "#QID", - "#Send Result", - "#RT" + "#Broker Name", + "#QID", + "#Send Result", + "#RT" ); for (int i = 0; i < amount; i++) { start = System.currentTimeMillis(); @@ -89,7 +88,7 @@ public class CheckMsgSendRTCommand implements SubCommand { producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { - int queueIndex = (Integer) arg % mqs.size(); + int queueIndex = (Integer)arg % mqs.size(); MessageQueue queue = mqs.get(queueIndex); brokerName = queue.getBrokerName(); queueId = queue.getQueueId(); @@ -103,20 +102,19 @@ public class CheckMsgSendRTCommand implements SubCommand { end = System.currentTimeMillis(); } - if (i != 0) { timeElapsed += end - start; } System.out.printf("%-32s %-4s %-20s %s%n", - brokerName, - queueId, - sendSuccess, - end - start + brokerName, + queueId, + sendSuccess, + end - start ); } - double rt = (double) timeElapsed / (amount - 1); + double rt = (double)timeElapsed / (amount - 1); System.out.printf("Avg RT: %s%n", String.format("%.2f", rt)); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java index 88264b5..40adec9 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java @@ -6,23 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.message; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; public class DecodeMessageIdCommond implements SubCommand { @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java index 33e6804..aad1644 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java @@ -17,6 +17,17 @@ package org.apache.rocketmq.tools.command.message; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.common.MixAll; @@ -25,33 +36,77 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; - -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; public class PrintMessageByQueueCommand implements SubCommand { + public static long timestampFormat(final String value) { + long timestamp = 0; + try { + timestamp = Long.parseLong(value); + } catch (NumberFormatException e) { + + timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime(); + } + + return timestamp; + } + + private static void calculateByTag(final List<MessageExt> msgs, final Map<String, AtomicLong> tagCalmap, final boolean calByTag) { + if (!calByTag) + return; + + for (MessageExt msg : msgs) { + String tag = msg.getTags(); + if (StringUtils.isNotBlank(tag)) { + AtomicLong count = tagCalmap.get(tag); + if (count == null) { + count = new AtomicLong(); + tagCalmap.put(tag, count); + } + count.incrementAndGet(); + } + } + } + + private static void printCalculateByTag(final Map<String, AtomicLong> tagCalmap, final boolean calByTag) { + if (!calByTag) + return; + + List<TagCountBean> list = new ArrayList<TagCountBean>(); + for (Map.Entry<String, AtomicLong> entry : tagCalmap.entrySet()) { + TagCountBean tagBean = new TagCountBean(entry.getKey(), entry.getValue()); + list.add(tagBean); + } + Collections.sort(list); + + for (TagCountBean tagCountBean : list) { + System.out.printf("Tag: %-30s Count: %s%n", tagCountBean.getTag(), tagCountBean.getCount()); + } + } + + public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printMsg, boolean printBody) { + if (!printMsg) + return; + + for (MessageExt msg : msgs) { + try { + System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(), + printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY"); + } catch (UnsupportedEncodingException e) { + } + } + } + @Override public String commandName() { return "printMsgByQueue"; } - @Override public String commandDesc() { return "Print Message Detail"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("t", "topic", true, "topic name"); @@ -94,7 +149,6 @@ public class PrintMessageByQueueCommand implements SubCommand { opt.setRequired(false); options.addOption(opt); - return options; } @@ -104,15 +158,15 @@ public class PrintMessageByQueueCommand implements SubCommand { try { String charsetName = - !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim(); + !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim(); boolean printMsg = - !commandLine.hasOption('p') ? false : Boolean.parseBoolean(commandLine.getOptionValue('p').trim()); + !commandLine.hasOption('p') ? false : Boolean.parseBoolean(commandLine.getOptionValue('p').trim()); boolean printBody = - !commandLine.hasOption('d') ? false : Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); + !commandLine.hasOption('d') ? false : Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); boolean calByTag = - !commandLine.hasOption('f') ? false : Boolean.parseBoolean(commandLine.getOptionValue('f').trim()); + !commandLine.hasOption('f') ? false : Boolean.parseBoolean(commandLine.getOptionValue('f').trim()); String subExpression = - !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); + !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); String topic = commandLine.getOptionValue('t').trim(); String brokerName = commandLine.getOptionValue('a').trim(); @@ -165,70 +219,10 @@ public class PrintMessageByQueueCommand implements SubCommand { } } - public static long timestampFormat(final String value) { - long timestamp = 0; - try { - timestamp = Long.parseLong(value); - } catch (NumberFormatException e) { - - timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime(); - } - - return timestamp; - } - - - private static void calculateByTag(final List<MessageExt> msgs, final Map<String, AtomicLong> tagCalmap, final boolean calByTag) { - if (!calByTag) - return; - - for (MessageExt msg : msgs) { - String tag = msg.getTags(); - if (StringUtils.isNotBlank(tag)) { - AtomicLong count = tagCalmap.get(tag); - if (count == null) { - count = new AtomicLong(); - tagCalmap.put(tag, count); - } - count.incrementAndGet(); - } - } - } - - private static void printCalculateByTag(final Map<String, AtomicLong> tagCalmap, final boolean calByTag) { - if (!calByTag) - return; - - List<TagCountBean> list = new ArrayList<TagCountBean>(); - for (Map.Entry<String, AtomicLong> entry : tagCalmap.entrySet()) { - TagCountBean tagBean = new TagCountBean(entry.getKey(), entry.getValue()); - list.add(tagBean); - } - Collections.sort(list); - - for (TagCountBean tagCountBean : list) { - System.out.printf("Tag: %-30s Count: %s%n", tagCountBean.getTag(), tagCountBean.getCount()); - } - } - - public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printMsg, boolean printBody) { - if (!printMsg) - return; - - for (MessageExt msg : msgs) { - try { - System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(), - printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY"); - } catch (UnsupportedEncodingException e) { - } - } - } - static class TagCountBean implements Comparable<TagCountBean> { private String tag; private AtomicLong count; - public TagCountBean(final String tag, final AtomicLong count) { this.tag = tag; this.count = count; @@ -250,10 +244,9 @@ public class PrintMessageByQueueCommand implements SubCommand { this.count = count; } - @Override public int compareTo(final TagCountBean o) { - return (int) (o.getCount().get() - this.count.get()); + return (int)(o.getCount().get() - this.count.get()); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java index e8c9368..4f87d77 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.tools.command.message; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.common.MixAll; @@ -24,29 +30,41 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.Set; +public class PrintMessageSubCommand implements SubCommand { + + public static long timestampFormat(final String value) { + long timestamp = 0; + try { + timestamp = Long.parseLong(value); + } catch (NumberFormatException e) { + timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime(); + } + return timestamp; + } -public class PrintMessageSubCommand implements SubCommand { + public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printBody) { + for (MessageExt msg : msgs) { + try { + System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(), + printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY"); + } catch (UnsupportedEncodingException e) { + // + } + } + } @Override public String commandName() { return "printMsg"; } - @Override public String commandDesc() { return "Print Message Detail"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("t", "topic", true, "topic name"); @@ -62,20 +80,20 @@ public class PrintMessageSubCommand implements SubCommand { options.addOption(opt); opt = - new Option("b", "beginTimestamp ", true, - "Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + new Option("b", "beginTimestamp ", true, + "Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); opt.setRequired(false); options.addOption(opt); opt = - new Option("e", "endTimestamp ", true, - "End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + new Option("e", "endTimestamp ", true, + "End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); opt.setRequired(false); options.addOption(opt); opt = - new Option("d", "printBody ", true, - "print body"); + new Option("d", "printBody ", true, + "print body"); opt.setRequired(false); options.addOption(opt); @@ -90,13 +108,13 @@ public class PrintMessageSubCommand implements SubCommand { String topic = commandLine.getOptionValue('t').trim(); String charsetName = // - !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim(); + !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim(); String subExpression = // - !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); + !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); boolean printBody = // - !commandLine.hasOption('d') ? true : Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); + !commandLine.hasOption('d') ? true : Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); consumer.start(); @@ -149,26 +167,4 @@ public class PrintMessageSubCommand implements SubCommand { consumer.shutdown(); } } - - public static long timestampFormat(final String value) { - long timestamp = 0; - try { - timestamp = Long.parseLong(value); - } catch (NumberFormatException e) { - timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime(); - } - - return timestamp; - } - - public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printBody) { - for (MessageExt msg : msgs) { - try { - System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(), - printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY"); - } catch (UnsupportedEncodingException e) { - // - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java index fded7b6..38f9a72 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java @@ -16,6 +16,15 @@ */ package org.apache.rocketmq.tools.command.message; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -30,19 +39,140 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.List; +public class QueryMsgByIdSubCommand implements SubCommand { + public static void queryById(final DefaultMQAdminExt admin, final String msgId) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException, IOException { + MessageExt msg = admin.viewMessage(msgId); + printMsg(admin, msg); + } + + public static void printMsg(final DefaultMQAdminExt admin, final MessageExt msg) throws IOException { + if (msg == null) { + System.out.printf("%nMessage not found!"); + return; + } + + String bodyTmpFilePath = createBodyFile(msg); + String msgId = msg.getMsgId(); + if (msg instanceof MessageClientExt) { + msgId = ((MessageClientExt)msg).getOffsetMsgId(); + } + + System.out.printf("%-20s %s%n", + "OffsetID:", + msgId + ); + + System.out.printf("%-20s %s%n", + "OffsetID:", + msgId + ); + + System.out.printf("%-20s %s%n", + "Topic:", + msg.getTopic() + ); + + System.out.printf("%-20s %s%n", + "Tags:", + "[" + msg.getTags() + "]" + ); + + System.out.printf("%-20s %s%n", + "Keys:", + "[" + msg.getKeys() + "]" + ); + + System.out.printf("%-20s %d%n", + "Queue ID:", + msg.getQueueId() + ); + + System.out.printf("%-20s %d%n", + "Queue Offset:", + msg.getQueueOffset() + ); + + System.out.printf("%-20s %d%n", + "CommitLog Offset:", + msg.getCommitLogOffset() + ); + + System.out.printf("%-20s %d%n", + "Reconsume Times:", + msg.getReconsumeTimes() + ); + + System.out.printf("%-20s %s%n", + "Born Timestamp:", + UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()) + ); + + System.out.printf("%-20s %s%n", + "Store Timestamp:", + UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()) + ); + + System.out.printf("%-20s %s%n", + "Born Host:", + RemotingHelper.parseSocketAddressAddr(msg.getBornHost()) + ); + + System.out.printf("%-20s %s%n", + "Store Host:", + RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()) + ); + + System.out.printf("%-20s %d%n", + "System Flag:", + msg.getSysFlag() + ); + + System.out.printf("%-20s %s%n", + "Properties:", + msg.getProperties() != null ? msg.getProperties().toString() : "" + ); + + System.out.printf("%-20s %s%n", + "Message Body Path:", + bodyTmpFilePath + ); + + try { + List<MessageTrack> mtdList = admin.messageTrackDetail(msg); + if (mtdList.isEmpty()) { + System.out.printf("%n%nWARN: No Consumer"); + } else { + System.out.printf("%n%n"); + for (MessageTrack mt : mtdList) { + System.out.printf("%s", mt); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static String createBodyFile(MessageExt msg) throws IOException { + DataOutputStream dos = null; + try { + String bodyTmpFilePath = "/tmp/rocketmq/msgbodys"; + File file = new File(bodyTmpFilePath); + if (!file.exists()) { + file.mkdirs(); + } + bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId(); + dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath)); + dos.write(msg.getBody()); + return bodyTmpFilePath; + } finally { + if (dos != null) + dos.close(); + } + } -public class QueryMsgByIdSubCommand implements SubCommand { @Override public String commandName() { return "queryMsgById"; @@ -134,14 +264,15 @@ public class QueryMsgByIdSubCommand implements SubCommand { private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String consumerGroup, final String clientId, final String msgId) { try { ConsumeMessageDirectlyResult result = - defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId); + defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId); System.out.printf("%s", result); } catch (Exception e) { e.printStackTrace(); } } - private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final DefaultMQProducer defaultMQProducer, final String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final DefaultMQProducer defaultMQProducer, + final String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { MessageExt msg = defaultMQAdminExt.viewMessage(msgId); if (msg != null) { @@ -156,136 +287,4 @@ public class QueryMsgByIdSubCommand implements SubCommand { e.printStackTrace(); } } - - public static void queryById(final DefaultMQAdminExt admin, final String msgId) throws MQClientException, - RemotingException, MQBrokerException, InterruptedException, IOException { - MessageExt msg = admin.viewMessage(msgId); - - printMsg(admin, msg); - } - - public static void printMsg(final DefaultMQAdminExt admin, final MessageExt msg) throws IOException { - if (msg == null) { - System.out.printf("%nMessage not found!"); - return; - } - - String bodyTmpFilePath = createBodyFile(msg); - String msgId = msg.getMsgId(); - if (msg instanceof MessageClientExt) { - msgId = ((MessageClientExt) msg).getOffsetMsgId(); - } - - System.out.printf("%-20s %s%n", - "OffsetID:", - msgId - ); - - System.out.printf("%-20s %s%n", - "OffsetID:", - msgId - ); - - System.out.printf("%-20s %s%n", - "Topic:", - msg.getTopic() - ); - - System.out.printf("%-20s %s%n", - "Tags:", - "[" + msg.getTags() + "]" - ); - - System.out.printf("%-20s %s%n", - "Keys:", - "[" + msg.getKeys() + "]" - ); - - System.out.printf("%-20s %d%n", - "Queue ID:", - msg.getQueueId() - ); - - System.out.printf("%-20s %d%n", - "Queue Offset:", - msg.getQueueOffset() - ); - - System.out.printf("%-20s %d%n", - "CommitLog Offset:", - msg.getCommitLogOffset() - ); - - System.out.printf("%-20s %d%n", - "Reconsume Times:", - msg.getReconsumeTimes() - ); - - System.out.printf("%-20s %s%n", - "Born Timestamp:", - UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()) - ); - - System.out.printf("%-20s %s%n", - "Store Timestamp:", - UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()) - ); - - System.out.printf("%-20s %s%n", - "Born Host:", - RemotingHelper.parseSocketAddressAddr(msg.getBornHost()) - ); - - System.out.printf("%-20s %s%n", - "Store Host:", - RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()) - ); - - System.out.printf("%-20s %d%n", - "System Flag:", - msg.getSysFlag() - ); - - System.out.printf("%-20s %s%n", - "Properties:", - msg.getProperties() != null ? msg.getProperties().toString() : "" - ); - - System.out.printf("%-20s %s%n", - "Message Body Path:", - bodyTmpFilePath - ); - - try { - List<MessageTrack> mtdList = admin.messageTrackDetail(msg); - if (mtdList.isEmpty()) { - System.out.printf("%n%nWARN: No Consumer"); - } else { - System.out.printf("%n%n"); - for (MessageTrack mt : mtdList) { - System.out.printf("%s", mt); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - private static String createBodyFile(MessageExt msg) throws IOException { - DataOutputStream dos = null; - try { - String bodyTmpFilePath = "/tmp/rocketmq/msgbodys"; - File file = new File(bodyTmpFilePath); - if (!file.exists()) { - file.mkdirs(); - } - bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId(); - dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath)); - dos.write(msg.getBody()); - return bodyTmpFilePath; - } finally { - if (dos != null) - dos.close(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java index ebfc80e..159bd6e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java @@ -6,26 +6,25 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.message; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - /** * @@ -75,14 +74,14 @@ public class QueryMsgByKeySubCommand implements SubCommand { } void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { admin.start(); QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE); System.out.printf("%-50s %4s %40s%n", - "#Message ID", - "#QID", - "#Offset"); + "#Message ID", + "#QID", + "#Offset"); for (MessageExt msg : queryResult.getMessageList()) { System.out.printf("%-50s %4d %40d%n", msg.getMsgId(), msg.getQueueId(), msg.getQueueOffset()); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java index fc5fd56..2133636 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java @@ -16,6 +16,9 @@ */ package org.apache.rocketmq.tools.command.message; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.common.MixAll; @@ -23,10 +26,6 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - public class QueryMsgByOffsetSubCommand implements SubCommand { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java index 4e4bd61..2a6904e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java @@ -16,6 +16,14 @@ */ package org.apache.rocketmq.tools.command.message; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; @@ -27,125 +35,83 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.List; public class QueryMsgByUniqueKeySubCommand implements SubCommand { - @Override - public String commandName() { - return "queryMsgByUniqueKey"; - } - - @Override - public String commandDesc() { - return "Query Message by Unique key"; - } - - @Override - public Options buildCommandlineOptions(Options options) { - Option opt = new Option("i", "msgId", true, "Message Id"); - opt.setRequired(true); - options.addOption(opt); - - opt = new Option("g", "consumerGroup", true, "consumer group name"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("d", "clientId", true, "The consumer's client id"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("t", "topic", true, "The topic of msg"); - opt.setRequired(true); - options.addOption(opt); - - return options; - } - - public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId) throws MQClientException, - RemotingException, MQBrokerException, InterruptedException, IOException { + RemotingException, MQBrokerException, InterruptedException, IOException { MessageExt msg = admin.viewMessage(topic, msgId); - String bodyTmpFilePath = createBodyFile(msg); System.out.printf("%-20s %s%n", - "Topic:", - msg.getTopic() + "Topic:", + msg.getTopic() ); System.out.printf("%-20s %s%n", - "Tags:", - "[" + msg.getTags() + "]" + "Tags:", + "[" + msg.getTags() + "]" ); System.out.printf("%-20s %s%n", - "Keys:", - "[" + msg.getKeys() + "]" + "Keys:", + "[" + msg.getKeys() + "]" ); System.out.printf("%-20s %d%n", - "Queue ID:", - msg.getQueueId() + "Queue ID:", + msg.getQueueId() ); System.out.printf("%-20s %d%n", - "Queue Offset:", - msg.getQueueOffset() + "Queue Offset:", + msg.getQueueOffset() ); System.out.printf("%-20s %d%n", - "CommitLog Offset:", - msg.getCommitLogOffset() + "CommitLog Offset:", + msg.getCommitLogOffset() ); System.out.printf("%-20s %d%n", - "Reconsume Times:", - msg.getReconsumeTimes() + "Reconsume Times:", + msg.getReconsumeTimes() ); System.out.printf("%-20s %s%n", - "Born Timestamp:", - UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()) + "Born Timestamp:", + UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()) ); System.out.printf("%-20s %s%n", - "Store Timestamp:", - UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()) + "Store Timestamp:", + UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()) ); System.out.printf("%-20s %s%n", - "Born Host:", - RemotingHelper.parseSocketAddressAddr(msg.getBornHost()) + "Born Host:", + RemotingHelper.parseSocketAddressAddr(msg.getBornHost()) ); System.out.printf("%-20s %s%n", - "Store Host:", - RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()) + "Store Host:", + RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()) ); System.out.printf("%-20s %d%n", - "System Flag:", - msg.getSysFlag() + "System Flag:", + msg.getSysFlag() ); System.out.printf("%-20s %s%n", - "Properties:", - msg.getProperties() != null ? msg.getProperties().toString() : "" + "Properties:", + msg.getProperties() != null ? msg.getProperties().toString() : "" ); System.out.printf("%-20s %s%n", - "Message Body Path:", - bodyTmpFilePath + "Message Body Path:", + bodyTmpFilePath ); try { @@ -163,6 +129,54 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { } } + private static String createBodyFile(MessageExt msg) throws IOException { + DataOutputStream dos = null; + try { + String bodyTmpFilePath = "/tmp/rocketmq/msgbodys"; + File file = new File(bodyTmpFilePath); + if (!file.exists()) { + file.mkdirs(); + } + bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId(); + dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath)); + dos.write(msg.getBody()); + return bodyTmpFilePath; + } finally { + if (dos != null) + dos.close(); + } + } + + @Override + public String commandName() { + return "queryMsgByUniqueKey"; + } + + @Override + public String commandDesc() { + return "Query Message by Unique key"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("i", "msgId", true, "Message Id"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("g", "consumerGroup", true, "consumer group name"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("d", "clientId", true, "The consumer's client id"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "The topic of msg"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { @@ -178,7 +192,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { final String consumerGroup = commandLine.getOptionValue('g').trim(); final String clientId = commandLine.getOptionValue('d').trim(); ConsumeMessageDirectlyResult result = - defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); + defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); System.out.printf("%s", result); } else { queryById(defaultMQAdminExt, topic, msgId); @@ -189,23 +203,4 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { defaultMQAdminExt.shutdown(); } } - - - private static String createBodyFile(MessageExt msg) throws IOException { - DataOutputStream dos = null; - try { - String bodyTmpFilePath = "/tmp/rocketmq/msgbodys"; - File file = new File(bodyTmpFilePath); - if (!file.exists()) { - file.mkdirs(); - } - bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId(); - dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath)); - dos.write(msg.getBody()); - return bodyTmpFilePath; - } finally { - if (dos != null) - dos.close(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java index ee923c6..e25c61f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java @@ -17,19 +17,18 @@ package org.apache.rocketmq.tools.command.message; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.store.ConsumeQueue; -import org.apache.rocketmq.store.MappedFile; -import org.apache.rocketmq.store.MappedFileQueue; -import org.apache.rocketmq.store.SelectMappedBufferResult; -import org.apache.rocketmq.store.config.StorePathConfigHelper; - import java.io.File; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.Date; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.MappedFile; +import org.apache.rocketmq.store.MappedFileQueue; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.config.StorePathConfigHelper; public class Store { public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8; @@ -49,7 +48,7 @@ public class Store { this.lSize = lSize; mapedFileQueue = new MappedFileQueue(cStorePath, cSize, null); consumeQueueTable = - new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>(); + new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>(); } public boolean load() { @@ -75,11 +74,11 @@ public class Store { for (File fileQueueId : fileQueueIdList) { int queueId = Integer.parseInt(fileQueueId.getName()); ConsumeQueue logic = new ConsumeQueue( - topic, - queueId, - StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), - lSize, - null); + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), + lSize, + null); this.putConsumeQueue(topic, queueId, logic); if (!logic.load()) { return false; @@ -92,7 +91,6 @@ public class Store { return true; } - private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) { ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic); if (null == map) { @@ -181,16 +179,14 @@ public class Store { e.printStackTrace(); } - Date storeTime = new Date(storeTimestamp); - long currentPhyOffset = startOffset + position; if (physicOffset != currentPhyOffset) { System.out.printf(storeTime - + " [fetal error] physicOffset != currentPhyOffset. position=" + position - + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset - + ", currentPhyOffset=" + currentPhyOffset); + + " [fetal error] physicOffset != currentPhyOffset. position=" + position + + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset + + ", currentPhyOffset=" + currentPhyOffset); errorCount++; if (!openAll) { success = false; @@ -205,8 +201,8 @@ public class Store { int sizePy = smb.getByteBuffer().getInt(); if (physicOffset != offsetPy) { System.out.printf(storeTime + " [fetal error] physicOffset != offsetPy. position=" - + position + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset - + ", offsetPy=" + offsetPy); + + position + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset + + ", offsetPy=" + offsetPy); errorCount++; if (!openAll) { success = false; @@ -215,8 +211,8 @@ public class Store { } if (totalSize != sizePy) { System.out.printf(storeTime + " [fetal error] totalSize != sizePy. position=" - + position + ", msgCount=" + msgCount + ", totalSize=" + totalSize - + ", sizePy=" + sizePy); + + position + ", msgCount=" + msgCount + ", totalSize=" + totalSize + + ", sizePy=" + sizePy); errorCount++; if (!openAll) { success = false; @@ -233,7 +229,7 @@ public class Store { } System.out.printf("end travel " + mapedFile.getFileName() + ", total msg=" + msgCount - + ", error count=" + errorCount + ", cost:" + (System.currentTimeMillis() - startTime)); + + ", error count=" + errorCount + ", cost:" + (System.currentTimeMillis() - startTime)); } System.out.printf("travel " + (success ? "ok" : "fail")); @@ -243,7 +239,7 @@ public class Store { ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); if (null == map) { ConcurrentHashMap<Integer, ConsumeQueue> newMap = - new ConcurrentHashMap<Integer, ConsumeQueue>(128); + new ConcurrentHashMap<Integer, ConsumeQueue>(128); ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; @@ -254,11 +250,11 @@ public class Store { ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( - topic, - queueId, - StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), - lSize, - null); + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), + lSize, + null); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java index 0ae8f44..b4fb7dd 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java @@ -6,23 +6,22 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.namesrv; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.SubCommand; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; - +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; /** * @@ -34,13 +33,11 @@ public class DeleteKvConfigCommand implements SubCommand { return "deleteKvConfig"; } - @Override public String commandDesc() { return "Delete KV config."; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("s", "namespace", true, "set the namespace"); @@ -53,7 +50,6 @@ public class DeleteKvConfigCommand implements SubCommand { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java index b2c95d3..f4c95d3 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java @@ -6,27 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.namesrv; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; - import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; public class GetNamesrvConfigCommand implements SubCommand { @@ -67,7 +66,7 @@ public class GetNamesrvConfigCommand implements SubCommand { for (String server : nameServerConfigs.keySet()) { System.out.printf("============%s============\n", - server); + server); for (Object key : nameServerConfigs.get(server).keySet()) { System.out.printf("%-50s= %s\n", key, nameServerConfigs.get(server).get(key)); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java index a2d4f43..9d5f7a9 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java @@ -6,23 +6,22 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.namesrv; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.SubCommand; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; - +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; /** * @@ -34,13 +33,11 @@ public class UpdateKvConfigCommand implements SubCommand { return "updateKvConfig"; } - @Override public String commandDesc() { return "Create or update KV config."; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("s", "namespace", true, "set the namespace"); @@ -57,7 +54,6 @@ public class UpdateKvConfigCommand implements SubCommand { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java index c6517d6..807636c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java @@ -6,27 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.namesrv; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - import java.util.Arrays; import java.util.List; import java.util.Properties; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; public class UpdateNamesrvConfigCommand implements SubCommand { @Override @@ -80,7 +79,7 @@ public class UpdateNamesrvConfigCommand implements SubCommand { defaultMQAdminExt.updateNameServerConfig(properties, serverList); System.out.printf("update name server config success!%s\n%s : %s\n", - serverList == null ? "" : serverList, key, value); + serverList == null ? "" : serverList, key, value); return; } catch (Exception e) { e.printStackTrace();