[ROCKETMQ-52] Add unit tests for DefaultMQPullConsumer

Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/acf9bc37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/acf9bc37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/acf9bc37

Branch: refs/heads/ROCKETMQ-54
Commit: acf9bc37ea987456e997259c496905e44cd71461
Parents: c549854
Author: yukon <[email protected]>
Authored: Thu Jan 19 10:43:39 2017 +0800
Committer: yukon <[email protected]>
Committed: Thu Jan 19 15:45:19 2017 +0800

----------------------------------------------------------------------
 .../consumer/DefaultMQPullConsumerTest.java     | 152 +++++++++++
 .../impl/producer/DefaultMQProducerTest.java    | 250 -------------------
 .../client/producer/DefaultMQProducerTest.java  | 248 ++++++++++++++++++
 3 files changed, 400 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/acf9bc37/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java
new file mode 100644
index 0000000..6dc4ed8
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQPullConsumerTest {
+    @Spy
+    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+    private DefaultMQPullConsumer pullConsumer;
+    private String consumerGroup = "FooBarGroup";
+    private String topic = "FooBar";
+    private String brokerName = "BrokerA";
+
+    @Before
+    public void init() throws Exception {
+        pullConsumer = new DefaultMQPullConsumer(consumerGroup);
+        pullConsumer.setNamesrvAddr("127.0.0.1:9876");
+        pullConsumer.start();
+        PullAPIWrapper pullAPIWrapper = pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
+        Field field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(pullAPIWrapper, mQClientFactory);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
+    }
+
+    @After
+    public void terminate() {
+        pullConsumer.shutdown();
+    }
+
+    @Test
+    public void testPullMessage_Success() throws Exception {
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws Throwable {
+                PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                return createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
+            }
+        }).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class));
+
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
+        PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
+        assertThat(pullResult).isNotNull();
+        assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
+        assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
+        assertThat(pullResult.getMinOffset()).isEqualTo(123);
+        assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
+        assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<>());
+    }
+
+    @Test
+    public void testPullMessage_NotFound() throws Exception{
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws Throwable {
+                PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                return createPullResult(requestHeader, PullStatus.NO_NEW_MSG, new ArrayList<MessageExt>());
+            }
+        }).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class));
+
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
+        PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
+        assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
+    }
+
+    @Test
+    public void testPullMessageAsync_Success() throws Exception {
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws Throwable {
+                PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
+
+                PullCallback pullCallback = mock.getArgument(4);
+                pullCallback.onSuccess(pullResult);
+                return null;
+            }
+        }).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class));
+
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
+        pullConsumer.pull(messageQueue, "*", 1024, 3, new PullCallback() {
+            @Override public void onSuccess(PullResult pullResult) {
+                assertThat(pullResult).isNotNull();
+                assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
+                assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
+                assertThat(pullResult.getMinOffset()).isEqualTo(123);
+                assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
+                assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<>());
+            }
+
+            @Override public void onException(Throwable e) {
+
+            }
+        });
+    }
+
+    private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) {
+        return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, new byte[] {});
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/acf9bc37/client/src/test/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerTest.java
deleted file mode 100644
index 5946ea8..0000000
--- a/client/src/test/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerTest.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.impl.producer;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.hook.SendMessageContext;
-import org.apache.rocketmq.client.hook.SendMessageHook;
-import org.apache.rocketmq.client.impl.CommunicationMode;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.Spy;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class DefaultMQProducerTest {
-    @Spy
-    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
-    @Mock
-    private MQClientAPIImpl mQClientAPIImpl;
-
-    private DefaultMQProducer producer;
-    private Message message;
-    private Message zeroMsg;
-    private String topic = "FooBar";
-    private String producerGroupPrefix = "FooBar_PID";
-
-    @Before
-    public void init() throws Exception {
-        String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
-        producer = new DefaultMQProducer(producerGroupTemp);
-        producer.setNamesrvAddr("127.0.0.1:9876");
-        message = new Message(topic, new byte[] {'a'});
-        zeroMsg = new Message(topic, new byte[] {});
-
-        producer.start();
-
-        Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
-        field.setAccessible(true);
-        field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
-
-        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
-        field.setAccessible(true);
-        field.set(mQClientFactory, mQClientAPIImpl);
-
-        producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
-
-        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
-            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
-        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
-            nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
-            .thenReturn(createSendResult(SendStatus.SEND_OK));
-    }
-
-    @After
-    public void terminate() {
-        producer.shutdown();
-    }
-
-    @Test
-    public void testSendMessage_ZeroMessage() throws InterruptedException, RemotingException, MQBrokerException {
-        try {
-            producer.send(zeroMsg);
-            failBecauseExceptionWasNotThrown(MQClientException.class);
-        } catch (MQClientException e) {
-            assertThat(e).hasMessageContaining("message body length is zero");
-        }
-    }
-
-    @Test
-    public void testSendMessage_NoNameSrv() throws RemotingException, InterruptedException, MQBrokerException {
-        when(mQClientAPIImpl.getNameServerAddressList()).thenReturn(new ArrayList<String>());
-        try {
-            producer.send(message);
-            failBecauseExceptionWasNotThrown(MQClientException.class);
-        } catch (MQClientException e) {
-            assertThat(e).hasMessageContaining("No name server address");
-        }
-    }
-
-    @Test
-    public void testSendMessage_NoRoute() throws RemotingException, InterruptedException, MQBrokerException {
-        when(mQClientAPIImpl.getNameServerAddressList()).thenReturn(Collections.singletonList("127.0.0.1:9876"));
-        try {
-            producer.send(message);
-            failBecauseExceptionWasNotThrown(MQClientException.class);
-        } catch (MQClientException e) {
-            assertThat(e).hasMessageContaining("No route info of this topic");
-        }
-    }
-
-    @Test
-    public void testSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
-        SendResult sendResult = producer.send(message);
-
-        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
-        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
-        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
-    }
-
-    @Test
-    public void testSendMessageSync_SuccessWithHook() throws Throwable {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
-        final Throwable[] assertionErrors = new Throwable[1];
-        final CountDownLatch countDownLatch = new CountDownLatch(2);
-        producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
-            @Override public String hookName() {
-                return "TestHook";
-            }
-
-            @Override public void sendMessageBefore(final SendMessageContext context) {
-                assertionErrors[0] = assertInOtherThread(new Runnable() {
-                    @Override public void run() {
-                        assertThat(context.getMessage()).isEqualTo(message);
-                        assertThat(context.getProducer()).isEqualTo(producer);
-                        assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
-                        assertThat(context.getSendResult()).isNull();
-                    }
-                });
-                countDownLatch.countDown();
-            }
-
-            @Override public void sendMessageAfter(final SendMessageContext context) {
-                assertionErrors[0] = assertInOtherThread(new Runnable() {
-                    @Override public void run() {
-                        assertThat(context.getMessage()).isEqualTo(message);
-                        assertThat(context.getProducer()).isEqualTo(producer.getDefaultMQProducerImpl());
-                        assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
-                        assertThat(context.getSendResult()).isNotNull();
-                    }
-                });
-                countDownLatch.countDown();
-            }
-        });
-        SendResult sendResult = producer.send(message);
-
-        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
-        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
-        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
-
-        countDownLatch.await();
-
-        if (assertionErrors[0] != null) {
-            throw assertionErrors[0];
-        }
-    }
-
-    private TopicRouteData createTopicRoute() {
-        TopicRouteData topicRouteData = new TopicRouteData();
-
-        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
-        List<BrokerData> brokerDataList = new ArrayList<>();
-        BrokerData brokerData = new BrokerData();
-        brokerData.setBrokerName("BrokerA");
-        brokerData.setCluster("DefaultCluster");
-        HashMap<Long, String> brokerAddrs = new HashMap<>();
-        brokerAddrs.put(0L, "127.0.0.1:10911");
-        brokerData.setBrokerAddrs(brokerAddrs);
-        brokerDataList.add(brokerData);
-        topicRouteData.setBrokerDatas(brokerDataList);
-
-        List<QueueData> queueDataList = new ArrayList<>();
-        QueueData queueData = new QueueData();
-        queueData.setBrokerName("BrokerA");
-        queueData.setPerm(6);
-        queueData.setReadQueueNums(3);
-        queueData.setWriteQueueNums(4);
-        queueData.setTopicSynFlag(0);
-        queueDataList.add(queueData);
-        topicRouteData.setQueueDatas(queueDataList);
-        return topicRouteData;
-    }
-
-    private SendResult createSendResult(SendStatus sendStatus) {
-        SendResult sendResult = new SendResult();
-        sendResult.setMsgId("123");
-        sendResult.setOffsetMsgId("123");
-        sendResult.setQueueOffset(456);
-        sendResult.setSendStatus(sendStatus);
-        sendResult.setRegionId("HZ");
-        return sendResult;
-    }
-
-    private Throwable assertInOtherThread(final Runnable runnable) {
-        final Throwable[] assertionErrors = new Throwable[1];
-        Thread thread = new Thread(new Runnable() {
-            @Override public void run() {
-                try {
-                    runnable.run();
-                } catch (AssertionError e) {
-                    assertionErrors[0] = e;
-                }
-            }
-        });
-        thread.start();
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            assertionErrors[0] = e;
-        }
-        return assertionErrors[0];
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/acf9bc37/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
new file mode 100644
index 0000000..bbaff00
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQProducerTest {
+    @Spy
+    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+
+    private DefaultMQProducer producer;
+    private Message message;
+    private Message zeroMsg;
+    private String topic = "FooBar";
+    private String producerGroupPrefix = "FooBar_PID";
+
+    @Before
+    public void init() throws Exception {
+        String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+        producer = new DefaultMQProducer(producerGroupTemp);
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        message = new Message(topic, new byte[] {'a'});
+        zeroMsg = new Message(topic, new byte[] {});
+
+        producer.start();
+
+        Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
+
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenReturn(createSendResult(SendStatus.SEND_OK));
+    }
+
+    @After
+    public void terminate() {
+        producer.shutdown();
+    }
+
+    @Test
+    public void testSendMessage_ZeroMessage() throws InterruptedException, RemotingException, MQBrokerException {
+        try {
+            producer.send(zeroMsg);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("message body length is zero");
+        }
+    }
+
+    @Test
+    public void testSendMessage_NoNameSrv() throws RemotingException, InterruptedException, MQBrokerException {
+        when(mQClientAPIImpl.getNameServerAddressList()).thenReturn(new ArrayList<String>());
+        try {
+            producer.send(message);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("No name server address");
+        }
+    }
+
+    @Test
+    public void testSendMessage_NoRoute() throws RemotingException, InterruptedException, MQBrokerException {
+        when(mQClientAPIImpl.getNameServerAddressList()).thenReturn(Collections.singletonList("127.0.0.1:9876"));
+        try {
+            producer.send(message);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("No route info of this topic");
+        }
+    }
+
+    @Test
+    public void testSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        SendResult sendResult = producer.send(message);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+    }
+
+    @Test
+    public void testSendMessageSync_SuccessWithHook() throws Throwable {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        final Throwable[] assertionErrors = new Throwable[1];
+        final CountDownLatch countDownLatch = new CountDownLatch(2);
+        producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
+            @Override public String hookName() {
+                return "TestHook";
+            }
+
+            @Override public void sendMessageBefore(final SendMessageContext context) {
+                assertionErrors[0] = assertInOtherThread(new Runnable() {
+                    @Override public void run() {
+                        assertThat(context.getMessage()).isEqualTo(message);
+                        assertThat(context.getProducer()).isEqualTo(producer);
+                        assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
+                        assertThat(context.getSendResult()).isNull();
+                    }
+                });
+                countDownLatch.countDown();
+            }
+
+            @Override public void sendMessageAfter(final SendMessageContext context) {
+                assertionErrors[0] = assertInOtherThread(new Runnable() {
+                    @Override public void run() {
+                        assertThat(context.getMessage()).isEqualTo(message);
+                        assertThat(context.getProducer()).isEqualTo(producer.getDefaultMQProducerImpl());
+                        assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
+                        assertThat(context.getSendResult()).isNotNull();
+                    }
+                });
+                countDownLatch.countDown();
+            }
+        });
+        SendResult sendResult = producer.send(message);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+
+        countDownLatch.await();
+
+        if (assertionErrors[0] != null) {
+            throw assertionErrors[0];
+        }
+    }
+
+    private TopicRouteData createTopicRoute() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        List<BrokerData> brokerDataList = new ArrayList<>();
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("BrokerA");
+        brokerData.setCluster("DefaultCluster");
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(0L, "127.0.0.1:10911");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        List<QueueData> queueDataList = new ArrayList<>();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("BrokerA");
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSynFlag(0);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+        return topicRouteData;
+    }
+
+    private SendResult createSendResult(SendStatus sendStatus) {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("123");
+        sendResult.setOffsetMsgId("123");
+        sendResult.setQueueOffset(456);
+        sendResult.setSendStatus(sendStatus);
+        sendResult.setRegionId("HZ");
+        return sendResult;
+    }
+
+    private Throwable assertInOtherThread(final Runnable runnable) {
+        final Throwable[] assertionErrors = new Throwable[1];
+        Thread thread = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    runnable.run();
+                } catch (AssertionError e) {
+                    assertionErrors[0] = e;
+                }
+            }
+        });
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            assertionErrors[0] = e;
+        }
+        return assertionErrors[0];
+    }
+}
\ No newline at end of file

Reply via email to