[ROCKETMQ-57] Add unit test for DefaultMQAdminExt

Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f8d881ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f8d881ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f8d881ba

Branch: refs/heads/master
Commit: f8d881babe3f8f8810f1303912b2fb0ef2badea0
Parents: 0de84e2
Author: stevenschew <[email protected]>
Authored: Fri Jan 20 14:53:43 2017 +0800
Committer: stevenschew <[email protected]>
Committed: Fri Jan 20 14:53:43 2017 +0800

----------------------------------------------------------------------
 .../tools/admin/DefaultMQAdminExtTest.java      | 136 ++++++++++++++++++-
 1 file changed, 130 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f8d881ba/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index 5964cd1..da3bd8c 100644
--- 
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -22,15 +22,21 @@ import 
org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.admin.*;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
 import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,8 +49,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -93,6 +98,8 @@ public class DefaultMQAdminExtTest {
         brokerData.setBrokerAddrs(brokerAddrs);
         brokerDatas.add(brokerData);
         topicRouteData.setBrokerDatas(brokerDatas);
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
         when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(topicRouteData);
 
         HashMap<String, String> result = new HashMap<>();
@@ -105,6 +112,7 @@ public class DefaultMQAdminExtTest {
         brokerAddrTable.put("default-broker", brokerData);
         brokerAddrTable.put("broker-test", new BrokerData());
         clusterInfo.setBrokerAddrTable(brokerAddrTable);
+        clusterInfo.setClusterAddrTable(new HashMap<String, Set<String>>());
         
when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
         when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), 
anyLong())).thenReturn(true);
 
@@ -140,6 +148,52 @@ public class DefaultMQAdminExtTest {
         kv.put("cluster-name", "default-cluster");
         kvTable.setTable(kv);
         when(mQClientAPIImpl.getKVListByNamespace(anyString(), 
anyLong())).thenReturn(kvTable);
+
+        ConsumeStats consumeStats = new ConsumeStats();
+        consumeStats.setConsumeTps(1234);
+        MessageQueue messageQueue = new MessageQueue();
+        OffsetWrapper offsetWrapper = new OffsetWrapper();
+        HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
+        stats.put(messageQueue, offsetWrapper);
+        consumeStats.setOffsetTable(stats);
+        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), 
anyString(), anyLong())).thenReturn(consumeStats);
+
+        ConsumerConnection consumerConnection = new ConsumerConnection();
+        consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+        consumerConnection.setMessageModel(MessageModel.CLUSTERING);
+        HashSet<Connection> connections = new HashSet<>();
+        connections.add(new Connection());
+        consumerConnection.setConnectionSet(connections);
+        consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, 
SubscriptionData>());
+        
consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        when(mQClientAPIImpl.getConsumerConnectionList(anyString(), 
anyString(), anyLong())).thenReturn(consumerConnection);
+
+        ProducerConnection producerConnection = new ProducerConnection();
+        Connection connection = new Connection();
+        connection.setClientAddr("127.0.0.1:9898");
+        connection.setClientId("PID_12345");
+        HashSet<Connection> connectionSet = new HashSet<Connection>();
+        connectionSet.add(connection);
+        producerConnection.setConnectionSet(connectionSet);
+        when(mQClientAPIImpl.getProducerConnectionList(anyString(), 
anyString(), anyLong())).thenReturn(producerConnection);
+
+        when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), 
anyLong())).thenReturn(6);
+
+        TopicStatsTable topicStatsTable = new TopicStatsTable();
+        topicStatsTable.setOffsetTable(new HashMap<MessageQueue, 
TopicOffset>());
+
+        Map<String, Map<MessageQueue, Long>> consumerStatus = new HashMap<>();
+        when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), 
anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus);
+
+        List<QueueTimeSpan> queueTimeSpanList = new ArrayList<>();
+        when(mQClientAPIImpl.queryConsumeTimeSpan(anyString(), anyString(), 
anyString(), anyLong())).thenReturn(queueTimeSpanList);
+
+        ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
+        consumerRunningInfo.setJstack("test");
+        consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, 
ProcessQueueInfo>());
+        consumerRunningInfo.setStatusTable(new TreeMap<String, 
ConsumeStatus>());
+        consumerRunningInfo.setSubscriptionSet(new 
TreeSet<SubscriptionData>());
+        when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), 
anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
     }
 
     @After
@@ -191,6 +245,31 @@ public class DefaultMQAdminExtTest {
     }
 
     @Test
+    public void testExamineConsumeStats() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
+        ConsumeStats consumeStats = 
defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", 
"unit-test");
+        assertThat(consumeStats.getConsumeTps()).isEqualTo(1234);
+    }
+
+    @Test
+    public void testExamineConsumerConnectionInfo() throws 
InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        ConsumerConnection consumerConnection = 
defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group");
+        
assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY);
+        
assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
+    }
+
+    @Test
+    public void testExamineProducerConnectionInfo() throws 
InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        ProducerConnection producerConnection = 
defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", 
"unit-test");
+        assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1);
+    }
+
+    @Test
+    public void testWipeWritePermOfBroker() throws InterruptedException, 
RemotingCommandException, RemotingSendRequestException, 
RemotingTimeoutException, MQClientException, RemotingConnectException {
+        int result = 
defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker");
+        assertThat(result).isEqualTo(6);
+    }
+
+    @Test
     public void testExamineTopicRouteInfo() throws RemotingException, 
MQClientException, InterruptedException {
         TopicRouteData topicRouteData = 
defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
         
assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
@@ -225,12 +304,57 @@ public class DefaultMQAdminExtTest {
     }
 
     @Test
+    public void testQueryConsumeTimeSpan() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
+        List<QueueTimeSpan> result = 
defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group");
+        assertThat(result.size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testCleanExpiredConsumerQueue() throws InterruptedException, 
RemotingTimeoutException, MQClientException, RemotingSendRequestException, 
RemotingConnectException {
+        boolean result = 
defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster");
+        assertThat(result).isFalse();
+    }
+
+    @Test
     public void testCleanExpiredConsumerQueueByAddr() throws 
InterruptedException, RemotingTimeoutException, MQClientException, 
RemotingSendRequestException, RemotingConnectException {
         boolean clean = 
defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
         assertThat(clean).isTrue();
     }
 
     @Test
+    public void testCleanUnusedTopic() throws InterruptedException, 
RemotingTimeoutException, MQClientException, RemotingSendRequestException, 
RemotingConnectException {
+        boolean result = 
defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster");
+        assertThat(result).isFalse();
+    }
+
+    @Test
+    public void testGetConsumerRunningInfo() throws RemotingException, 
MQClientException, InterruptedException {
+        ConsumerRunningInfo consumerRunningInfo = 
defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", 
false);
+        assertThat(consumerRunningInfo.getJstack()).isEqualTo("test");
+    }
+
+    @Test
+    public void testMessageTrackDetail() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
+        MessageExt messageExt = new MessageExt();
+        messageExt.setMsgId("msgId");
+        messageExt.setTopic("unit-test");
+        List<MessageTrack> messageTrackList = 
defaultMQAdminExtImpl.messageTrackDetail(messageExt);
+        assertThat(messageTrackList.size()).isEqualTo(2);
+    }
+
+    @Test
+    public void testGetConsumeStatus() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
+        Map<String, Map<MessageQueue, Long>> result = 
defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", 
"127.0.0.1:10911");
+        assertThat(result.size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testGetTopicClusterList() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
+        Set<String> result = 
defaultMQAdminExtImpl.getTopicClusterList("unit-test");
+        assertThat(result.size()).isEqualTo(0);
+    }
+
+    @Test
     public void testGetClusterList() throws InterruptedException, 
RemotingTimeoutException, MQClientException, RemotingSendRequestException, 
RemotingConnectException {
         Set<String> clusterlist = 
defaultMQAdminExtImpl.getClusterList("UnitTest");
         assertThat(clusterlist.contains("default-cluster-one")).isTrue();

Reply via email to