[ROCKETMQ-57] Add unit test for DefaultMQAdminExt
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f8d881ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f8d881ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f8d881ba Branch: refs/heads/ROCKETMQ-57 Commit: f8d881babe3f8f8810f1303912b2fb0ef2badea0 Parents: 0de84e2 Author: stevenschew <[email protected]> Authored: Fri Jan 20 14:53:43 2017 +0800 Committer: stevenschew <[email protected]> Committed: Fri Jan 20 14:53:43 2017 +0800 ---------------------------------------------------------------------- .../tools/admin/DefaultMQAdminExtTest.java | 136 ++++++++++++++++++- 1 file changed, 130 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f8d881ba/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 5964cd1..da3bd8c 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -22,15 +22,21 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.admin.*; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.namesrv.NamesrvUtil; import org.apache.rocketmq.common.protocol.body.*; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.*; +import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -43,8 +49,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -93,6 +98,8 @@ public class DefaultMQAdminExtTest { brokerData.setBrokerAddrs(brokerAddrs); brokerDatas.add(brokerData); topicRouteData.setBrokerDatas(brokerDatas); + topicRouteData.setQueueDatas(new ArrayList<QueueData>()); + topicRouteData.setFilterServerTable(new HashMap<String, List<String>>()); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); HashMap<String, String> result = new HashMap<>(); @@ -105,6 +112,7 @@ public class DefaultMQAdminExtTest { brokerAddrTable.put("default-broker", brokerData); brokerAddrTable.put("broker-test", new BrokerData()); clusterInfo.setBrokerAddrTable(brokerAddrTable); + clusterInfo.setClusterAddrTable(new HashMap<String, Set<String>>()); when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo); when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true); @@ -140,6 +148,52 @@ public class DefaultMQAdminExtTest { kv.put("cluster-name", "default-cluster"); kvTable.setTable(kv); when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable); + + ConsumeStats consumeStats = new ConsumeStats(); + consumeStats.setConsumeTps(1234); + MessageQueue messageQueue = new MessageQueue(); + OffsetWrapper offsetWrapper = new OffsetWrapper(); + HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>(); + stats.put(messageQueue, offsetWrapper); + consumeStats.setOffsetTable(stats); + when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats); + + ConsumerConnection consumerConnection = new ConsumerConnection(); + consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY); + consumerConnection.setMessageModel(MessageModel.CLUSTERING); + HashSet<Connection> connections = new HashSet<>(); + connections.add(new Connection()); + consumerConnection.setConnectionSet(connections); + consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>()); + consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection); + + ProducerConnection producerConnection = new ProducerConnection(); + Connection connection = new Connection(); + connection.setClientAddr("127.0.0.1:9898"); + connection.setClientId("PID_12345"); + HashSet<Connection> connectionSet = new HashSet<Connection>(); + connectionSet.add(connection); + producerConnection.setConnectionSet(connectionSet); + when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection); + + when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6); + + TopicStatsTable topicStatsTable = new TopicStatsTable(); + topicStatsTable.setOffsetTable(new HashMap<MessageQueue, TopicOffset>()); + + Map<String, Map<MessageQueue, Long>> consumerStatus = new HashMap<>(); + when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus); + + List<QueueTimeSpan> queueTimeSpanList = new ArrayList<>(); + when(mQClientAPIImpl.queryConsumeTimeSpan(anyString(), anyString(), anyString(), anyLong())).thenReturn(queueTimeSpanList); + + ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo(); + consumerRunningInfo.setJstack("test"); + consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>()); + consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>()); + consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>()); + when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); } @After @@ -191,6 +245,31 @@ public class DefaultMQAdminExtTest { } @Test + public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + ConsumeStats consumeStats = defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", "unit-test"); + assertThat(consumeStats.getConsumeTps()).isEqualTo(1234); + } + + @Test + public void testExamineConsumerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + ConsumerConnection consumerConnection = defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group"); + assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY); + assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING); + } + + @Test + public void testExamineProducerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + ProducerConnection producerConnection = defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", "unit-test"); + assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1); + } + + @Test + public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException { + int result = defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker"); + assertThat(result).isEqualTo(6); + } + + @Test public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException { TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest"); assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker"); @@ -225,12 +304,57 @@ public class DefaultMQAdminExtTest { } @Test + public void testQueryConsumeTimeSpan() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + List<QueueTimeSpan> result = defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group"); + assertThat(result.size()).isEqualTo(0); + } + + @Test + public void testCleanExpiredConsumerQueue() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + boolean result = defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster"); + assertThat(result).isFalse(); + } + + @Test public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911"); assertThat(clean).isTrue(); } @Test + public void testCleanUnusedTopic() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + boolean result = defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster"); + assertThat(result).isFalse(); + } + + @Test + public void testGetConsumerRunningInfo() throws RemotingException, MQClientException, InterruptedException { + ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", false); + assertThat(consumerRunningInfo.getJstack()).isEqualTo("test"); + } + + @Test + public void testMessageTrackDetail() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + MessageExt messageExt = new MessageExt(); + messageExt.setMsgId("msgId"); + messageExt.setTopic("unit-test"); + List<MessageTrack> messageTrackList = defaultMQAdminExtImpl.messageTrackDetail(messageExt); + assertThat(messageTrackList.size()).isEqualTo(2); + } + + @Test + public void testGetConsumeStatus() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + Map<String, Map<MessageQueue, Long>> result = defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911"); + assertThat(result.size()).isEqualTo(0); + } + + @Test + public void testGetTopicClusterList() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + Set<String> result = defaultMQAdminExtImpl.getTopicClusterList("unit-test"); + assertThat(result.size()).isEqualTo(0); + } + + @Test public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest"); assertThat(clusterlist.contains("default-cluster-one")).isTrue();
