[ROCKETMQ-52] Add unit tests for DefaultMQProducer

Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6511bc32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6511bc32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6511bc32

Branch: refs/heads/ROCKETMQ-54
Commit: 6511bc327126fd419801741f18a3c99f87063fe9
Parents: e06a26c
Author: yukon <[email protected]>
Authored: Wed Jan 18 16:59:48 2017 +0800
Committer: yukon <[email protected]>
Committed: Thu Jan 19 15:45:19 2017 +0800

----------------------------------------------------------------------
 .../store/LocalFileOffsetStoreTest.java         |   3 +-
 .../impl/factory/MQClientInstanceTest.java      |   6 +-
 .../impl/producer/DefaultMQProducerTest.java    | 250 +++++++++++++++++++
 3 files changed, 255 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6511bc32/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index 0b522e6..bf0adcb 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.consumer.store;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.HashSet;
 import org.apache.rocketmq.client.ClientConfig;
@@ -40,7 +41,7 @@ public class LocalFileOffsetStoreTest {
 
     @Before
     public void init() {
-        System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
+        System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + File.separator + ".rocketmq_offsets");
         String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
         when(mQClientFactory.getClientId()).thenReturn(clientId);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6511bc32/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index c32cdab..eedf0b1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientInstanceTest {
-    private MQClientInstance mqClientInstance =  MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
+    private MQClientInstance mqClientInstance =  MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
     private String topic = "FooBar";
     private String group = "FooBarGroup";
 
@@ -53,7 +53,7 @@ public class MQClientInstanceTest {
         brokerData.setBrokerName("BrokerA");
         brokerData.setCluster("DefaultCluster");
         HashMap<Long, String> brokerAddrs = new HashMap<>();
-        brokerAddrs.put(0L, "127.0.0.1");
+        brokerAddrs.put(0L, "127.0.0.1:10911");
         brokerData.setBrokerAddrs(brokerAddrs);
         brokerDataList.add(brokerData);
         topicRouteData.setBrokerDatas(brokerDataList);
@@ -61,7 +61,7 @@ public class MQClientInstanceTest {
         List<QueueData> queueDataList = new ArrayList<>();
         QueueData queueData = new QueueData();
         queueData.setBrokerName("BrokerA");
-        queueData.setPerm(2);
+        queueData.setPerm(6);
         queueData.setReadQueueNums(3);
         queueData.setWriteQueueNums(4);
         queueData.setTopicSynFlag(0);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6511bc32/client/src/test/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerTest.java
new file mode 100644
index 0000000..5946ea8
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.producer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQProducerTest {
+    @Spy
+    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+
+    private DefaultMQProducer producer;
+    private Message message;
+    private Message zeroMsg;
+    private String topic = "FooBar";
+    private String producerGroupPrefix = "FooBar_PID";
+
+    @Before
+    public void init() throws Exception {
+        String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+        producer = new DefaultMQProducer(producerGroupTemp);
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        message = new Message(topic, new byte[] {'a'});
+        zeroMsg = new Message(topic, new byte[] {});
+
+        producer.start();
+
+        Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
+
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenReturn(createSendResult(SendStatus.SEND_OK));
+    }
+
+    @After
+    public void terminate() {
+        producer.shutdown();
+    }
+
+    @Test
+    public void testSendMessage_ZeroMessage() throws InterruptedException, RemotingException, MQBrokerException {
+        try {
+            producer.send(zeroMsg);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("message body length is zero");
+        }
+    }
+
+    @Test
+    public void testSendMessage_NoNameSrv() throws RemotingException, InterruptedException, MQBrokerException {
+        when(mQClientAPIImpl.getNameServerAddressList()).thenReturn(new ArrayList<String>());
+        try {
+            producer.send(message);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("No name server address");
+        }
+    }
+
+    @Test
+    public void testSendMessage_NoRoute() throws RemotingException, InterruptedException, MQBrokerException {
+        when(mQClientAPIImpl.getNameServerAddressList()).thenReturn(Collections.singletonList("127.0.0.1:9876"));
+        try {
+            producer.send(message);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("No route info of this topic");
+        }
+    }
+
+    @Test
+    public void testSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        SendResult sendResult = producer.send(message);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+    }
+
+    @Test
+    public void testSendMessageSync_SuccessWithHook() throws Throwable {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        final Throwable[] assertionErrors = new Throwable[1];
+        final CountDownLatch countDownLatch = new CountDownLatch(2);
+        producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
+            @Override public String hookName() {
+                return "TestHook";
+            }
+
+            @Override public void sendMessageBefore(final SendMessageContext context) {
+                assertionErrors[0] = assertInOtherThread(new Runnable() {
+                    @Override public void run() {
+                        assertThat(context.getMessage()).isEqualTo(message);
+                        assertThat(context.getProducer()).isEqualTo(producer);
+                        assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
+                        assertThat(context.getSendResult()).isNull();
+                    }
+                });
+                countDownLatch.countDown();
+            }
+
+            @Override public void sendMessageAfter(final SendMessageContext context) {
+                assertionErrors[0] = assertInOtherThread(new Runnable() {
+                    @Override public void run() {
+                        assertThat(context.getMessage()).isEqualTo(message);
+                        assertThat(context.getProducer()).isEqualTo(producer.getDefaultMQProducerImpl());
+                        assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
+                        assertThat(context.getSendResult()).isNotNull();
+                    }
+                });
+                countDownLatch.countDown();
+            }
+        });
+        SendResult sendResult = producer.send(message);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+
+        countDownLatch.await();
+
+        if (assertionErrors[0] != null) {
+            throw assertionErrors[0];
+        }
+    }
+
+    private TopicRouteData createTopicRoute() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        List<BrokerData> brokerDataList = new ArrayList<>();
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("BrokerA");
+        brokerData.setCluster("DefaultCluster");
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(0L, "127.0.0.1:10911");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        List<QueueData> queueDataList = new ArrayList<>();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("BrokerA");
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSynFlag(0);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+        return topicRouteData;
+    }
+
+    private SendResult createSendResult(SendStatus sendStatus) {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("123");
+        sendResult.setOffsetMsgId("123");
+        sendResult.setQueueOffset(456);
+        sendResult.setSendStatus(sendStatus);
+        sendResult.setRegionId("HZ");
+        return sendResult;
+    }
+
+    private Throwable assertInOtherThread(final Runnable runnable) {
+        final Throwable[] assertionErrors = new Throwable[1];
+        Thread thread = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    runnable.run();
+                } catch (AssertionError e) {
+                    assertionErrors[0] = e;
+                }
+            }
+        });
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            assertionErrors[0] = e;
+        }
+        return assertionErrors[0];
+    }
+}
\ No newline at end of file

Reply via email to