http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java
new file mode 100644
index 0000000..24ae01d
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.offset;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GetConsumerStatusCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, InterruptedException, RemotingException, 
MQClientException, MQBrokerException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000);
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        Map<String, Map<MessageQueue, Long>> consumerStatus = new HashMap<>();
+        when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), 
anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() {
+        GetConsumerStatusCommand cmd = new GetConsumerStatusCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-g default-group", "-t unit-test","-i 
clientid"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java
new file mode 100644
index 0000000..77aec53
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.offset;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ResetOffsetByTimeCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, InterruptedException, RemotingException, 
MQClientException, MQBrokerException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000);
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        TopicRouteData topicRouteData = new TopicRouteData();
+        List<BrokerData> brokerDatas = new ArrayList<>();
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(1234l, "127.0.0.1:10911");
+        BrokerData brokerData = new BrokerData();
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerName("default-broker");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDatas.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDatas);
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(topicRouteData);
+
+        Map<MessageQueue, Long> messageQueueLongMap = new HashMap<>();
+        when(mQClientAPIImpl.invokeBrokerToResetOffset(anyString(), 
anyString(), anyString(), anyLong(), anyBoolean(), 
anyLong())).thenReturn(messageQueueLongMap);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() {
+        ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-g default-group", "-t unit-test", 
"-s 1412131213231", "-f false"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java
new file mode 100644
index 0000000..a542fd5
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.offset;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class ResetOffsetByTimeOldCommandTest {
+    @Test
+    public void testExecute() {
+        ResetOffsetByTimeOldCommand cmd = new ResetOffsetByTimeOldCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-g default-group", "-t unit-test", 
"-s 1412131213231", "-f false"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        
assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("default-group");
+        
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
+        
assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("1412131213231");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java
new file mode 100644
index 0000000..b729652
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AllocateMQSubCommandTest {
+    @Test
+    public void testExecute() {
+        AllocateMQSubCommand cmd = new AllocateMQSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t unit-test", "-i 127.0.0.1:10911"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
+        
assertThat(commandLine.getOptionValue("i").trim()).isEqualTo("127.0.0.1:10911");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java
new file mode 100644
index 0000000..ae2c253
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class DeleteTopicSubCommandTest {
+    @Test
+    public void testExecute() {
+        DeleteTopicSubCommand cmd = new DeleteTopicSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t unit-test", "-c default-cluster"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
+        
assertThat(commandLine.getOptionValue("c").trim()).isEqualTo("default-cluster");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java
new file mode 100644
index 0000000..0003ad5
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TopicClusterSubCommandTest {
+    @Test
+    public void testExecute() {
+        TopicClusterSubCommand cmd = new TopicClusterSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t unit-test"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java
new file mode 100644
index 0000000..6e45453
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */package org.apache.rocketmq.tools.command.topic;
+
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TopicRouteSubCommandTest {
+    @Test
+    public void testExecute() {
+        TopicRouteSubCommand cmd = new TopicRouteSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t unit-test"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java
new file mode 100644
index 0000000..651c539
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TopicStatusSubCommandTest {
+    @Test
+    public void testExecute() {
+        TopicStatusSubCommand cmd = new TopicStatusSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t unit-test"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java
new file mode 100644
index 0000000..6194095
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class UpdateOrderConfCommandTest {
+    @Test
+    public void testExecute() {
+        UpdateOrderConfCommand cmd = new UpdateOrderConfCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t unit-test", "-v default-broker:8", 
"-m post"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
+        
assertThat(commandLine.getOptionValue('v').trim()).isEqualTo("default-broker:8");
+        assertThat(commandLine.getOptionValue('m').trim()).isEqualTo("post");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java
new file mode 100644
index 0000000..68fbd46
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class UpdateTopicPermSubCommandTest {
+    @Test
+    public void testExecute() {
+        UpdateTopicPermSubCommand cmd = new UpdateTopicPermSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-b 127.0.0.1:10911", "-c 
default-cluster", "-t unit-test", "-p 6"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        
assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
+        
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
+        
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
+        assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
new file mode 100644
index 0000000..32dbcf1
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class UpdateTopicSubCommandTest {
+    @Test
+    public void testExecute() {
+        UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-b 127.0.0.1:10911",
+                "-c default-cluster",
+                "-t unit-test",
+                "-r 8",
+                "-w 8",
+                "-p 6",
+                "-o false",
+                "-u false",
+                "-s false"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), 
subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        
assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
+        
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
+        assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
+        assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
+        
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
+        assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
+        assertThat(commandLine.getOptionValue('o').trim()).isEqualTo("false");
+        assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("false");
+        assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("false");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java
new file mode 100644
index 0000000..30254cb
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.monitor;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.mockito.Mockito.mock;
+
+public class DefaultMonitorListenerTest {
+    private DefaultMonitorListener defaultMonitorListener;
+
+    @Before
+    public void init() {
+        defaultMonitorListener = mock(DefaultMonitorListener.class);
+    }
+
+    @Test
+    public void testBeginRound() {
+        defaultMonitorListener.beginRound();
+    }
+
+    @Test
+    public void testReportUndoneMsgs() {
+        UndoneMsgs undoneMsgs = new UndoneMsgs();
+        undoneMsgs.setConsumerGroup("default-group");
+        undoneMsgs.setTopic("unit-test");
+        undoneMsgs.setUndoneMsgsDelayTimeMills(30000);
+        undoneMsgs.setUndoneMsgsSingleMQ(1);
+        undoneMsgs.setUndoneMsgsTotal(100);
+        defaultMonitorListener.reportUndoneMsgs(undoneMsgs);
+    }
+
+    @Test
+    public void testReportFailedMsgs() {
+        FailedMsgs failedMsgs = new FailedMsgs();
+        failedMsgs.setTopic("unit-test");
+        failedMsgs.setConsumerGroup("default-consumer");
+        failedMsgs.setFailedMsgsTotalRecently(2);
+        defaultMonitorListener.reportFailedMsgs(failedMsgs);
+    }
+
+    @Test
+    public void testReportDeleteMsgsEvent() {
+        DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent();
+        deleteMsgsEvent.setEventTimestamp(System.currentTimeMillis());
+        deleteMsgsEvent.setOffsetMovedEvent(new OffsetMovedEvent());
+        defaultMonitorListener.reportDeleteMsgsEvent(deleteMsgsEvent);
+    }
+
+    @Test
+    public void testReportConsumerRunningInfo() {
+        TreeMap<String, ConsumerRunningInfo> criTable = new TreeMap<>();
+        ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
+        consumerRunningInfo.setSubscriptionSet(new 
TreeSet<SubscriptionData>());
+        consumerRunningInfo.setStatusTable(new TreeMap<String, 
ConsumeStatus>());
+        consumerRunningInfo.setSubscriptionSet(new 
TreeSet<SubscriptionData>());
+        consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, 
ProcessQueueInfo>());
+        consumerRunningInfo.setProperties(new Properties());
+        criTable.put("test", consumerRunningInfo);
+        defaultMonitorListener.reportConsumerRunningInfo(criTable);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java 
b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
new file mode 100644
index 0000000..597915c
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.monitor;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+public class MonitorServiceTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+    private static MonitorConfig monitorConfig;
+    private static MonitorListener monitorListener;
+    private static DefaultMQPullConsumer defaultMQPullConsumer;
+    private static DefaultMQPushConsumer defaultMQPushConsumer;
+    private static MonitorService monitorService;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, 
IllegalAccessException, RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
+        monitorConfig = new MonitorConfig();
+        monitorListener = new DefaultMonitorListener();
+        defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
+        defaultMQPushConsumer = mock(DefaultMQPushConsumer.class);
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 
1000);
+        monitorService = new MonitorService(monitorConfig, monitorListener, 
null);
+
+        Field field = 
DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = 
DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        field = MonitorService.class.getDeclaredField("defaultMQAdminExt");
+        field.setAccessible(true);
+        field.set(monitorService, defaultMQAdminExt);
+        field = MonitorService.class.getDeclaredField("defaultMQPullConsumer");
+        field.setAccessible(true);
+        field.set(monitorService, defaultMQPullConsumer);
+        field = MonitorService.class.getDeclaredField("defaultMQPushConsumer");
+        field.setAccessible(true);
+        field.set(monitorService, defaultMQPushConsumer);
+
+        TopicList topicList = new TopicList();
+        Set<String> topicSet = new HashSet<>();
+        topicSet.add("topic_one");
+        topicSet.add("topic_two");
+        topicList.setTopicList(topicSet);
+        
when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
+
+        TopicRouteData topicRouteData = new TopicRouteData();
+        List<BrokerData> brokerDatas = new ArrayList<>();
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(1234l, "127.0.0.1:10911");
+        BrokerData brokerData = new BrokerData();
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerName("default-broker");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDatas.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDatas);
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(topicRouteData);
+
+        ConsumeStats consumeStats = new ConsumeStats();
+        consumeStats.setConsumeTps(1234);
+        MessageQueue messageQueue = new MessageQueue();
+        OffsetWrapper offsetWrapper = new OffsetWrapper();
+        HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
+        stats.put(messageQueue, offsetWrapper);
+        consumeStats.setOffsetTable(stats);
+        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), 
anyString(), anyLong())).thenReturn(consumeStats);
+
+        ConsumerConnection consumerConnection = new ConsumerConnection();
+        consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+        consumerConnection.setMessageModel(MessageModel.CLUSTERING);
+        HashSet<Connection> connections = new HashSet<>();
+        Connection connection = new Connection();
+        connection.setClientId("client_id");
+        connection.setClientAddr("127.0.0.1:109111");
+        connection.setLanguage(LanguageCode.JAVA);
+        connection.setVersion(MQVersion.Version.V4_0_0_SNAPSHOT.ordinal());
+        connections.add(connection);
+        consumerConnection.setConnectionSet(connections);
+        consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, 
SubscriptionData>());
+        
consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        when(mQClientAPIImpl.getConsumerConnectionList(anyString(), 
anyString(), anyLong())).thenReturn(consumerConnection);
+
+        ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
+        consumerRunningInfo.setJstack("test");
+        consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, 
ProcessQueueInfo>());
+        consumerRunningInfo.setStatusTable(new TreeMap<String, 
ConsumeStatus>());
+        consumerRunningInfo.setSubscriptionSet(new 
TreeSet<SubscriptionData>());
+        Properties properties = new Properties();
+        properties.put(ConsumerRunningInfo.PROP_CONSUME_TYPE, 
CONSUME_ACTIVELY);
+        properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, 
System.currentTimeMillis());
+        consumerRunningInfo.setProperties(properties);
+        when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), 
anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
+    }
+
+    @AfterClass
+    public static void terminate() {
+    }
+
+    @Test
+    public void testDoMonitorWork() throws RemotingException, 
MQClientException, InterruptedException {
+        monitorService.doMonitorWork();
+    }
+
+    @Test
+    public void testReportConsumerRunningInfo() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
+        monitorService.reportConsumerRunningInfo("test_group");
+    }
+}
\ No newline at end of file


Reply via email to