http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index df4fe89..5056010 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -16,6 +16,11 @@ */ package org.apache.rocketmq.tools.admin; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -25,17 +30,27 @@ import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.body.*; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.api.MessageTrack; -import org.apache.rocketmq.remoting.exception.*; - -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; public interface MQAdminExt extends MQAdmin { void start() throws MQClientException; @@ -43,53 +58,53 @@ public interface MQAdminExt extends MQAdmin { void shutdown(); void updateBrokerConfig(final String brokerAddr, final Properties properties) throws RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException; + RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException; Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException; + RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException; void createAndUpdateTopicConfig(final String addr, final TopicConfig config) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + InterruptedException, MQClientException; void createAndUpdateSubscriptionGroupConfig(final String addr, final SubscriptionGroupConfig config) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException; + MQBrokerException, InterruptedException, MQClientException; SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group); TopicConfig examineTopicConfig(final String addr, final String topic); TopicStatsTable examineTopicStats(final String topic) throws RemotingException, MQClientException, InterruptedException, - MQBrokerException; + MQBrokerException; TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException; TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException; KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, InterruptedException, MQBrokerException; + RemotingTimeoutException, InterruptedException, MQBrokerException; ConsumeStats examineConsumeStats(final String consumerGroup) throws RemotingException, MQClientException, InterruptedException, - MQBrokerException; + MQBrokerException; ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, - InterruptedException, MQBrokerException; + InterruptedException, MQBrokerException; ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException; + RemotingSendRequestException, RemotingConnectException; TopicRouteData examineTopicRouteInfo(final String topic) throws RemotingException, MQClientException, InterruptedException; ConsumerConnection examineConsumerConnectionInfo(final String consumerGroup) throws RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, - MQClientException; + RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, + MQClientException; ProducerConnection examineProducerConnectionInfo(final String producerGroup, final String topic) throws RemotingException, - MQClientException, InterruptedException, MQBrokerException; + MQClientException, InterruptedException, MQBrokerException; List<String> getNameServerAddressList(); int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException, - RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException; + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException; void putKVConfig(final String namespace, final String key, final String value); @@ -98,91 +113,94 @@ public interface MQAdminExt extends MQAdmin { KVTable getKVListByNamespace(final String namespace) throws RemotingException, MQClientException, InterruptedException; void deleteTopicInBroker(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + InterruptedException, MQClientException; void deleteTopicInNameServer(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + InterruptedException, MQClientException; void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + InterruptedException, MQClientException; void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + InterruptedException, MQClientException; void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException, - MQClientException; + MQClientException; List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + InterruptedException, MQClientException; Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException; + MQBrokerException, InterruptedException, MQClientException; void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; + InterruptedException, MQClientException; GroupList queryTopicConsumeByWho(final String topic) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException; + RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException; List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException, - RemotingException, MQClientException; + RemotingException, MQClientException; boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, MQClientException, InterruptedException; + RemotingTimeoutException, MQClientException, InterruptedException; boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, MQClientException, InterruptedException; + RemotingTimeoutException, MQClientException, InterruptedException; boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, MQClientException, InterruptedException; - + RemotingTimeoutException, MQClientException, InterruptedException; boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, MQClientException, InterruptedException; + RemotingTimeoutException, MQClientException, InterruptedException; ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, final String clientId, final boolean jstack) - throws RemotingException, MQClientException, InterruptedException; + throws RemotingException, MQClientException, InterruptedException; ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, - String clientId, - String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + String clientId, + String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, - String clientId, - String topic, - String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + String clientId, + String topic, + String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException, - MQBrokerException; + MQBrokerException; void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException, - MQClientException, InterruptedException, MQBrokerException; + MQClientException, InterruptedException, MQBrokerException; BrokerStatsData viewBrokerStatsData(final String brokerAddr, final String statsName, final String statsKey) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, - InterruptedException; + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, + InterruptedException; Set<String> getClusterList(final String topic) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, MQClientException, InterruptedException; + RemotingTimeoutException, MQClientException, InterruptedException; - ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, MQClientException, InterruptedException; + ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, + long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException; Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException; - SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, - RemotingConnectException, MQBrokerException; + SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException; - TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, - RemotingConnectException, MQBrokerException; + TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException; - void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException; + void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, + long offset) throws RemotingException, InterruptedException, MQBrokerException; /** * Update name server config. @@ -193,7 +211,6 @@ public interface MQAdminExt extends MQAdmin { * * @param properties * @param nameServers - * * @throws InterruptedException * @throws RemotingConnectException * @throws UnsupportedEncodingException @@ -203,8 +220,8 @@ public interface MQAdminExt extends MQAdmin { * @throws MQBrokerException */ void updateNameServerConfig(final Properties properties, final List<String> nameServers) throws InterruptedException, RemotingConnectException, - UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, - MQClientException, MQBrokerException; + UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, + MQClientException, MQBrokerException; /** * Get name server config. @@ -213,9 +230,7 @@ public interface MQAdminExt extends MQAdmin { * <br> If param(nameServers) is null or empty, will use name servers from ns! * * @param nameServers - * * @return The fetched name server config - * * @throws InterruptedException * @throws RemotingTimeoutException * @throws RemotingSendRequestException @@ -224,6 +239,6 @@ public interface MQAdminExt extends MQAdmin { * @throws UnsupportedEncodingException */ Map<String, Properties> getNameServerConfig(final List<String> nameServers) throws InterruptedException, - RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, - MQClientException, UnsupportedEncodingException; + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, + MQClientException, UnsupportedEncodingException; }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java index 324b661..4445482 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.admin.api; @@ -22,40 +22,33 @@ public class MessageTrack { private TrackType trackType; private String exceptionDesc; - public String getConsumerGroup() { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public TrackType getTrackType() { return trackType; } - public void setTrackType(TrackType trackType) { this.trackType = trackType; } - public String getExceptionDesc() { return exceptionDesc; } - public void setExceptionDesc(String exceptionDesc) { this.exceptionDesc = exceptionDesc; } - @Override public String toString() { return "MessageTrack [consumerGroup=" + consumerGroup + ", trackType=" + trackType - + ", exceptionDesc=" + exceptionDesc + "]"; + + ", exceptionDesc=" + exceptionDesc + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java index 36345f9..df70523 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.admin.api; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java index c675d9a..8b86ab8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java @@ -16,6 +16,14 @@ */ package org.apache.rocketmq.tools.command; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.body.ClusterInfo; @@ -25,16 +33,13 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.MQAdminExt; -import java.util.*; - - public class CommandUtil { public static Map<String/*master addr*/, List<String>/*slave addr*/> fetchMasterAndSlaveDistinguish( - final MQAdminExt adminExt, final String clusterName) - throws InterruptedException, RemotingConnectException, - RemotingTimeoutException, RemotingSendRequestException, - MQBrokerException { + final MQAdminExt adminExt, final String clusterName) + throws InterruptedException, RemotingConnectException, + RemotingTimeoutException, RemotingSendRequestException, + MQBrokerException { Map<String, List<String>> masterAndSlaveMap = new HashMap<String, List<String>>(4); ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); @@ -42,7 +47,7 @@ public class CommandUtil { if (brokerNameSet == null) { System.out - .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); + .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); return masterAndSlaveMap; } @@ -58,7 +63,7 @@ public class CommandUtil { for (Long id : brokerData.getBrokerAddrs().keySet()) { if (brokerData.getBrokerAddrs().get(id) == null - || id.longValue() == MixAll.MASTER_ID) { + || id.longValue() == MixAll.MASTER_ID) { continue; } @@ -70,8 +75,8 @@ public class CommandUtil { } public static Set<String> fetchMasterAddrByClusterName(final MQAdminExt adminExt, final String clusterName) - throws InterruptedException, RemotingConnectException, RemotingTimeoutException, - RemotingSendRequestException, MQBrokerException { + throws InterruptedException, RemotingConnectException, RemotingTimeoutException, + RemotingSendRequestException, MQBrokerException { Set<String> masterSet = new HashSet<String>(); ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); @@ -91,15 +96,15 @@ public class CommandUtil { } } else { System.out - .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); + .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); } return masterSet; } public static Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, final String clusterName) - throws InterruptedException, RemotingConnectException, RemotingTimeoutException, - RemotingSendRequestException, MQBrokerException { + throws InterruptedException, RemotingConnectException, RemotingTimeoutException, + RemotingSendRequestException, MQBrokerException { Set<String> masterSet = new HashSet<String>(); ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); @@ -116,29 +121,27 @@ public class CommandUtil { } } else { System.out - .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); + .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); } return masterSet; } - public static Set<String> fetchBrokerNameByClusterName(final MQAdminExt adminExt, final String clusterName) - throws Exception { + throws Exception { ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); if (brokerNameSet.isEmpty()) { throw new Exception( - "Make sure the specified clusterName exists or the nameserver which connected is correct."); + "Make sure the specified clusterName exists or the nameserver which connected is correct."); } return brokerNameSet; } - public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, final String addr) throws Exception { ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); HashMap<String/* brokerName */, BrokerData> brokerAddrTable = - clusterInfoSerializeWrapper.getBrokerAddrTable(); + clusterInfoSerializeWrapper.getBrokerAddrTable(); Iterator<Map.Entry<String, BrokerData>> it = brokerAddrTable.entrySet().iterator(); while (it.hasNext()) { Map.Entry<String, BrokerData> entry = it.next(); @@ -147,8 +150,7 @@ public class CommandUtil { return entry.getKey(); } throw new Exception( - "Make sure the specified broker addr exists or the nameserver which connected is correct."); + "Make sure the specified broker addr exists or the nameserver which connected is correct."); } - } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index d11dd23..a1753c1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -6,45 +6,71 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad; +import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand; +import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand; +import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand; +import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand; +import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand; +import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand; import org.apache.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand; import org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand; import org.apache.rocketmq.tools.command.connection.ConsumerConnectionSubCommand; import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand; -import org.apache.rocketmq.tools.command.consumer.*; -import org.apache.rocketmq.tools.command.namesrv.*; +import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand; +import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand; +import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand; +import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand; +import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand; +import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; +import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand; +import org.apache.rocketmq.tools.command.message.PrintMessageSubCommand; +import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand; +import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand; +import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; +import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; +import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; +import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; +import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand; +import org.apache.rocketmq.tools.command.namesrv.UpdateNamesrvConfigCommand; +import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand; import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand; import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand; import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; -import org.apache.rocketmq.tools.command.broker.*; -import org.apache.rocketmq.tools.command.message.*; -import org.apache.rocketmq.tools.command.topic.*; +import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand; +import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand; +import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand; +import org.apache.rocketmq.tools.command.topic.TopicListSubCommand; +import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand; +import org.apache.rocketmq.tools.command.topic.TopicStatusSubCommand; +import org.apache.rocketmq.tools.command.topic.UpdateOrderConfCommand; +import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand; +import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; - - public class MQAdminStartup { protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>(); @@ -55,7 +81,6 @@ public class MQAdminStartup { public static void main0(String[] args, RPCHook rpcHook) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - //PackageConflictDetect.detectFastjson(); initCommand(); @@ -86,11 +111,10 @@ public class MQAdminStartup { if (cmd != null) { String[] subargs = parseSubArgs(args); - Options options = ServerUtil.buildCommandlineOptions(new Options()); final CommandLine commandLine = - ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), - new PosixParser()); + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), + new PosixParser()); if (null == commandLine) { System.exit(-1); return; @@ -124,7 +148,6 @@ public class MQAdminStartup { initCommand(new TopicStatusSubCommand()); initCommand(new TopicClusterSubCommand()); - initCommand(new BrokerStatusSubCommand()); initCommand(new QueryMsgByIdSubCommand()); initCommand(new QueryMsgByKeySubCommand()); @@ -136,7 +159,6 @@ public class MQAdminStartup { initCommand(new SendMsgStatusCommand()); initCommand(new BrokerConsumeStatsSubCommad()); - initCommand(new ProducerConnectionSubCommand()); initCommand(new ConsumerConnectionSubCommand()); initCommand(new ConsumerProgressSubCommand()); @@ -172,7 +194,7 @@ public class MQAdminStartup { private static void initLogback() throws JoranException { String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); - LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java index 744685e..2035276 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java @@ -6,30 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; - +import org.apache.rocketmq.remoting.RPCHook; public interface SubCommand { public String commandName(); - public String commandDesc(); - public Options buildCommandlineOptions(final Options options); - public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java index 57ca907..485b58c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java @@ -16,6 +16,14 @@ */ package org.apache.rocketmq.tools.command.broker; +import java.util.Collections; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; @@ -24,12 +32,6 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.*; - public class BrokerConsumeStatsSubCommad implements SubCommand { @@ -86,14 +88,14 @@ public class BrokerConsumeStatsSubCommad implements SubCommand { ConsumeStatsList consumeStatsList = defaultMQAdminExt.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis); System.out.printf("%-32s %-32s %-32s %-4s %-20s %-20s %-20s %s%n", - "#Topic", - "#Group", - "#Broker Name", - "#QID", - "#Broker Offset", - "#Consumer Offset", - "#Diff", - "#LastTime"); + "#Topic", + "#Group", + "#Broker Name", + "#QID", + "#Broker Offset", + "#Consumer Offset", + "#Diff", + "#LastTime"); for (Map<String, List<ConsumeStats>> map : consumeStatsList.getConsumeStatsList()) { for (Map.Entry<String, List<ConsumeStats>> entry : map.entrySet()) { String group = entry.getKey(); @@ -117,14 +119,14 @@ public class BrokerConsumeStatsSubCommad implements SubCommand { } if (offsetWrapper.getLastTimestamp() > 0) System.out.printf("%-32s %-32s %-32s %-4d %-20d %-20d %-20d %s%n", - UtilAll.frontStringAtLeast(mq.getTopic(), 32), - group, - UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), - mq.getQueueId(), - offsetWrapper.getBrokerOffset(), - offsetWrapper.getConsumerOffset(), - diff, - lastTime + UtilAll.frontStringAtLeast(mq.getTopic(), 32), + group, + UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), + mq.getQueueId(), + offsetWrapper.getBrokerOffset(), + offsetWrapper.getConsumerOffset(), + diff, + lastTime ); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java index 3f5ff79..2fad2d1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.tools.command.broker; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.remoting.RPCHook; @@ -25,15 +32,6 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; - public class BrokerStatusSubCommand implements SubCommand { @@ -42,13 +40,11 @@ public class BrokerStatusSubCommand implements SubCommand { return "brokerStatus"; } - @Override public String commandDesc() { return "Fetch broker runtime status data"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerAddr", true, "Broker address"); @@ -77,7 +73,7 @@ public class BrokerStatusSubCommand implements SubCommand { printBrokerRuntimeStats(defaultMQAdminExt, brokerAddr, false); } else if (clusterName != null) { Set<String> masterSet = - CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName); + CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName); for (String ba : masterSet) { try { printBrokerRuntimeStats(defaultMQAdminExt, ba, true); @@ -87,7 +83,6 @@ public class BrokerStatusSubCommand implements SubCommand { } } - } catch (Exception e) { e.printStackTrace(); } finally { @@ -95,7 +90,8 @@ public class BrokerStatusSubCommand implements SubCommand { } } - public void printBrokerRuntimeStats(final DefaultMQAdminExt defaultMQAdminExt, final String brokerAddr, final boolean printBroker) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + public void printBrokerRuntimeStats(final DefaultMQAdminExt defaultMQAdminExt, final String brokerAddr, + final boolean printBroker) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(brokerAddr); TreeMap<String, String> tmp = new TreeMap<String, String>(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java index 71aa78b..d15ad64 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java @@ -6,24 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.broker; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.SubCommand; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; - +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; public class CleanExpiredCQSubCommand implements SubCommand { @@ -32,13 +31,11 @@ public class CleanExpiredCQSubCommand implements SubCommand { return "cleanExpiredCQ"; } - @Override public String commandDesc() { return "Clean expired ConsumeQueue on broker."; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerAddr", true, "Broker address"); @@ -52,7 +49,6 @@ public class CleanExpiredCQSubCommand implements SubCommand { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java index 0e4c4b4..ca5778b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java @@ -6,24 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.broker; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.SubCommand; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; - +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; public class CleanUnusedTopicCommand implements SubCommand { @@ -32,13 +31,11 @@ public class CleanUnusedTopicCommand implements SubCommand { return "cleanUnusedTopic"; } - @Override public String commandDesc() { return "Clean unused topic on broker."; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerAddr", true, "Broker address"); @@ -52,7 +49,6 @@ public class CleanUnusedTopicCommand implements SubCommand { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java index 2956264..d0a271e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java @@ -17,6 +17,13 @@ package org.apache.rocketmq.tools.command.broker; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -26,14 +33,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.Map; -import java.util.Properties; public class GetBrokerConfigCommand implements SubCommand { @Override @@ -72,29 +71,29 @@ public class GetBrokerConfigCommand implements SubCommand { defaultMQAdminExt.start(); getAndPrint(defaultMQAdminExt, - String.format("============%s============\n", brokerAddr), - brokerAddr); + String.format("============%s============\n", brokerAddr), + brokerAddr); } else if (commandLine.hasOption('c')) { String clusterName = commandLine.getOptionValue('c').trim(); defaultMQAdminExt.start(); Map<String, List<String>> masterAndSlaveMap - = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName); + = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName); for (String masterAddr : masterAndSlaveMap.keySet()) { getAndPrint( - defaultMQAdminExt, - String.format("============Master: %s============\n", masterAddr), - masterAddr + defaultMQAdminExt, + String.format("============Master: %s============\n", masterAddr), + masterAddr ); for (String slaveAddr : masterAndSlaveMap.get(masterAddr)) { getAndPrint( - defaultMQAdminExt, - String.format("============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr), - slaveAddr + defaultMQAdminExt, + String.format("============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr), + slaveAddr ); } } @@ -108,9 +107,9 @@ public class GetBrokerConfigCommand implements SubCommand { } protected void getAndPrint(final MQAdminExt defaultMQAdminExt, final String printPrefix, final String addr) - throws InterruptedException, RemotingConnectException, - UnsupportedEncodingException, RemotingTimeoutException, - MQBrokerException, RemotingSendRequestException { + throws InterruptedException, RemotingConnectException, + UnsupportedEncodingException, RemotingTimeoutException, + MQBrokerException, RemotingSendRequestException { System.out.print(printPrefix); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java index d40ba21..d770d12 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java @@ -16,33 +16,41 @@ */ package org.apache.rocketmq.tools.command.broker; +import java.io.UnsupportedEncodingException; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import java.io.UnsupportedEncodingException; +public class SendMsgStatusCommand implements SubCommand { + private static Message buildMessage(final String topic, final int messageSize) throws UnsupportedEncodingException { + Message msg = new Message(); + msg.setTopic(topic); -public class SendMsgStatusCommand implements SubCommand { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < messageSize; i += 11) { + sb.append("hello jodie"); + } + msg.setBody(sb.toString().getBytes(MixAll.DEFAULT_CHARSET)); + return msg; + } @Override public String commandName() { return "sendMsgStatus"; } - @Override public String commandDesc() { return "send msg to broker."; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerName", true, "Broker Name"); @@ -60,7 +68,6 @@ public class SendMsgStatusCommand implements SubCommand { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook); @@ -85,17 +92,4 @@ public class SendMsgStatusCommand implements SubCommand { producer.shutdown(); } } - - - private static Message buildMessage(final String topic, final int messageSize) throws UnsupportedEncodingException { - Message msg = new Message(); - msg.setTopic(topic); - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < messageSize; i += 11) { - sb.append("hello jodie"); - } - msg.setBody(sb.toString().getBytes(MixAll.DEFAULT_CHARSET)); - return msg; - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java index 1de9457..8718c9e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java @@ -6,28 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.tools.command.broker; +import java.util.Properties; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Properties; -import java.util.Set; - public class UpdateBrokerConfigSubCommand implements SubCommand { @@ -36,13 +34,11 @@ public class UpdateBrokerConfigSubCommand implements SubCommand { return "updateBrokerConfig"; } - @Override public String commandDesc() { return "Update broker's config"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerAddr", true, "update which broker"); @@ -64,7 +60,6 @@ public class UpdateBrokerConfigSubCommand implements SubCommand { return options; } - @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); @@ -92,7 +87,7 @@ public class UpdateBrokerConfigSubCommand implements SubCommand { defaultMQAdminExt.start(); Set<String> masterSet = - CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String brokerAddr : masterSet) { try { defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java index a8bd3a8..1ae6d52 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java @@ -17,6 +17,16 @@ package org.apache.rocketmq.tools.command.cluster; +import java.math.BigDecimal; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeSet; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; @@ -24,13 +34,6 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.math.BigDecimal; -import java.text.SimpleDateFormat; -import java.util.*; public class CLusterSendMsgRTCommand implements SubCommand { @@ -90,24 +93,24 @@ public class CLusterSendMsgRTCommand implements SubCommand { ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); HashMap<String, Set<String>> clusterAddr = clusterInfoSerializeWrapper - .getClusterAddrTable(); + .getClusterAddrTable(); Set<String> clusterNames = null; long amount = !commandLine.hasOption('a') ? 50 : Long.parseLong(commandLine - .getOptionValue('a').trim()); + .getOptionValue('a').trim()); long size = !commandLine.hasOption('s') ? 128 : Long.parseLong(commandLine - .getOptionValue('s').trim()); + .getOptionValue('s').trim()); long interval = !commandLine.hasOption('i') ? 10 : Long.parseLong(commandLine - .getOptionValue('i').trim()); + .getOptionValue('i').trim()); boolean printAsTlog = !commandLine.hasOption('p') ? false : Boolean - .parseBoolean(commandLine.getOptionValue('p').trim()); + .parseBoolean(commandLine.getOptionValue('p').trim()); String machineRoom = !commandLine.hasOption('m') ? "noname" : commandLine - .getOptionValue('m').trim(); + .getOptionValue('m').trim(); if (commandLine.hasOption('c')) { clusterNames = new TreeSet<String>(); @@ -118,11 +121,11 @@ public class CLusterSendMsgRTCommand implements SubCommand { if (!printAsTlog) { System.out.printf("%-24s %-24s %-4s %-8s %-8s%n", - "#Cluster Name", - "#Broker Name", - "#RT", - "#successCount", - "#failCount" + "#Cluster Name", + "#Broker Name", + "#RT", + "#successCount", + "#failCount" ); } @@ -158,19 +161,19 @@ public class CLusterSendMsgRTCommand implements SubCommand { } } - double rt = (double) elapsed / (amount - 1); + double rt = (double)elapsed / (amount - 1); if (!printAsTlog) { System.out.printf("%-24s %-24s %-8s %-16s %-16s%n", - clusterName, - brokerName, - String.format("%.2f", rt), - successCount, - failCount + clusterName, + brokerName, + String.format("%.2f", rt), + successCount, + failCount ); } else { System.out.printf(String.format("%s|%s|%s|%s|%s%n", getCurTime(), - machineRoom, clusterName, brokerName, - new BigDecimal(rt).setScale(0, BigDecimal.ROUND_HALF_UP))); + machineRoom, clusterName, brokerName, + new BigDecimal(rt).setScale(0, BigDecimal.ROUND_HALF_UP))); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java index b649af1..eb250cf 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.tools.command.cluster; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.KVTable; @@ -26,15 +33,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - public class ClusterListSubCommand implements SubCommand { @@ -43,13 +41,11 @@ public class ClusterListSubCommand implements SubCommand { return "clusterList"; } - @Override public String commandDesc() { return "List all of clusters"; } - @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("m", "moreStats", false, "Print more stats"); @@ -89,7 +85,8 @@ public class ClusterListSubCommand implements SubCommand { } else { this.printClusterBaseInfo(defaultMQAdminExt); } - } while (enableInterval); + } + while (enableInterval); } catch (Exception e) { e.printStackTrace(); } finally { @@ -98,17 +95,17 @@ public class ClusterListSubCommand implements SubCommand { } private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, - RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException { + RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException { ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); System.out.printf("%-16s %-32s %14s %14s %14s %14s%n", - "#Cluster Name", - "#Broker Name", - "#InTotalYest", - "#OutTotalYest", - "#InTotalToday", - "#OutTotalToday" + "#Cluster Name", + "#Broker Name", + "#InTotalYest", + "#OutTotalYest", + "#InTotalToday", + "#OutTotalToday" ); Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator(); @@ -149,12 +146,12 @@ public class ClusterListSubCommand implements SubCommand { } System.out.printf("%-16s %-32s %14d %14d %14d %14d%n", - clusterName, - brokerName, - inTotalYest, - outTotalYest, - inTotalToday, - outTotalToday + clusterName, + brokerName, + inTotalYest, + outTotalYest, + inTotalToday, + outTotalToday ); } } @@ -167,21 +164,21 @@ public class ClusterListSubCommand implements SubCommand { } private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException, - RemotingSendRequestException, InterruptedException, MQBrokerException { + RemotingSendRequestException, InterruptedException, MQBrokerException { ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", - "#Cluster Name", - "#Broker Name", - "#BID", - "#Addr", - "#Version", - "#InTPS(LOAD)", - "#OutTPS(LOAD)", - "#PCWait(ms)", - "#Hour", - "#SPACE" + "#Cluster Name", + "#Broker Name", + "#BID", + "#Addr", + "#Version", + "#InTPS(LOAD)", + "#OutTPS(LOAD)", + "#PCWait(ms)", + "#Hour", + "#SPACE" ); Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator(); @@ -254,16 +251,16 @@ public class ClusterListSubCommand implements SubCommand { } System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", - clusterName, - brokerName, - next1.getKey().longValue(), - next1.getValue(), - version, - String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills), - String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills), - pageCacheLockTimeMills, - String.format("%2.2f", hour), - String.format("%.4f", space) + clusterName, + brokerName, + next1.getKey().longValue(), + next1.getValue(), + version, + String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills), + String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills), + pageCacheLockTimeMills, + String.format("%2.2f", hour), + String.format("%.4f", space) ); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java index 355e894..7f7f88d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java @@ -16,6 +16,11 @@ */ package org.apache.rocketmq.tools.command.connection; +import java.util.Iterator; +import java.util.Map.Entry; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; @@ -23,13 +28,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.Iterator; -import java.util.Map.Entry; - public class ConsumerConnectionSubCommand implements SubCommand { @@ -65,15 +63,14 @@ public class ConsumerConnectionSubCommand implements SubCommand { ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group); - int i = 1; for (Connection conn : cc.getConnectionSet()) { System.out.printf("%03d %-32s %-22s %-8s %s%n", - i++, - conn.getClientId(), - conn.getClientAddr(), - conn.getLanguage(), - MQVersion.getVersionDesc(conn.getVersion()) + i++, + conn.getClientId(), + conn.getClientAddr(), + conn.getLanguage(), + MQVersion.getVersionDesc(conn.getVersion()) ); } @@ -84,9 +81,9 @@ public class ConsumerConnectionSubCommand implements SubCommand { Entry<String, SubscriptionData> entry = it.next(); SubscriptionData sd = entry.getValue(); System.out.printf("%03d Topic: %-40s SubExpression: %s%n", - i++, - sd.getTopic(), - sd.getSubString() + i++, + sd.getTopic(), + sd.getSubString() ); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java index 0b5b0ab..387c9c8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java @@ -16,16 +16,15 @@ */ package org.apache.rocketmq.tools.command.connection; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - public class ProducerConnectionSubCommand implements SubCommand { @@ -69,11 +68,11 @@ public class ProducerConnectionSubCommand implements SubCommand { int i = 1; for (Connection conn : pc.getConnectionSet()) { System.out.printf("%04d %-32s %-22s %-8s %s%n", - i++, - conn.getClientId(), - conn.getClientAddr(), - conn.getLanguage(), - MQVersion.getVersionDesc(conn.getVersion()) + i++, + conn.getClientId(), + conn.getClientAddr(), + conn.getLanguage(), + MQVersion.getVersionDesc(conn.getVersion()) ); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java index 56e0853..3e70614 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.tools.command.consumer; +import java.util.Collections; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -30,17 +37,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; import org.slf4j.Logger; -import java.util.Collections; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; - - public class ConsumerProgressSubCommand implements SubCommand { private final Logger log = ClientLogger.getLog(); @@ -78,13 +76,13 @@ public class ConsumerProgressSubCommand implements SubCommand { Collections.sort(mqList); System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n", - "#Topic", - "#Broker Name", - "#QID", - "#Broker Offset", - "#Consumer Offset", - "#Diff", - "#LastTime"); + "#Topic", + "#Broker Name", + "#QID", + "#Broker Offset", + "#Consumer Offset", + "#Diff", + "#LastTime"); long diffTotal = 0L; for (MessageQueue mq : mqList) { @@ -97,13 +95,13 @@ public class ConsumerProgressSubCommand implements SubCommand { } catch (Exception e) { } System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n", - UtilAll.frontStringAtLeast(mq.getTopic(), 32), - UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), - mq.getQueueId(), - offsetWrapper.getBrokerOffset(), - offsetWrapper.getConsumerOffset(), - diff, - lastTime + UtilAll.frontStringAtLeast(mq.getTopic(), 32), + UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), + mq.getQueueId(), + offsetWrapper.getBrokerOffset(), + offsetWrapper.getConsumerOffset(), + diff, + lastTime ); } @@ -112,13 +110,13 @@ public class ConsumerProgressSubCommand implements SubCommand { System.out.printf("Diff Total: %d%n", diffTotal); } else { System.out.printf("%-32s %-6s %-24s %-5s %-14s %-7s %s%n", - "#Group", - "#Count", - "#Version", - "#Type", - "#Model", - "#TPS", - "#Diff Total" + "#Group", + "#Count", + "#Version", + "#Type", + "#Model", + "#TPS", + "#Diff Total" ); TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); for (String topic : topicList.getTopicList()) { @@ -143,7 +141,7 @@ public class ConsumerProgressSubCommand implements SubCommand { groupConsumeInfo.setGroup(consumerGroup); if (consumeStats != null) { - groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps()); + groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps()); groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); } @@ -155,13 +153,13 @@ public class ConsumerProgressSubCommand implements SubCommand { } System.out.printf("%-32s %-6d %-24s %-5s %-14s %-7d %d%n", - UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 32), - groupConsumeInfo.getCount(), - groupConsumeInfo.getCount() > 0 ? groupConsumeInfo.versionDesc() : "OFFLINE", - groupConsumeInfo.consumeTypeDesc(), - groupConsumeInfo.messageModelDesc(), - groupConsumeInfo.getConsumeTps(), - groupConsumeInfo.getDiffTotal() + UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 32), + groupConsumeInfo.getCount(), + groupConsumeInfo.getCount() > 0 ? groupConsumeInfo.versionDesc() : "OFFLINE", + groupConsumeInfo.consumeTypeDesc(), + groupConsumeInfo.messageModelDesc(), + groupConsumeInfo.getConsumeTps(), + groupConsumeInfo.getDiffTotal() ); } catch (Exception e) { log.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e); @@ -177,7 +175,6 @@ public class ConsumerProgressSubCommand implements SubCommand { } } - class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { private String group; private int version; @@ -187,7 +184,6 @@ class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { private int consumeTps; private long diffTotal; - public String getGroup() { return group; } @@ -245,37 +241,31 @@ class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { return diffTotal; } - public void setDiffTotal(long diffTotal) { this.diffTotal = diffTotal; } - @Override public int compareTo(GroupConsumeInfo o) { if (this.count != o.count) { return o.count - this.count; } - return (int) (o.diffTotal - diffTotal); + return (int)(o.diffTotal - diffTotal); } - public int getConsumeTps() { return consumeTps; } - public void setConsumeTps(int consumeTps) { this.consumeTps = consumeTps; } - public int getVersion() { return version; } - public void setVersion(int version) { this.version = version; }