[ROCKETMQ-57] Add unit test for all commands

Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/13f4297e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/13f4297e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/13f4297e

Branch: refs/heads/master
Commit: 13f4297eea9d73e5e0a290f6589b94e16a67e948
Parents: 712dec5
Author: stevenschew <[email protected]>
Authored: Sun Jan 22 20:53:13 2017 +0800
Committer: yukon <[email protected]>
Committed: Sun Jan 22 20:53:13 2017 +0800

----------------------------------------------------------------------
 .../tools/admin/DefaultMQAdminExtTest.java      |  86 ++++-----
 .../rocketmq/tools/command/CommandUtilTest.java |   9 +-
 .../broker/BrokerConsumeStatsSubCommadTest.java |  92 ++++++++++
 .../broker/BrokerStatusSubCommandTest.java      |  88 +++++++++
 .../broker/CleanExpiredCQSubCommandTest.java    |  84 +++++++++
 .../broker/CleanUnusedTopicCommandTest.java     |  84 +++++++++
 .../broker/GetBrokerConfigCommandTest.java      |  89 +++++++++
 .../broker/SendMsgStatusCommandTest.java        |  78 ++++++++
 .../UpdateBrokerConfigSubCommandTest.java       |  79 ++++++++
 .../ConsumerConnectionSubCommandTest.java       |  99 ++++++++++
 .../ProducerConnectionSubCommandTest.java       |  93 ++++++++++
 .../ConsumerProgressSubCommandTest.java         | 111 ++++++++++++
 .../consumer/ConsumerStatusSubCommandTest.java  | 131 ++++++++++++++
 .../namesrv/GetNamesrvConfigCommandTest.java    |  91 ++++++++++
 .../namesrv/WipeWritePermSubCommandTest.java    |  90 +++++++++
 .../offset/GetConsumerStatusCommandTest.java    |  85 +++++++++
 .../offset/ResetOffsetByTimeCommandTest.java    | 105 +++++++++++
 .../offset/ResetOffsetByTimeOldCommandTest.java |  39 ++++
 .../command/topic/AllocateMQSubCommandTest.java |  38 ++++
 .../topic/DeleteTopicSubCommandTest.java        |  38 ++++
 .../topic/TopicClusterSubCommandTest.java       |  37 ++++
 .../command/topic/TopicRouteSubCommandTest.java |  37 ++++
 .../topic/TopicStatusSubCommandTest.java        |  37 ++++
 .../topic/UpdateOrderConfCommandTest.java       |  39 ++++
 .../topic/UpdateTopicPermSubCommandTest.java    |  41 +++++
 .../topic/UpdateTopicSubCommandTest.java        |  54 ++++++
 .../monitor/DefaultMonitorListenerTest.java     |  86 +++++++++
 .../tools/monitor/MonitorServiceTest.java       | 181 +++++++++++++++++++
 28 files changed, 2077 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index bb80eb4..7865980 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -68,8 +68,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -83,19 +83,20 @@ import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class DefaultMQAdminExtTest {
-    private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
-    private MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
-    private MQClientAPIImpl mQClientAPIImpl;
-    private Properties properties = new Properties();
-    private TopicList topicList = new TopicList();
-    private TopicRouteData topicRouteData = new TopicRouteData();
-    private KVTable kvTable = new KVTable();
-    private ClusterInfo clusterInfo = new ClusterInfo();
-
-    @Before
-    public void init() throws Exception {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+    private static Properties properties = new Properties();
+    private static TopicList topicList = new TopicList();
+    private static TopicRouteData topicRouteData = new TopicRouteData();
+    private static KVTable kvTable = new KVTable();
+    private static ClusterInfo clusterInfo = new ClusterInfo();
+
+    @BeforeClass
+    public static void init() throws Exception {
         mQClientAPIImpl = mock(MQClientAPIImpl.class);
-        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExt = new DefaultMQAdminExt();
         defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000);
 
         Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
@@ -104,6 +105,9 @@ public class DefaultMQAdminExtTest {
         field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
         field.setAccessible(true);
         field.set(mqClientInstance, mQClientAPIImpl);
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
 
         properties.setProperty("maxMessageSize", "5000000");
         properties.setProperty("flushDelayOffsetInterval", "15000");
@@ -223,15 +227,15 @@ public class DefaultMQAdminExtTest {
         when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), 
anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
     }
 
-    @After
-    public void terminate() throws Exception {
+    @AfterClass
+    public static void terminate() throws Exception {
         if (defaultMQAdminExtImpl != null)
-            defaultMQAdminExtImpl.shutdown();
+            defaultMQAdminExt.shutdown();
     }
 
     @Test
     public void testUpdateBrokerConfig() throws InterruptedException, 
RemotingConnectException, UnsupportedEncodingException, 
RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
-        Properties result = 
defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911");
+        Properties result = 
defaultMQAdminExt.getBrokerConfig("127.0.0.1:10911");
         assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000");
         
assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000");
         
assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350");
@@ -239,21 +243,21 @@ public class DefaultMQAdminExtTest {
 
     @Test
     public void testFetchAllTopicList() throws RemotingException, 
MQClientException, InterruptedException {
-        TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList();
+        TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
         assertThat(topicList.getTopicList().size()).isEqualTo(2);
         assertThat(topicList.getTopicList()).contains("topic_one");
     }
 
     @Test
     public void testFetchBrokerRuntimeStats() throws InterruptedException, 
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
-        KVTable brokerStats = 
defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911");
+        KVTable brokerStats = 
defaultMQAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911");
         assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
         
assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
     }
 
     @Test
     public void testExamineBrokerClusterInfo() throws InterruptedException, 
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
-        ClusterInfo clusterInfo = 
defaultMQAdminExtImpl.examineBrokerClusterInfo();
+        ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
         HashMap<String, BrokerData> brokerList = 
clusterInfo.getBrokerAddrTable();
         
assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
         assertThat(brokerList.containsKey("broker-test")).isTrue();
@@ -272,32 +276,32 @@ public class DefaultMQAdminExtTest {
 
     @Test
     public void testExamineConsumeStats() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
-        ConsumeStats consumeStats = 
defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", 
"unit-test");
+        ConsumeStats consumeStats = 
defaultMQAdminExt.examineConsumeStats("default-consumer-group", "unit-test");
         assertThat(consumeStats.getConsumeTps()).isEqualTo(1234);
     }
 
     @Test
     public void testExamineConsumerConnectionInfo() throws 
InterruptedException, RemotingException, MQClientException, MQBrokerException {
-        ConsumerConnection consumerConnection = 
defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group");
+        ConsumerConnection consumerConnection = 
defaultMQAdminExt.examineConsumerConnectionInfo("default-consumer-group");
         
assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY);
         
assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
     }
 
     @Test
     public void testExamineProducerConnectionInfo() throws 
InterruptedException, RemotingException, MQClientException, MQBrokerException {
-        ProducerConnection producerConnection = 
defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", 
"unit-test");
+        ProducerConnection producerConnection = 
defaultMQAdminExt.examineProducerConnectionInfo("default-producer-group", 
"unit-test");
         assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1);
     }
 
     @Test
     public void testWipeWritePermOfBroker() throws InterruptedException, 
RemotingCommandException, RemotingSendRequestException, 
RemotingTimeoutException, MQClientException, RemotingConnectException {
-        int result = 
defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker");
+        int result = defaultMQAdminExt.wipeWritePermOfBroker("127.0.0.1:9876", 
"default-broker");
         assertThat(result).isEqualTo(6);
     }
 
     @Test
     public void testExamineTopicRouteInfo() throws RemotingException, 
MQClientException, InterruptedException {
-        TopicRouteData topicRouteData = 
defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
+        TopicRouteData topicRouteData = 
defaultMQAdminExt.examineTopicRouteInfo("UnitTest");
         
assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
         
assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster");
     }
@@ -308,53 +312,53 @@ public class DefaultMQAdminExtTest {
         result.add("default-name-one");
         result.add("default-name-two");
         
when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
-        List<String> nameList = 
defaultMQAdminExtImpl.getNameServerAddressList();
+        List<String> nameList = defaultMQAdminExt.getNameServerAddressList();
         assertThat(nameList.get(0)).isEqualTo("default-name-one");
         assertThat(nameList.get(1)).isEqualTo("default-name-two");
     }
 
     @Test
     public void testPutKVConfig() throws RemotingException, MQClientException, 
InterruptedException {
-        String topicConfig = 
defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, 
"UnitTest");
+        String topicConfig = 
defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, 
"UnitTest");
         assertThat(topicConfig).isEqualTo("topicListConfig");
-        KVTable kvs = 
defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+        KVTable kvs = 
defaultMQAdminExt.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
         assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one");
         
assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster");
     }
 
     @Test
     public void testQueryTopicConsumeByWho() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
-        GroupList groupList = 
defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest");
+        GroupList groupList = 
defaultMQAdminExt.queryTopicConsumeByWho("UnitTest");
         
assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
     }
 
     @Test
     public void testQueryConsumeTimeSpan() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
-        List<QueueTimeSpan> result = 
defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group");
+        List<QueueTimeSpan> result = 
defaultMQAdminExt.queryConsumeTimeSpan("unit-test", "default-broker-group");
         assertThat(result.size()).isEqualTo(0);
     }
 
     @Test
     public void testCleanExpiredConsumerQueue() throws InterruptedException, 
RemotingTimeoutException, MQClientException, RemotingSendRequestException, 
RemotingConnectException {
-        boolean result = 
defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster");
+        boolean result = 
defaultMQAdminExt.cleanExpiredConsumerQueue("default-cluster");
         assertThat(result).isFalse();
     }
 
     @Test
     public void testCleanExpiredConsumerQueueByAddr() throws 
InterruptedException, RemotingTimeoutException, MQClientException, 
RemotingSendRequestException, RemotingConnectException {
-        boolean clean = 
defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
+        boolean clean = 
defaultMQAdminExt.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
         assertThat(clean).isTrue();
     }
 
     @Test
     public void testCleanUnusedTopic() throws InterruptedException, 
RemotingTimeoutException, MQClientException, RemotingSendRequestException, 
RemotingConnectException {
-        boolean result = 
defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster");
+        boolean result = defaultMQAdminExt.cleanUnusedTopic("default-cluster");
         assertThat(result).isFalse();
     }
 
     @Test
     public void testGetConsumerRunningInfo() throws RemotingException, 
MQClientException, InterruptedException {
-        ConsumerRunningInfo consumerRunningInfo = 
defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", 
false);
+        ConsumerRunningInfo consumerRunningInfo = 
defaultMQAdminExt.getConsumerRunningInfo("consumer-group", "cid_123", false);
         assertThat(consumerRunningInfo.getJstack()).isEqualTo("test");
     }
 
@@ -363,25 +367,25 @@ public class DefaultMQAdminExtTest {
         MessageExt messageExt = new MessageExt();
         messageExt.setMsgId("msgId");
         messageExt.setTopic("unit-test");
-        List<MessageTrack> messageTrackList = 
defaultMQAdminExtImpl.messageTrackDetail(messageExt);
+        List<MessageTrack> messageTrackList = 
defaultMQAdminExt.messageTrackDetail(messageExt);
         assertThat(messageTrackList.size()).isEqualTo(2);
     }
 
     @Test
     public void testGetConsumeStatus() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
-        Map<String, Map<MessageQueue, Long>> result = 
defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", 
"127.0.0.1:10911");
+        Map<String, Map<MessageQueue, Long>> result = 
defaultMQAdminExt.getConsumeStatus("unit-test", "default-broker-group", 
"127.0.0.1:10911");
         assertThat(result.size()).isEqualTo(0);
     }
 
     @Test
     public void testGetTopicClusterList() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
-        Set<String> result = 
defaultMQAdminExtImpl.getTopicClusterList("unit-test");
+        Set<String> result = 
defaultMQAdminExt.getTopicClusterList("unit-test");
         assertThat(result.size()).isEqualTo(0);
     }
 
     @Test
     public void testGetClusterList() throws InterruptedException, 
RemotingTimeoutException, MQClientException, RemotingSendRequestException, 
RemotingConnectException {
-        Set<String> clusterlist = 
defaultMQAdminExtImpl.getClusterList("UnitTest");
+        Set<String> clusterlist = defaultMQAdminExt.getClusterList("UnitTest");
         assertThat(clusterlist.contains("default-cluster-one")).isTrue();
         assertThat(clusterlist.contains("default-cluster-two")).isTrue();
     }
@@ -391,13 +395,13 @@ public class DefaultMQAdminExtTest {
         ConsumeStatsList result = new ConsumeStatsList();
         result.setBrokerAddr("127.0.0.1:10911");
         
when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911",
 false, 10000)).thenReturn(result);
-        ConsumeStatsList consumeStatsList = 
defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 
10000);
+        ConsumeStatsList consumeStatsList = 
defaultMQAdminExt.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
         
assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
     }
 
     @Test
     public void testGetAllSubscriptionGroup() throws InterruptedException, 
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
-        SubscriptionGroupWrapper subscriptionGroupWrapper = 
defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
+        SubscriptionGroupWrapper subscriptionGroupWrapper = 
defaultMQAdminExt.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
         
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
         
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
         
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
index ba58010..33b4497 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.rocketmq.tools.command;
 
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -33,9 +39,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
-import java.util.*;
-
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java
new file mode 100644
index 0000000..3523175
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BrokerConsumeStatsSubCommadTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, InterruptedException, RemotingTimeoutException, 
MQClientException, RemotingSendRequestException, RemotingConnectException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000);
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        ConsumeStatsList consumeStatsList = new ConsumeStatsList();
+        consumeStatsList.setBrokerAddr("127.0.0.1:10911");
+        consumeStatsList.setConsumeStatsList(new ArrayList<Map<String, 
List<ConsumeStats>>>());
+        consumeStatsList.setTotalDiff(123);
+        when(mQClientAPIImpl.fetchConsumeStatsInBroker(anyString(), 
anyBoolean(), anyLong())).thenReturn(consumeStatsList);
+    }
+
+    @AfterClass
+    public static void terminate() {
+    }
+
+    @Test
+    public void testExecute() {
+        BrokerConsumeStatsSubCommad cmd = new BrokerConsumeStatsSubCommad();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-t 3000", "-l 5", "-o true"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, 
cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java
new file mode 100644
index 0000000..1b08735
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BrokerStatusSubCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, InterruptedException, RemotingTimeoutException, 
MQClientException, RemotingSendRequestException, RemotingConnectException, 
MQBrokerException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000);
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        KVTable kvTable = new KVTable();
+        kvTable.setTable(new HashMap<String, String>());
+        when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), 
anyLong())).thenReturn(kvTable);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() {
+        BrokerStatusSubCommand cmd = new BrokerStatusSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, 
cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java
new file mode 100644
index 0000000..6fcf044
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.lang.reflect.Field;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CleanExpiredCQSubCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() {
+        CleanExpiredCQSubCommand cmd = new CleanExpiredCQSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java
new file mode 100644
index 0000000..3ae2c48
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.lang.reflect.Field;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CleanUnusedTopicCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        when(mQClientAPIImpl.cleanUnusedTopicByAddr(anyString(), anyLong())).thenReturn(true);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() {
+        CleanUnusedTopicCommand cmd = new CleanUnusedTopicCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java
new file mode 100644
index 0000000..88a8ea8
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.Properties;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GetBrokerConfigCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        Properties properties = new Properties();
+        properties.setProperty("maxMessageSize", "5000000");
+        properties.setProperty("flushDelayOffsetInterval", "15000");
+        properties.setProperty("serverSocketRcvBufSize", "655350");
+        when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() {
+        GetBrokerConfigCommand cmd = new GetBrokerConfigCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java
new file mode 100644
index 0000000..9089a39
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.lang.reflect.Field;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class SendMsgStatusCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() {
+        SendMsgStatusCommand cmd = new SendMsgStatusCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-s 1024 -c 10"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        //cmd.execute(commandLine, options, null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java
new file mode 100644
index 0000000..cc459ba
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class UpdateBrokerConfigSubCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() {
+        UpdateBrokerConfigSubCommand cmd = new UpdateBrokerConfigSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-k topicname", "-v unit_test"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java
new file mode 100644
index 0000000..88530e2
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.connection;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConsumerConnectionSubCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        ConsumerConnection consumerConnection = new ConsumerConnection();
+        consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+        consumerConnection.setMessageModel(MessageModel.CLUSTERING);
+        HashSet<Connection> connections = new HashSet<>();
+        connections.add(new Connection());
+        consumerConnection.setConnectionSet(connections);
+        consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
+        consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() {
+        ConsumerConnectionSubCommand cmd = new ConsumerConnectionSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-g default-consumer-group"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java
new file mode 100644
index 0000000..8df66fb
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.connection;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ProducerConnectionSubCommandTest { // Smoke test: runs ProducerConnectionSubCommand with a Mockito mock of MQClientAPIImpl wired in by reflection
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); // client instance whose mQClientAPIImpl field init() overwrites with the mock
+    private static MQClientAPIImpl mQClientAPIImpl; // Mockito mock, created in init()
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, InterruptedException, RemotingTimeoutException, 
MQClientException, RemotingSendRequestException, RemotingConnectException, 
MQBrokerException { // builds the admin objects, wires them together via reflection, then stubs the producer-connection query
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000); // second argument is 1000 -- presumably a timeout in milliseconds; confirm
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance); // admin impl now holds the client instance created above
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl); // swap the mock into the client instance
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl); // admin facade now holds the prepared impl
+
+        ProducerConnection producerConnection = new ProducerConnection(); // canned response holding a single connection
+        Connection connection = new Connection();
+        connection.setClientAddr("127.0.0.1:9898");
+        connection.setClientId("PID_12345");
+        HashSet<Connection> connectionSet = new HashSet<>();
+        connectionSet.add(connection);
+        producerConnection.setConnectionSet(connectionSet);
+        when(mQClientAPIImpl.getProducerConnectionList(anyString(), 
anyString(), anyLong())).thenReturn(producerConnection); // stub: any getProducerConnectionList call returns the canned response
+    }
+
+    @AfterClass // JUnit 4: runs once after the tests of this class
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() { // contains no assertions, so it passes as long as nothing throws
+        ProducerConnectionSubCommand cmd = new ProducerConnectionSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-g default-producer-group", "-t 
unit-test"}; // NOTE(review): the break inside the -t literal looks like a mail line-wrap artifact; each flag shares an argv element with its value
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, 
cmd.buildCommandlineOptions(options), new PosixParser()); // parse subargs against the base options extended by the command
+        cmd.execute(commandLine, options, null); // third argument is null -- presumably an optional hook; confirm against the execute signature
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java
new file mode 100644
index 0000000..a5af04a
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.consumer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConsumerProgressSubCommandTest { // Smoke test: runs ConsumerProgressSubCommand with a Mockito mock of MQClientAPIImpl wired in by reflection
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); // client instance whose mQClientAPIImpl field init() overwrites with the mock
+    private static MQClientAPIImpl mQClientAPIImpl; // Mockito mock, created in init()
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, InterruptedException, RemotingException, 
MQClientException, MQBrokerException { // builds the admin objects, wires them together via reflection, then stubs the route and consume-stats queries
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000); // second argument is 1000 -- presumably a timeout in milliseconds; confirm
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance); // admin impl now holds the client instance created above
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl); // swap the mock into the client instance
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl); // admin facade now holds the prepared impl
+
+        TopicRouteData topicRouteData = new TopicRouteData(); // canned route: one broker, no queue data, empty filter-server table
+        List<BrokerData> brokerDatas = new ArrayList<>();
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(1234l, "127.0.0.1:10911"); // 1234l is the long literal 1234 (lower-case l suffix); presumably broker id -> address, confirm
+        BrokerData brokerData = new BrokerData();
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerName("default-broker");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDatas.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDatas);
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(topicRouteData); // stub: any route lookup returns the canned route
+
+        ConsumeStats consumeStats = new ConsumeStats(); // canned stats: TPS 1234 and one offset-table entry
+        consumeStats.setConsumeTps(1234);
+        MessageQueue messageQueue = new MessageQueue(); // default-constructed; no fields are set here
+        OffsetWrapper offsetWrapper = new OffsetWrapper(); // default-constructed; no fields are set here
+        HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
+        stats.put(messageQueue, offsetWrapper);
+        consumeStats.setOffsetTable(stats);
+        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), 
anyString(), anyLong())).thenReturn(consumeStats); // stub: any getConsumeStats call returns the canned stats
+    }
+
+    @AfterClass // JUnit 4: runs once after the tests of this class
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() { // contains no assertions, so it passes as long as nothing throws
+        ConsumerProgressSubCommand cmd = new ConsumerProgressSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-g default-group"}; // NOTE(review): flag and value share one argv element; presumably the parser splits the bundled token -- confirm
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, 
cmd.buildCommandlineOptions(options), new PosixParser()); // parse subargs against the base options extended by the command
+        cmd.execute(commandLine, options, null); // third argument is null -- presumably an optional hook; confirm against the execute signature
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java
new file mode 100644
index 0000000..8e846bc
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.consumer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConsumerStatusSubCommandTest { // Smoke test: runs ConsumerStatusSubCommand with a Mockito mock of MQClientAPIImpl wired in by reflection
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); // client instance whose mQClientAPIImpl field init() overwrites with the mock
+    private static MQClientAPIImpl mQClientAPIImpl; // Mockito mock, created in init()
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, InterruptedException, RemotingException, 
MQClientException, MQBrokerException { // builds the admin objects, wires them together via reflection, then stubs three queries on the mock
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000); // second argument is 1000 -- presumably a timeout in milliseconds; confirm
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance); // admin impl now holds the client instance created above
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl); // swap the mock into the client instance
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl); // admin facade now holds the prepared impl
+
+        TopicRouteData topicRouteData = new TopicRouteData(); // canned route: one broker, no queue data, empty filter-server table
+        List<BrokerData> brokerDatas = new ArrayList<>();
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(1234l, "127.0.0.1:10911"); // 1234l is the long literal 1234 (lower-case l suffix); presumably broker id -> address, confirm
+        BrokerData brokerData = new BrokerData();
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerName("default-broker");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDatas.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDatas);
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(topicRouteData); // stub: any route lookup returns the canned route
+
+        ConsumerConnection consumerConnection = new ConsumerConnection(); // canned consumer connection info with one default-constructed Connection
+        consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+        consumerConnection.setMessageModel(MessageModel.CLUSTERING);
+        HashSet<Connection> connections = new HashSet<>();
+        connections.add(new Connection());
+        consumerConnection.setConnectionSet(connections);
+        consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, 
SubscriptionData>()); // empty subscription table
+        
consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        when(mQClientAPIImpl.getConsumerConnectionList(anyString(), 
anyString(), anyLong())).thenReturn(consumerConnection); // stub: any getConsumerConnectionList call returns the canned info
+
+        ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo(); // canned running info: jstack text plus empty tables and set
+        consumerRunningInfo.setJstack("test");
+        consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, 
ProcessQueueInfo>());
+        consumerRunningInfo.setStatusTable(new TreeMap<String, 
ConsumeStatus>());
+        consumerRunningInfo.setSubscriptionSet(new 
TreeSet<SubscriptionData>());
+        when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), 
anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); // stub: any getConsumerRunningInfo call returns the canned running info
+    }
+
+    @AfterClass // JUnit 4: runs once after the tests of this class
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() { // contains no assertions, so it passes as long as nothing throws
+        ConsumerStatusSubCommand cmd = new ConsumerStatusSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-g default-group", "-i cid_one"}; // NOTE(review): each flag shares an argv element with its value; presumably the parser splits the bundled tokens -- confirm
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, 
cmd.buildCommandlineOptions(options), new PosixParser()); // parse subargs against the base options extended by the command
+        cmd.execute(commandLine, options, null); // third argument is null -- presumably an optional hook; confirm against the execute signature
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java
new file mode 100644
index 0000000..49802b9
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.namesrv;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GetNamesrvConfigCommandTest { // Smoke test: runs GetNamesrvConfigCommand with a Mockito mock of MQClientAPIImpl wired in by reflection
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); // client instance whose mQClientAPIImpl field init() overwrites with the mock
+    private static MQClientAPIImpl mQClientAPIImpl; // Mockito mock, created in init()
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, InterruptedException, RemotingTimeoutException, 
MQClientException, RemotingSendRequestException, RemotingConnectException, 
MQBrokerException, UnsupportedEncodingException { // builds the admin objects, wires them together via reflection, then stubs the name-server config query
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000); // second argument is 1000 -- presumably a timeout in milliseconds; confirm
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance); // admin impl now holds the client instance created above
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl); // swap the mock into the client instance
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl); // admin facade now holds the prepared impl
+
+        Map<String, Properties> propertiesMap = new HashMap<>(); // canned response: an empty map
+        List<String> nameServers = new ArrayList<>(); // NOTE(review): never read in this method -- looks like a leftover local
+        
when(mQClientAPIImpl.getNameServerConfig(ArgumentMatchers.<String>anyList(), 
anyLong())).thenReturn(propertiesMap); // stub: any getNameServerConfig call returns the empty map
+    }
+
+    @AfterClass // JUnit 4: runs once after the tests of this class
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() { // contains no assertions, so it passes as long as nothing throws
+        GetNamesrvConfigCommand cmd = new GetNamesrvConfigCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {}; // the command is run with no arguments
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, 
cmd.buildCommandlineOptions(options), new PosixParser()); // parse subargs against the base options extended by the command
+        cmd.execute(commandLine, options, null); // third argument is null -- presumably an optional hook; confirm against the execute signature
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java
new file mode 100644
index 0000000..5d2781a
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.namesrv;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class WipeWritePermSubCommandTest { // Smoke test: runs WipeWritePermSubCommand with a Mockito mock of MQClientAPIImpl wired in by reflection
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); // client instance whose mQClientAPIImpl field init() overwrites with the mock
+    private static MQClientAPIImpl mQClientAPIImpl; // Mockito mock, created in init()
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, InterruptedException, RemotingTimeoutException, 
MQClientException, RemotingSendRequestException, RemotingConnectException, 
MQBrokerException, RemotingCommandException { // builds the admin objects, wires them together via reflection, then stubs the name-server list and the wipe call
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000); // second argument is 1000 -- presumably a timeout in milliseconds; confirm
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance); // admin impl now holds the client instance created above
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl); // swap the mock into the client instance
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl); // admin facade now holds the prepared impl
+
+        List<String> result = new ArrayList<>(); // canned name-server address list with two entries
+        result.add("default-name-one");
+        result.add("default-name-two");
+        
when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result); // NOTE(review): relies on getMQClientAPIImpl() returning the mock set by reflection above -- confirm
+        when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), 
anyLong())).thenReturn(6); // stub: any wipeWritePermOfBroker call returns 6
+    }
+
+    @AfterClass // JUnit 4: runs once after the tests of this class
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() { // contains no assertions, so it passes as long as nothing throws
+        WipeWritePermSubCommand cmd = new WipeWritePermSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b default-broker"}; // NOTE(review): flag and value share one argv element; presumably the parser splits the bundled token -- confirm
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, 
cmd.buildCommandlineOptions(options), new PosixParser()); // parse subargs against the base options extended by the command
+        cmd.execute(commandLine, options, null); // third argument is null -- presumably an optional hook; confirm against the execute signature
+    }
+}
\ No newline at end of file


Reply via email to