http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java new file mode 100644 index 0000000..24ae01d --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.rocketmq.tools.command.offset;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Smoke test for {@link GetConsumerStatusCommand}: parses a command line and runs the command.
+ * The test contains no assertions, so it passes as long as nothing is thrown.
+ */
+public class GetConsumerStatusCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    /**
+     * Builds a {@link DefaultMQAdminExt} whose private collaborators are replaced through
+     * reflection, and stubs the mocked client API to return an empty consumer-status map.
+     */
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        // The fields below are private, so they are overwritten reflectively.
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        Map<String, Map<MessageQueue, Long>> consumerStatus = new HashMap<>();
+        when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus);
+    }
+
+    /** Shuts down the admin tool created in {@link #init()}. */
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    /** Parses a group/topic/client-id command line and executes the command with a null RPC hook. */
+    @Test
+    public void testExecute() {
+        GetConsumerStatusCommand cmd = new GetConsumerStatusCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // Each option and its value are separate argv elements, as a shell would deliver them,
+        // so the command never receives values with a stray leading space.
+        String[] subargs = new String[] {"-g", "default-group", "-t", "unit-test", "-i", "clientid"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java new file mode 100644 index 0000000..77aec53 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.rocketmq.tools.command.offset;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Smoke test for {@link ResetOffsetByTimeCommand}: parses a command line and runs the command.
+ * The test contains no assertions, so it passes as long as nothing is thrown.
+ */
+public class ResetOffsetByTimeCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    /**
+     * Builds a {@link DefaultMQAdminExt} whose private collaborators are replaced through
+     * reflection, then stubs the mocked client API with a single-broker topic route and an
+     * empty reset-offset result.
+     */
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        // The fields below are private, so they are overwritten reflectively.
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        TopicRouteData topicRouteData = new TopicRouteData();
+        List<BrokerData> brokerDatas = new ArrayList<>();
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        // Upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit 1.
+        brokerAddrs.put(1234L, "127.0.0.1:10911");
+        BrokerData brokerData = new BrokerData();
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerName("default-broker");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDatas.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDatas);
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+
+        Map<MessageQueue, Long> messageQueueLongMap = new HashMap<>();
+        when(mQClientAPIImpl.invokeBrokerToResetOffset(anyString(), anyString(), anyString(), anyLong(), anyBoolean(), anyLong())).thenReturn(messageQueueLongMap);
+    }
+
+    /** Shuts down the admin tool created in {@link #init()}. */
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    /** Parses a group/topic/timestamp/force command line and executes the command with a null RPC hook. */
+    @Test
+    public void testExecute() {
+        ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // Each option and its value are separate argv elements, as a shell would deliver them,
+        // so the command never receives values with a stray leading space.
+        String[] subargs = new String[] {"-g", "default-group", "-t", "unit-test", "-s", "1412131213231", "-f", "false"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java
new file mode 100644
index 0000000..a542fd5
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommandTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.offset;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the command-line options built for {@link ResetOffsetByTimeOldCommand} are
+ * parsed into the expected option values.
+ */
+public class ResetOffsetByTimeOldCommandTest {
+    /**
+     * Despite its name, this test only parses the arguments; it never calls {@code execute}.
+     */
+    @Test
+    public void testExecute() {
+        ResetOffsetByTimeOldCommand cmd = new ResetOffsetByTimeOldCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // Each option and its value are separate argv elements, as a shell would deliver them,
+        // so the parsed values can be compared exactly instead of being trimmed first.
+        String[] subargs = new String[] {"-g", "default-group", "-t", "unit-test", "-s", "1412131213231", "-f", "false"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('g')).isEqualTo("default-group");
+        assertThat(commandLine.getOptionValue('t')).isEqualTo("unit-test");
+        assertThat(commandLine.getOptionValue('s')).isEqualTo("1412131213231");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java
new file mode 100644
index 0000000..b729652
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the command-line options built for {@link AllocateMQSubCommand} are parsed
+ * into the expected option values.
+ */
+public class AllocateMQSubCommandTest {
+    /**
+     * Despite its name, this test only parses the arguments; it never calls {@code execute}.
+     */
+    @Test
+    public void testExecute() {
+        AllocateMQSubCommand cmd = new AllocateMQSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // Each option and its value are separate argv elements, as a shell would deliver them,
+        // so the parsed values can be compared exactly instead of being trimmed first.
+        String[] subargs = new String[] {"-t", "unit-test", "-i", "127.0.0.1:10911"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('t')).isEqualTo("unit-test");
+        assertThat(commandLine.getOptionValue('i')).isEqualTo("127.0.0.1:10911");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java
new file mode 100644
index
0000000..ae2c253
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommandTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the command-line options built for {@link DeleteTopicSubCommand} are parsed
+ * into the expected option values.
+ */
+public class DeleteTopicSubCommandTest {
+    /**
+     * Despite its name, this test only parses the arguments; it never calls {@code execute}.
+     */
+    @Test
+    public void testExecute() {
+        DeleteTopicSubCommand cmd = new DeleteTopicSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // Each option and its value are separate argv elements, as a shell would deliver them,
+        // so the parsed values can be compared exactly instead of being trimmed first.
+        String[] subargs = new String[] {"-t", "unit-test", "-c", "default-cluster"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('t')).isEqualTo("unit-test");
+        assertThat(commandLine.getOptionValue('c')).isEqualTo("default-cluster");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java new file mode 100644 index 0000000..0003ad5 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommandTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the command-line options built for {@link TopicClusterSubCommand} are parsed
+ * into the expected option values.
+ */
+public class TopicClusterSubCommandTest {
+    /**
+     * Despite its name, this test only parses the arguments; it never calls {@code execute}.
+     */
+    @Test
+    public void testExecute() {
+        TopicClusterSubCommand cmd = new TopicClusterSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // The option and its value are separate argv elements, as a shell would deliver them,
+        // so the parsed value can be compared exactly instead of being trimmed first.
+        String[] subargs = new String[] {"-t", "unit-test"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('t')).isEqualTo("unit-test");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java
new file mode 100644
index 0000000..6e45453
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommandTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the command-line options built for {@link TopicRouteSubCommand} are parsed
+ * into the expected option values.
+ */
+public class TopicRouteSubCommandTest {
+    /**
+     * Despite its name, this test only parses the arguments; it never calls {@code execute}.
+     */
+    @Test
+    public void testExecute() {
+        TopicRouteSubCommand cmd = new TopicRouteSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // The option and its value are separate argv elements, as a shell would deliver them,
+        // so the parsed value can be compared exactly instead of being trimmed first.
+        String[] subargs = new String[] {"-t", "unit-test"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('t')).isEqualTo("unit-test");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java
new file mode 100644
index 0000000..651c539
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommandTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the command-line options built for {@link TopicStatusSubCommand} are parsed
+ * into the expected option values.
+ */
+public class TopicStatusSubCommandTest {
+    /**
+     * Despite its name, this test only parses the arguments; it never calls {@code execute}.
+     */
+    @Test
+    public void testExecute() {
+        TopicStatusSubCommand cmd = new TopicStatusSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // The option and its value are separate argv elements, as a shell would deliver them,
+        // so the parsed value can be compared exactly instead of being trimmed first.
+        String[] subargs = new String[] {"-t", "unit-test"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('t')).isEqualTo("unit-test");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java
new file mode
100644
index 0000000..6194095
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommandTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the command-line options built for {@link UpdateOrderConfCommand} are parsed
+ * into the expected option values.
+ */
+public class UpdateOrderConfCommandTest {
+    /**
+     * Despite its name, this test only parses the arguments; it never calls {@code execute}.
+     */
+    @Test
+    public void testExecute() {
+        UpdateOrderConfCommand cmd = new UpdateOrderConfCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // Each option and its value are separate argv elements, as a shell would deliver them,
+        // so the parsed values can be compared exactly instead of being trimmed first.
+        String[] subargs = new String[] {"-t", "unit-test", "-v", "default-broker:8", "-m", "post"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('t')).isEqualTo("unit-test");
+        assertThat(commandLine.getOptionValue('v')).isEqualTo("default-broker:8");
+        assertThat(commandLine.getOptionValue('m')).isEqualTo("post");
+    }
+}
\ No newline at
end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java new file mode 100644 index 0000000..68fbd46 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the command-line options built for {@link UpdateTopicPermSubCommand} are
+ * parsed into the expected option values.
+ */
+public class UpdateTopicPermSubCommandTest {
+    /**
+     * Despite its name, this test only parses the arguments; it never calls {@code execute}.
+     */
+    @Test
+    public void testExecute() {
+        UpdateTopicPermSubCommand cmd = new UpdateTopicPermSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // Each option and its value are separate argv elements, as a shell would deliver them,
+        // so the parsed values can be compared exactly instead of being trimmed first.
+        String[] subargs = new String[] {"-b", "127.0.0.1:10911", "-c", "default-cluster", "-t", "unit-test", "-p", "6"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('b')).isEqualTo("127.0.0.1:10911");
+        assertThat(commandLine.getOptionValue('c')).isEqualTo("default-cluster");
+        assertThat(commandLine.getOptionValue('t')).isEqualTo("unit-test");
+        assertThat(commandLine.getOptionValue('p')).isEqualTo("6");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
new file mode 100644
index 0000000..32dbcf1
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the command-line options built for {@link UpdateTopicSubCommand} are parsed
+ * into the expected option values.
+ */
+public class UpdateTopicSubCommandTest {
+    /**
+     * Despite its name, this test only parses the arguments; it never calls {@code execute}.
+     */
+    @Test
+    public void testExecute() {
+        UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // Each option and its value are separate argv elements, as a shell would deliver them,
+        // so the parsed values can be compared exactly instead of being trimmed first.
+        String[] subargs = new String[] {
+            "-b", "127.0.0.1:10911",
+            "-c", "default-cluster",
+            "-t", "unit-test",
+            "-r", "8",
+            "-w", "8",
+            "-p", "6",
+            "-o", "false",
+            "-u", "false",
+            "-s", "false"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('b')).isEqualTo("127.0.0.1:10911");
+        assertThat(commandLine.getOptionValue('c')).isEqualTo("default-cluster");
+        assertThat(commandLine.getOptionValue('r')).isEqualTo("8");
+        assertThat(commandLine.getOptionValue('w')).isEqualTo("8");
+        assertThat(commandLine.getOptionValue('t')).isEqualTo("unit-test");
+        assertThat(commandLine.getOptionValue('p')).isEqualTo("6");
+        assertThat(commandLine.getOptionValue('o')).isEqualTo("false");
+        assertThat(commandLine.getOptionValue('u')).isEqualTo("false");
+        assertThat(commandLine.getOptionValue('s')).isEqualTo("false");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java b/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java
new file mode 100644
index 0000000..30254cb
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListenerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.monitor;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Invokes each {@link DefaultMonitorListener} callback once with sample data.
+ *
+ * <p>NOTE(review): the listener is created with {@code mock(DefaultMonitorListener.class)}, so
+ * every call below goes to a Mockito mock. Unless the callbacks are final, their real bodies
+ * are not executed and these tests only show that the calls do not throw - confirm whether a
+ * real instance was intended.
+ */
+public class DefaultMonitorListenerTest {
+    private DefaultMonitorListener defaultMonitorListener;
+
+    /** Creates a fresh mock listener before every test. */
+    @Before
+    public void init() {
+        defaultMonitorListener = mock(DefaultMonitorListener.class);
+    }
+
+    /** Calls {@code beginRound()} on the listener. */
+    @Test
+    public void testBeginRound() {
+        defaultMonitorListener.beginRound();
+    }
+
+    /** Reports a populated {@link UndoneMsgs} record to the listener. */
+    @Test
+    public void testReportUndoneMsgs() {
+        UndoneMsgs undoneMsgs = new UndoneMsgs();
+        undoneMsgs.setConsumerGroup("default-group");
+        undoneMsgs.setTopic("unit-test");
+        undoneMsgs.setUndoneMsgsDelayTimeMills(30000);
+        undoneMsgs.setUndoneMsgsSingleMQ(1);
+        undoneMsgs.setUndoneMsgsTotal(100);
+        defaultMonitorListener.reportUndoneMsgs(undoneMsgs);
+    }
+
+    /** Reports a populated {@link FailedMsgs} record to the listener. */
+    @Test
+    public void testReportFailedMsgs() {
+        FailedMsgs failedMsgs = new FailedMsgs();
+        failedMsgs.setTopic("unit-test");
+        failedMsgs.setConsumerGroup("default-consumer");
+        failedMsgs.setFailedMsgsTotalRecently(2);
+        defaultMonitorListener.reportFailedMsgs(failedMsgs);
+    }
+
+    /** Reports a {@link DeleteMsgsEvent} stamped with the current time to the listener. */
+    @Test
+    public void testReportDeleteMsgsEvent() {
+        DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent();
+        deleteMsgsEvent.setEventTimestamp(System.currentTimeMillis());
+        deleteMsgsEvent.setOffsetMovedEvent(new OffsetMovedEvent());
+        defaultMonitorListener.reportDeleteMsgsEvent(deleteMsgsEvent);
+    }
+
+    /**
+     * Reports a table holding one {@link ConsumerRunningInfo}, keyed by {@code "test"}, whose
+     * collections and properties are all empty.
+     */
+    @Test
+    public void testReportConsumerRunningInfo() {
+        TreeMap<String, ConsumerRunningInfo> criTable = new TreeMap<>();
+        ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
+        // The subscription set used to be assigned twice; one assignment is sufficient.
+        consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
+        consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
+        consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>());
+        consumerRunningInfo.setProperties(new Properties());
+        criTable.put("test", consumerRunningInfo);
+        defaultMonitorListener.reportConsumerRunningInfo(criTable);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/78603cd6/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
new file mode 100644
index 0000000..597915c
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.monitor;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+/**
+ * Runs {@link MonitorService} against a mocked {@link MQClientAPIImpl}.
+ *
+ * <p>{@link #init()} builds the admin and monitor objects, injects the mocked collaborators
+ * into their private fields by reflection, and stubs the client API calls with canned data.
+ * The two tests then call the service and pass as long as no exception escapes.
+ */
+public class MonitorServiceTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+    private static MonitorConfig monitorConfig;
+    private static MonitorListener monitorListener;
+    private static DefaultMQPullConsumer defaultMQPullConsumer;
+    private static DefaultMQPushConsumer defaultMQPushConsumer;
+    private static MonitorService monitorService;
+
+    /**
+     * Builds the shared fixture once for the whole class: creates the objects under test,
+     * injects the mocks by reflection, and stubs every client API call configured below.
+     */
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        monitorConfig = new MonitorConfig();
+        monitorListener = new DefaultMonitorListener();
+        defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
+        defaultMQPushConsumer = mock(DefaultMQPushConsumer.class);
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+        monitorService = new MonitorService(monitorConfig, monitorListener, null);
+
+        // Wire the admin tool to the shared client instance and the mocked client API.
+        setField(DefaultMQAdminExtImpl.class, defaultMQAdminExtImpl, "mqClientInstance", mqClientInstance);
+        setField(MQClientInstance.class, mqClientInstance, "mQClientAPIImpl", mQClientAPIImpl);
+        setField(DefaultMQAdminExt.class, defaultMQAdminExt, "defaultMQAdminExtImpl", defaultMQAdminExtImpl);
+
+        // Replace the collaborators held by the monitor service.
+        setField(MonitorService.class, monitorService, "defaultMQAdminExt", defaultMQAdminExt);
+        setField(MonitorService.class, monitorService, "defaultMQPullConsumer", defaultMQPullConsumer);
+        setField(MonitorService.class, monitorService, "defaultMQPushConsumer", defaultMQPushConsumer);
+
+        TopicList topicList = newTopicList();
+        when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
+
+        TopicRouteData topicRouteData = newTopicRouteData();
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+
+        ConsumeStats consumeStats = newConsumeStats();
+        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats);
+
+        ConsumerConnection consumerConnection = newConsumerConnection();
+        when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
+
+        ConsumerRunningInfo consumerRunningInfo = newConsumerRunningInfo();
+        when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
+    }
+
+    /** Writes {@code value} into the field {@code name} declared by {@code owner} on {@code target}. */
+    private static void setField(Class<?> owner, Object target, String name, Object value)
+        throws NoSuchFieldException, IllegalAccessException {
+        Field field = owner.getDeclaredField(name);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+
+    /** Builds a topic list containing "topic_one" and "topic_two". */
+    private static TopicList newTopicList() {
+        Set<String> topicSet = new HashSet<>();
+        topicSet.add("topic_one");
+        topicSet.add("topic_two");
+        TopicList topicList = new TopicList();
+        topicList.setTopicList(topicSet);
+        return topicList;
+    }
+
+    /** Builds route data with one broker and empty queue and filter-server tables. */
+    private static TopicRouteData newTopicRouteData() {
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        // Upper-case long suffix: a lower-case 'l' is easily misread as the digit 1.
+        brokerAddrs.put(1234L, "127.0.0.1:10911");
+        BrokerData brokerData = new BrokerData();
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerName("default-broker");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        List<BrokerData> brokerDatas = new ArrayList<>();
+        brokerDatas.add(brokerData);
+
+        TopicRouteData topicRouteData = new TopicRouteData();
+        topicRouteData.setBrokerDatas(brokerDatas);
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        return topicRouteData;
+    }
+
+    /** Builds consume stats with a TPS of 1234 and one default queue/offset entry. */
+    private static ConsumeStats newConsumeStats() {
+        HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
+        stats.put(new MessageQueue(), new OffsetWrapper());
+        ConsumeStats consumeStats = new ConsumeStats();
+        consumeStats.setConsumeTps(1234);
+        consumeStats.setOffsetTable(stats);
+        return consumeStats;
+    }
+
+    /** Builds a clustering, passively consuming connection description with one client. */
+    private static ConsumerConnection newConsumerConnection() {
+        Connection connection = new Connection();
+        connection.setClientId("client_id");
+        // NOTE(review): port 109111 is outside the valid TCP port range; presumably a typo for
+        // 10911. It looks harmless because the client API is mocked - confirm before changing.
+        connection.setClientAddr("127.0.0.1:109111");
+        connection.setLanguage(LanguageCode.JAVA);
+        connection.setVersion(MQVersion.Version.V4_0_0_SNAPSHOT.ordinal());
+        HashSet<Connection> connections = new HashSet<>();
+        connections.add(connection);
+
+        ConsumerConnection consumerConnection = new ConsumerConnection();
+        consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+        consumerConnection.setMessageModel(MessageModel.CLUSTERING);
+        consumerConnection.setConnectionSet(connections);
+        consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
+        consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        return consumerConnection;
+    }
+
+    /** Builds running info with empty tables plus consume-type and start-timestamp properties. */
+    private static ConsumerRunningInfo newConsumerRunningInfo() {
+        Properties properties = new Properties();
+        // NOTE(review): both values are stored as non-String objects (an enum constant and a
+        // Long), so Properties.getProperty(...) returns null for these keys - confirm that the
+        // code under test reads them with get(...).
+        properties.put(ConsumerRunningInfo.PROP_CONSUME_TYPE, CONSUME_ACTIVELY);
+        properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, System.currentTimeMillis());
+
+        ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
+        consumerRunningInfo.setJstack("test");
+        consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>());
+        consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
+        consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
+        consumerRunningInfo.setProperties(properties);
+        return consumerRunningInfo;
+    }
+
+    /** No teardown is performed. */
+    @AfterClass
+    public static void terminate() {
+    }
+
+    /** Runs one monitoring pass against the stubbed client API. */
+    @Test
+    public void testDoMonitorWork() throws RemotingException, MQClientException, InterruptedException {
+        monitorService.doMonitorWork();
+    }
+
+    /** Requests a consumer running-info report for the group "test_group". */
+    @Test
+    public void testReportConsumerRunningInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        monitorService.reportConsumerRunningInfo("test_group");
+    }
+}
\ No newline at end of file