[ROCKETMQ-57] Add unit test for all commands
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/13f4297e Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/13f4297e Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/13f4297e Branch: refs/heads/master Commit: 13f4297eea9d73e5e0a290f6589b94e16a67e948 Parents: 712dec5 Author: stevenschew <[email protected]> Authored: Sun Jan 22 20:53:13 2017 +0800 Committer: yukon <[email protected]> Committed: Sun Jan 22 20:53:13 2017 +0800 ---------------------------------------------------------------------- .../tools/admin/DefaultMQAdminExtTest.java | 86 ++++----- .../rocketmq/tools/command/CommandUtilTest.java | 9 +- .../broker/BrokerConsumeStatsSubCommadTest.java | 92 ++++++++++ .../broker/BrokerStatusSubCommandTest.java | 88 +++++++++ .../broker/CleanExpiredCQSubCommandTest.java | 84 +++++++++ .../broker/CleanUnusedTopicCommandTest.java | 84 +++++++++ .../broker/GetBrokerConfigCommandTest.java | 89 +++++++++ .../broker/SendMsgStatusCommandTest.java | 78 ++++++++ .../UpdateBrokerConfigSubCommandTest.java | 79 ++++++++ .../ConsumerConnectionSubCommandTest.java | 99 ++++++++++ .../ProducerConnectionSubCommandTest.java | 93 ++++++++++ .../ConsumerProgressSubCommandTest.java | 111 ++++++++++++ .../consumer/ConsumerStatusSubCommandTest.java | 131 ++++++++++++++ .../namesrv/GetNamesrvConfigCommandTest.java | 91 ++++++++++ .../namesrv/WipeWritePermSubCommandTest.java | 90 +++++++++ .../offset/GetConsumerStatusCommandTest.java | 85 +++++++++ .../offset/ResetOffsetByTimeCommandTest.java | 105 +++++++++++ .../offset/ResetOffsetByTimeOldCommandTest.java | 39 ++++ .../command/topic/AllocateMQSubCommandTest.java | 38 ++++ .../topic/DeleteTopicSubCommandTest.java | 38 ++++ .../topic/TopicClusterSubCommandTest.java | 37 ++++ .../command/topic/TopicRouteSubCommandTest.java | 37 ++++ .../topic/TopicStatusSubCommandTest.java | 37 ++++ .../topic/UpdateOrderConfCommandTest.java | 39 ++++ .../topic/UpdateTopicPermSubCommandTest.java | 41 +++++ .../topic/UpdateTopicSubCommandTest.java | 54 ++++++ .../monitor/DefaultMonitorListenerTest.java | 86 +++++++++ .../tools/monitor/MonitorServiceTest.java | 181 +++++++++++++++++++ 28 files changed, 2077 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index bb80eb4..7865980 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -68,8 +68,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.api.MessageTrack; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; @@ -83,19 +83,20 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultMQAdminExtTest { - private DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); - private MQClientAPIImpl mQClientAPIImpl; - private Properties properties = new Properties(); - private TopicList topicList = new TopicList(); - private TopicRouteData topicRouteData = new TopicRouteData(); - private KVTable kvTable = new KVTable(); - private ClusterInfo clusterInfo = new ClusterInfo(); - - @Before - public void init() throws Exception { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + private static Properties properties = new Properties(); + private static TopicList topicList = new TopicList(); + private static TopicRouteData topicRouteData = new TopicRouteData(); + private static KVTable kvTable = new KVTable(); + private static ClusterInfo clusterInfo = new ClusterInfo(); + + @BeforeClass + public static void init() throws Exception { mQClientAPIImpl = mock(MQClientAPIImpl.class); - DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExt = new DefaultMQAdminExt(); defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); @@ -104,6 +105,9 @@ public class DefaultMQAdminExtTest { field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); field.setAccessible(true); field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); properties.setProperty("maxMessageSize", "5000000"); properties.setProperty("flushDelayOffsetInterval", "15000"); @@ -223,15 +227,15 @@ public class DefaultMQAdminExtTest { when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); } - @After - public void terminate() throws Exception { + @AfterClass + public static void terminate() throws Exception { if (defaultMQAdminExtImpl != null) - defaultMQAdminExtImpl.shutdown(); + defaultMQAdminExt.shutdown(); } @Test public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException { - Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911"); + Properties result = defaultMQAdminExt.getBrokerConfig("127.0.0.1:10911"); assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000"); assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000"); assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350"); @@ -239,21 +243,21 @@ public class DefaultMQAdminExtTest { @Test public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException { - TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList(); + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); assertThat(topicList.getTopicList().size()).isEqualTo(2); assertThat(topicList.getTopicList()).contains("topic_one"); } @Test public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911"); + KVTable brokerStats = defaultMQAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911"); assertThat(brokerStats.getTable().get("id")).isEqualTo("1234"); assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker"); } @Test public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo(); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable(); assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker"); assertThat(brokerList.containsKey("broker-test")).isTrue(); @@ -272,32 +276,32 @@ public class DefaultMQAdminExtTest { @Test public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - ConsumeStats consumeStats = defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", "unit-test"); + ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats("default-consumer-group", "unit-test"); assertThat(consumeStats.getConsumeTps()).isEqualTo(1234); } @Test public void testExamineConsumerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - ConsumerConnection consumerConnection = defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group"); + ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo("default-consumer-group"); assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY); assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING); } @Test public void testExamineProducerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - ProducerConnection producerConnection = defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", "unit-test"); + ProducerConnection producerConnection = defaultMQAdminExt.examineProducerConnectionInfo("default-producer-group", "unit-test"); assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1); } @Test public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException { - int result = defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker"); + int result = defaultMQAdminExt.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker"); assertThat(result).isEqualTo(6); } @Test public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException { - TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest"); + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo("UnitTest"); assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker"); assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster"); } @@ -308,53 +312,53 @@ public class DefaultMQAdminExtTest { result.add("default-name-one"); result.add("default-name-two"); when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result); - List<String> nameList = defaultMQAdminExtImpl.getNameServerAddressList(); + List<String> nameList = defaultMQAdminExt.getNameServerAddressList(); assertThat(nameList.get(0)).isEqualTo("default-name-one"); assertThat(nameList.get(1)).isEqualTo("default-name-two"); } @Test public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException { - String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest"); + String topicConfig = defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest"); assertThat(topicConfig).isEqualTo("topicListConfig"); - KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); + KVTable kvs = defaultMQAdminExt.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one"); assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster"); } @Test public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest"); + GroupList groupList = defaultMQAdminExt.queryTopicConsumeByWho("UnitTest"); assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue(); } @Test public void testQueryConsumeTimeSpan() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - List<QueueTimeSpan> result = defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group"); + List<QueueTimeSpan> result = defaultMQAdminExt.queryConsumeTimeSpan("unit-test", "default-broker-group"); assertThat(result.size()).isEqualTo(0); } @Test public void testCleanExpiredConsumerQueue() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { - boolean result = defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster"); + boolean result = defaultMQAdminExt.cleanExpiredConsumerQueue("default-cluster"); assertThat(result).isFalse(); } @Test public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { - boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911"); + boolean clean = defaultMQAdminExt.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911"); assertThat(clean).isTrue(); } @Test public void testCleanUnusedTopic() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { - boolean result = defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster"); + boolean result = defaultMQAdminExt.cleanUnusedTopic("default-cluster"); assertThat(result).isFalse(); } @Test public void testGetConsumerRunningInfo() throws RemotingException, MQClientException, InterruptedException { - ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", false); + ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo("consumer-group", "cid_123", false); assertThat(consumerRunningInfo.getJstack()).isEqualTo("test"); } @@ -363,25 +367,25 @@ public class DefaultMQAdminExtTest { MessageExt messageExt = new MessageExt(); messageExt.setMsgId("msgId"); messageExt.setTopic("unit-test"); - List<MessageTrack> messageTrackList = defaultMQAdminExtImpl.messageTrackDetail(messageExt); + List<MessageTrack> messageTrackList = defaultMQAdminExt.messageTrackDetail(messageExt); assertThat(messageTrackList.size()).isEqualTo(2); } @Test public void testGetConsumeStatus() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Map<String, Map<MessageQueue, Long>> result = defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911"); + Map<String, Map<MessageQueue, Long>> result = defaultMQAdminExt.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911"); assertThat(result.size()).isEqualTo(0); } @Test public void testGetTopicClusterList() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Set<String> result = defaultMQAdminExtImpl.getTopicClusterList("unit-test"); + Set<String> result = defaultMQAdminExt.getTopicClusterList("unit-test"); assertThat(result.size()).isEqualTo(0); } @Test public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { - Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest"); + Set<String> clusterlist = defaultMQAdminExt.getClusterList("UnitTest"); assertThat(clusterlist.contains("default-cluster-one")).isTrue(); assertThat(clusterlist.contains("default-cluster-two")).isTrue(); } @@ -391,13 +395,13 @@ public class DefaultMQAdminExtTest { ConsumeStatsList result = new ConsumeStatsList(); result.setBrokerAddr("127.0.0.1:10911"); when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result); - ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000); + ConsumeStatsList consumeStatsList = defaultMQAdminExt.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000); assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911"); } @Test public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000); + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup("127.0.0.1:10911", 10000); assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234); assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one"); assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java index ba58010..33b4497 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.tools.command; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -33,9 +39,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.lang.reflect.Field; -import java.util.*; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java new file mode 100644 index 0000000..3523175 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BrokerConsumeStatsSubCommadTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + ConsumeStatsList consumeStatsList = new ConsumeStatsList(); + consumeStatsList.setBrokerAddr("127.0l.0.1:10911"); + consumeStatsList.setConsumeStatsList(new ArrayList<Map<String, List<ConsumeStats>>>()); + consumeStatsList.setTotalDiff(123); + when(mQClientAPIImpl.fetchConsumeStatsInBroker(anyString(), anyBoolean(), anyLong())).thenReturn(consumeStatsList); + } + + @AfterClass + public static void terminate() { + } + + @Test + public void testExecute() { + BrokerConsumeStatsSubCommad cmd = new BrokerConsumeStatsSubCommad(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-t 3000", "-l 5", "-o true"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java new file mode 100644 index 0000000..1b08735 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import java.util.HashMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BrokerStatusSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + KVTable kvTable = new KVTable(); + kvTable.setTable(new HashMap<String, String>()); + when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + BrokerStatusSubCommand cmd = new BrokerStatusSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java new file mode 100644 index 0000000..6fcf044 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CleanExpiredCQSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + CleanExpiredCQSubCommand cmd = new CleanExpiredCQSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java new file mode 100644 index 0000000..3ae2c48 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CleanUnusedTopicCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + when(mQClientAPIImpl.cleanUnusedTopicByAddr(anyString(), anyLong())).thenReturn(true); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + CleanUnusedTopicCommand cmd = new CleanUnusedTopicCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java new file mode 100644 index 0000000..88a8ea8 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import java.util.Properties; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GetBrokerConfigCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + Properties properties = new Properties(); + properties.setProperty("maxMessageSize", "5000000"); + properties.setProperty("flushDelayOffsetInterval", "15000"); + properties.setProperty("serverSocketRcvBufSize", "655350"); + when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + GetBrokerConfigCommand cmd = new GetBrokerConfigCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java new file mode 100644 index 0000000..9089a39 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class SendMsgStatusCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + SendMsgStatusCommand cmd = new SendMsgStatusCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-s 1024 -c 10"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + //cmd.execute(commandLine, options, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java new file mode 100644 index 0000000..cc459ba --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class UpdateBrokerConfigSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + UpdateBrokerConfigSubCommand cmd = new UpdateBrokerConfigSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-k topicname", "-v unit_test"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java new file mode 100644 index 0000000..88530e2 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.connection; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConsumerConnectionSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + ConsumerConnection consumerConnection = new ConsumerConnection(); + consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY); + consumerConnection.setMessageModel(MessageModel.CLUSTERING); + HashSet<Connection> connections = new HashSet<>(); + connections.add(new Connection()); + consumerConnection.setConnectionSet(connections); + consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>()); + consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + ConsumerConnectionSubCommand cmd = new ConsumerConnectionSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-consumer-group"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java new file mode 100644 index 0000000..8df66fb --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.connection; + +import java.lang.reflect.Field; +import java.util.HashSet; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ProducerConnectionSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + ProducerConnection producerConnection = new ProducerConnection(); + Connection connection = new Connection(); + connection.setClientAddr("127.0.0.1:9898"); + connection.setClientId("PID_12345"); + HashSet<Connection> connectionSet = new HashSet<>(); + connectionSet.add(connection); + producerConnection.setConnectionSet(connectionSet); + when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + ProducerConnectionSubCommand cmd = new ProducerConnectionSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-producer-group", "-t unit-test"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java new file mode 100644 index 0000000..a5af04a --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.consumer; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConsumerProgressSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + TopicRouteData topicRouteData = new TopicRouteData(); + List<BrokerData> brokerDatas = new ArrayList<>(); + HashMap<Long, String> brokerAddrs = new HashMap<>(); + brokerAddrs.put(1234l, "127.0.0.1:10911"); + BrokerData brokerData = new BrokerData(); + brokerData.setCluster("default-cluster"); + brokerData.setBrokerName("default-broker"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDatas.add(brokerData); + topicRouteData.setBrokerDatas(brokerDatas); + topicRouteData.setQueueDatas(new ArrayList<QueueData>()); + topicRouteData.setFilterServerTable(new HashMap<String, List<String>>()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); + + ConsumeStats consumeStats = new ConsumeStats(); + consumeStats.setConsumeTps(1234); + MessageQueue messageQueue = new MessageQueue(); + OffsetWrapper offsetWrapper = new OffsetWrapper(); + HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>(); + stats.put(messageQueue, offsetWrapper); + consumeStats.setOffsetTable(stats); + when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + ConsumerProgressSubCommand cmd = new ConsumerProgressSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-group"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java new file mode 100644 index 0000000..8e846bc --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.consumer; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumeStatus; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConsumerStatusSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + TopicRouteData topicRouteData = new TopicRouteData(); + List<BrokerData> brokerDatas = new ArrayList<>(); + HashMap<Long, String> brokerAddrs = new HashMap<>(); + brokerAddrs.put(1234l, "127.0.0.1:10911"); + BrokerData brokerData = new BrokerData(); + brokerData.setCluster("default-cluster"); + brokerData.setBrokerName("default-broker"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDatas.add(brokerData); + topicRouteData.setBrokerDatas(brokerDatas); + topicRouteData.setQueueDatas(new ArrayList<QueueData>()); + topicRouteData.setFilterServerTable(new HashMap<String, List<String>>()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); + + ConsumerConnection consumerConnection = new ConsumerConnection(); + consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY); + consumerConnection.setMessageModel(MessageModel.CLUSTERING); + HashSet<Connection> connections = new HashSet<>(); + connections.add(new Connection()); + consumerConnection.setConnectionSet(connections); + consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>()); + consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection); + + ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo(); + consumerRunningInfo.setJstack("test"); + consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>()); + consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>()); + consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>()); + when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + ConsumerStatusSubCommand cmd = new ConsumerStatusSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g default-group", "-i cid_one"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java new file mode 100644 index 0000000..49802b9 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.namesrv; + +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentMatchers; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GetNamesrvConfigCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + Map<String, Properties> propertiesMap = new HashMap<>(); + List<String> nameServers = new ArrayList<>(); + when(mQClientAPIImpl.getNameServerConfig(ArgumentMatchers.<String>anyList(), anyLong())).thenReturn(propertiesMap); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + GetNamesrvConfigCommand cmd = new GetNamesrvConfigCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13f4297e/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java new file mode 100644 index 0000000..5d2781a --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.namesrv; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class WipeWritePermSubCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, RemotingCommandException { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + + List<String> result = new ArrayList<>(); + result.add("default-name-one"); + result.add("default-name-two"); + when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result); + when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Test + public void testExecute() { + WipeWritePermSubCommand cmd = new WipeWritePermSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b default-broker"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} \ No newline at end of file
