This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push: new d4a656c Fix unit test d4a656c is described below commit d4a656c18c30f85653079da3984dcef0c9ae4d4b Author: dongeforever <dongefore...@apache.org> AuthorDate: Wed Dec 15 16:41:31 2021 +0800 Fix unit test --- .../apache/rocketmq/broker/BrokerOuterAPITest.java | 2 +- .../apache/rocketmq/broker/BrokerStartupTest.java | 20 +++++- .../broker/processor/AdminBrokerProcessorTest.java | 4 +- .../broker/topic/TopicConfigManagerTest.java | 76 ---------------------- .../rocketmq/client/impl/MQClientAPIImpl.java | 12 ---- .../client/impl/factory/MQClientInstance.java | 3 +- .../store/RemoteBrokerOffsetStoreTest.java | 10 ++- .../protocol/header/GetMaxOffsetRequestHeader.java | 8 --- .../apache/rocketmq/common/ConfigManagerTest.java | 5 +- .../remoting/protocol/RemotingCommandTest.java | 10 ++- .../tools/admin/DefaultMQAdminExtTest.java | 43 +++++------- .../command/message/ConsumeMessageCommandTest.java | 4 +- 12 files changed, 57 insertions(+), 140 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java index 339ed11..daf771a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java @@ -84,7 +84,7 @@ public class BrokerOuterAPITest { private BrokerOuterAPI brokerOuterAPI; public void init() throws Exception { - brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null); + brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), brokerController); Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient"); field.setAccessible(true); field.set(brokerOuterAPI, nettyRemotingClient); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java index 
c8da08d..ce370a3 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.broker; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Properties; +import org.apache.rocketmq.common.MixAll; import org.junit.Assert; import org.junit.Test; @@ -34,8 +35,21 @@ public class BrokerStartupTest { Class<BrokerStartup> clazz = BrokerStartup.class; Method method = clazz.getDeclaredMethod("properties2SystemEnv", Properties.class); method.setAccessible(true); - System.setProperty("rocketmq.namesrv.domain", "value"); - method.invoke(null, properties); - Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain")); + { + properties.put("rmqAddressServerDomain", "value1"); + properties.put("rmqAddressServerSubGroup", "value2"); + method.invoke(null, properties); + Assert.assertEquals("value1", System.getProperty("rocketmq.namesrv.domain")); + Assert.assertEquals("value2", System.getProperty("rocketmq.namesrv.domain.subgroup")); + } + { + properties.put("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME); + properties.put("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP); + method.invoke(null, properties); + Assert.assertEquals(MixAll.WS_DOMAIN_NAME, System.getProperty("rocketmq.namesrv.domain")); + Assert.assertEquals(MixAll.WS_DOMAIN_SUBGROUP, System.getProperty("rocketmq.namesrv.domain.subgroup")); + } + + } } \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 3770dae..2141f2c 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -101,7 +101,7 @@ public 
class AdminBrokerProcessorTest { public void init() throws Exception { brokerController.setMessageStore(messageStore); - doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor(); + //doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor(); adminBrokerProcessor = new AdminBrokerProcessor(brokerController); @@ -203,7 +203,7 @@ public class AdminBrokerProcessorTest { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, requestHeader); request.makeCustomHeaderToNet(); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST); assertThat(response.getRemark()).contains("No topic in this broker."); } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java deleted file mode 100644 index 4a60437..0000000 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker.topic; - -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.junit.After; -import org.junit.Before; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import java.nio.file.Files; -import java.nio.file.Paths; - -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class TopicConfigManagerTest { - @Mock - private DefaultMessageStore messageStore; - @Mock - private BrokerController brokerController; - - private TopicConfigManager topicConfigManager; - - private static final String topic = "FooBar"; - private static final String broker1Name = "broker1"; - private static final String broker1Addr = "127.0.0.1:12345"; - private static final int queueId1 = 1; - private static final String broker2Name = "broker2"; - private static final String broker2Addr = "127.0.0.2:12345"; - private static final int queueId2 = 2; - - @Before - public void before() { - BrokerConfig brokerConfig = new BrokerConfig(); - brokerConfig.setBrokerName(broker1Name); - when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); - - when(brokerController.getMessageStore()).thenReturn(messageStore); - - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir")); - when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); - - this.topicConfigManager = new TopicConfigManager(brokerController); - this.topicConfigManager.getTopicConfigTable().put(topic, new TopicConfig(topic)); - } - - @After - public void after() throws Exception { 
- if (topicConfigManager != null) { - Files.deleteIfExists(Paths.get(topicConfigManager.configFilePath())); - } - } - -} \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 0c15cff..8f0138d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1109,26 +1109,14 @@ public class MQClientAPIImpl { public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { - return getMaxOffset(addr, topic, queueId, true, false, timeoutMillis); - } - - public long getMaxOffset(final String addr, final String topic, final int queueId, boolean committed, - boolean fromLogicalQueue, - final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); - requestHeader.setCommitted(committed); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response != null; - HashMap<String, String> extFields = response.getExtFields(); - if (extFields != null && extFields.containsKey(MessageConst.PROPERTY_REDIRECT)) { - throw new MQRedirectException(response.getBody()); - } switch (response.getCode()) { case ResponseCode.SUCCESS: { GetMaxOffsetResponseHeader responseHeader = diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java 
index 793189e..0181951 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1020,7 +1020,8 @@ public class MQClientInstance { public String getBrokerNameFromMessageQueue(final MessageQueue mq) { - if (topicEndPointsTable.get(mq.getTopic()) != null + if (topicEndPointsTable != null + && topicEndPointsTable.get(mq.getTopic()) != null && !topicEndPointsTable.get(mq.getTopic()).isEmpty()) { return topicEndPointsTable.get(mq.getTopic()).get(mq); } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java index f762910..73cfefb 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java @@ -20,10 +20,12 @@ import java.util.Collections; import java.util.HashSet; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.OffsetNotFoundException; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -60,6 +62,7 @@ public class RemoteBrokerOffsetStoreTest { when(mQClientFactory.getClientId()).thenReturn(clientId); when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new 
FindBrokerResult("127.0.0.1", false)); when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI); + when(mQClientFactory.getBrokerNameFromMessageQueue(any())).thenReturn(brokerName); } @Test @@ -84,10 +87,15 @@ public class RemoteBrokerOffsetStoreTest { offsetStore.updateOffset(messageQueue, 1024, false); - doThrow(new MQBrokerException(-1, "", null)) + doThrow(new OffsetNotFoundException(ResponseCode.PULL_NOT_FOUND, "", null)) .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong()); assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1); + + doThrow(new MQBrokerException(-1, "", null)) + .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong()); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-2); + doThrow(new RemotingException("", null)) .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong()); assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-2); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java index 2a577d7..e961af9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java @@ -29,7 +29,6 @@ public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader { private String topic; @CFNotNull private Integer queueId; - private boolean committed; @Override public void checkFields() throws RemotingCommandException { @@ -55,11 +54,4 @@ public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader { this.queueId = queueId; } - public void setCommitted(boolean 
committed) { - this.committed = committed; - } - - public boolean isCommitted() { - return committed; - } } diff --git a/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java b/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java index a884b6a..a61ec4c 100644 --- a/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java @@ -15,13 +15,10 @@ package org.apache.rocketmq.common;/* * limitations under the License. */ -import org.apache.rocketmq.common.ConfigManager; -import org.apache.rocketmq.common.MixAll; -import org.junit.Test; - import java.io.File; import java.io.PrintWriter; import java.lang.reflect.Method; +import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java index f2f6935..a0fc765 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java @@ -209,7 +209,11 @@ public class RemotingCommandTest { SubExtFieldsHeader subExtFieldsHeader = new SubExtFieldsHeader(); RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(1, subExtFieldsHeader); Field[] fields = remotingCommand.getClazzFields(subExtFieldsHeader.getClass()); - Assert.assertEquals(7, fields.length); + Set<String> fieldNames = new HashSet<>(); + for (Field field: fields) { + fieldNames.add(field.getName()); + } + Assert.assertTrue(fields.length >= 7); Set<String> names = new HashSet<>(); names.add("stringValue"); names.add("intValue"); @@ -218,8 +222,8 @@ public class RemotingCommandTest { names.add("doubleValue"); names.add("name"); names.add("value"); - for (Field field : fields) { - 
Assert.assertTrue(names.contains(field.getName())); + for (String name: names) { + Assert.assertTrue(fieldNames.contains(name)); } remotingCommand.makeCustomHeaderToNet(); SubExtFieldsHeader other = (SubExtFieldsHeader) remotingCommand.decodeCommandCustomHeader(subExtFieldsHeader.getClass()); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index deb3d05..4514ef4 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -16,6 +16,18 @@ */ package org.apache.rocketmq.tools.admin; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -52,6 +64,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -66,25 +79,11 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; -import 
java.io.UnsupportedEncodingException; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -248,7 +247,8 @@ public class DefaultMQAdminExtTest { put("topic_test_examine_topicConfig", new TopicConfig("topic_test_examine_topicConfig")); } }); - when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper); + //when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper); + when(mQClientAPIImpl.getTopicConfig(anyString(), anyString(), anyLong())).thenReturn(new TopicConfigAndQueueMapping(new TopicConfig("topic_test_examine_topicConfig"), null)); } @AfterClass @@ -439,7 +439,7 @@ public class DefaultMQAdminExtTest { @Test public void testMaxOffset() throws Exception { - when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(100L); + when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyLong())).thenReturn(100L); assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, broker1Name, 0))).isEqualTo(100L); } @@ -451,19 +451,8 @@ public class DefaultMQAdminExtTest { assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, broker1Name, 0), System.currentTimeMillis())).isEqualTo(101L); } - @Test - public void testMaxOffset_LogicalQueue() 
throws Exception { - when(mQClientAPIImpl.getMaxOffset(eq(broker2Addr), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(10L); - assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX, 0))).isEqualTo(1010L); - } - @Test - public void testSearchOffset_LogicalQueue() throws Exception { - when(mQClientAPIImpl.searchOffset(eq(broker2Addr), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(11L); - - assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX, 0), System.currentTimeMillis())).isEqualTo(1011L); - } @Test public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException { diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java index 1154395..25aa3f8 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java @@ -113,7 +113,7 @@ public class ConsumeMessageCommandTest { @Test public void testExecuteDefaultWhenPullMessageByQueueGotException() throws SubCommandException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, IllegalAccessException { DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); - when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class); + when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(MQClientException.class); Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer"); producerField.setAccessible(true); producerField.set(consumeMessageCommand, defaultMQPullConsumer); @@ 
-135,7 +135,7 @@ public class ConsumeMessageCommandTest { @Test public void testExecuteByConditionWhenPullMessageByQueueGotException() throws IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException { DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); - when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class); + when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(MQClientException.class); Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer"); producerField.setAccessible(true); producerField.set(consumeMessageCommand, defaultMQPullConsumer);