[ROCKETMQ-52] Add unit tests for MQClientAPIImpl

Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/032bb2b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/032bb2b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/032bb2b3

Branch: refs/heads/ROCKETMQ-57
Commit: 032bb2b32e6fd288063b70690d318628db1a6abb
Parents: ba79706
Author: yukon <[email protected]>
Authored: Tue Jan 17 20:46:16 2017 +0800
Committer: yukon <[email protected]>
Committed: Thu Jan 19 15:45:19 2017 +0800

----------------------------------------------------------------------
 .../client/common/ThreadLocalIndexTest.java     |   1 -
 .../store/LocalFileOffsetStoreTest.java         |   1 -
 .../store/RemoteBrokerOffsetStoreTest.java      |   1 -
 .../client/impl/MQClientAPIImplTest.java        | 236 +++++++++++++++++++
 .../impl/factory/MQClientInstanceTest.java      |   7 +-
 .../remoting/protocol/RemotingCommand.java      |   4 +-
 6 files changed, 238 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/032bb2b3/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
----------------------------------------------------------------------
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
index 350cde7..1be93ce 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
@@ -21,7 +21,6 @@ import org.junit.Test;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class ThreadLocalIndexTest {
-
     @Test
     public void testGetAndIncrement() throws Exception {
         ThreadLocalIndex localIndex = new ThreadLocalIndex();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/032bb2b3/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
----------------------------------------------------------------------
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index 58d99d6..0a54855 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -30,7 +30,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.when;
 
 public class LocalFileOffsetStoreTest {
-
     @Mock
     private static MQClientInstance mQClientFactory;
     private static String group = "FooBarGroup";

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/032bb2b3/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
----------------------------------------------------------------------
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 9a6a4b5..7ecb022 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -43,7 +43,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class RemoteBrokerOffsetStoreTest {
-
     @Mock
     private static MQClientInstance mQClientFactory;
     @Mock

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/032bb2b3/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
----------------------------------------------------------------------
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
new file mode 100644
index 0000000..cbcf560
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class MQClientAPIImplTest {
+    private static MQClientAPIImpl mqClientAPI = new MQClientAPIImpl(new 
NettyClientConfig(), null, null, new ClientConfig());
+    @Mock
+    private static RemotingClient remotingClient;
+    @Mock
+    private static DefaultMQProducerImpl defaultMQProducerImpl;
+
+    private String brokerAddr = "127.0.0.1";
+    private String brokerName = "DefaultBroker";
+    private static String group = "FooBarGroup";
+    private static String topic = "FooBar";
+    private Message msg = new Message("FooBar", new byte[] {});
+
+    @BeforeClass
+    public static void init() throws Exception {
+        remotingClient = mock(NettyRemotingClient.class);
+        defaultMQProducerImpl = mock(DefaultMQProducerImpl.class);
+        Field field = MQClientAPIImpl.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(mqClientAPI, remotingClient);
+    }
+
+    @Test
+    public void testSendMessageOneWay_Success() throws RemotingException, 
InterruptedException, MQBrokerException {
+        doNothing().when(remotingClient).invokeOneway(anyString(), 
any(RemotingCommand.class), anyLong());
+        SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, 
brokerName, msg, new SendMessageRequestHeader(),
+            3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), 
defaultMQProducerImpl);
+        assertThat(sendResult).isNull();
+    }
+
+    @Test
+    public void testSendMessageOneWay_WithException() throws 
RemotingException, InterruptedException, MQBrokerException {
+        doThrow(new RemotingTimeoutException("Remoting Exception in 
Test")).when(remotingClient).invokeOneway(anyString(), 
any(RemotingCommand.class), anyLong());
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new 
SendMessageRequestHeader(),
+                3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), 
defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(RemotingException.class);
+        } catch (RemotingException e) {
+            assertThat(e).hasMessage("Remoting Exception in Test");
+        }
+
+        doThrow(new InterruptedException("Interrupted Exception in 
Test")).when(remotingClient).invokeOneway(anyString(), 
any(RemotingCommand.class), anyLong());
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new 
SendMessageRequestHeader(),
+                3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), 
defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(InterruptedException.class);
+        } catch (InterruptedException e) {
+            assertThat(e).hasMessage("Interrupted Exception in Test");
+        }
+    }
+
+    @Test
+    public void testSendMessageSync_Success() throws InterruptedException, 
RemotingException, MQBrokerException {
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws 
Throwable {
+                RemotingCommand request = mock.getArgument(1);
+                return createSuccessResponse(request);
+            }
+        }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
+
+        SendMessageRequestHeader requestHeader = 
createSendMessageRequestHeader();
+
+        SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, 
brokerName, msg, requestHeader,
+            3 * 1000, CommunicationMode.SYNC, new SendMessageContext(), 
defaultMQProducerImpl);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
+        assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
+    }
+
+    @Test
+    public void testSendMessageSync_WithException() throws 
InterruptedException, RemotingException, MQBrokerException {
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws 
Throwable {
+                RemotingCommand request = mock.getArgument(1);
+                RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setOpaque(request.getOpaque());
+                response.setRemark("Broker is broken.");
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
+
+        SendMessageRequestHeader requestHeader = 
createSendMessageRequestHeader();
+
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, requestHeader,
+                3 * 1000, CommunicationMode.SYNC, new SendMessageContext(), 
defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(MQBrokerException.class);
+        } catch (MQBrokerException e) {
+            assertThat(e).hasMessageContaining("Broker is broken.");
+        }
+    }
+
+    @Test
+    public void testSendMessageAsync_Success() throws RemotingException, 
InterruptedException, MQBrokerException {
+        doNothing().when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+        SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, 
brokerName, msg, new SendMessageRequestHeader(),
+            3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), 
defaultMQProducerImpl);
+        assertThat(sendResult).isNull();
+
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws 
Throwable {
+                InvokeCallback callback = mock.getArgument(3);
+                RemotingCommand request = mock.getArgument(1);
+                ResponseFuture responseFuture = new 
ResponseFuture(request.getOpaque(), 3 * 1000, null, null);
+                
responseFuture.setResponseCommand(createSuccessResponse(request));
+                callback.operationComplete(responseFuture);
+                return null;
+            }
+        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+        SendMessageContext sendMessageContext = new SendMessageContext();
+        sendMessageContext.setProducer(new DefaultMQProducerImpl(new 
DefaultMQProducer()));
+        mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new 
SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
+            new SendCallback() {
+                @Override public void onSuccess(SendResult sendResult) {
+                    
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+                    assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+                    assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
+                    
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
+                }
+
+                @Override public void onException(Throwable e) {
+                }
+            },
+            null, null, 0, sendMessageContext, defaultMQProducerImpl);
+    }
+
+    @Test
+    public void testSendMessageAsync_WithException() throws RemotingException, 
InterruptedException, MQBrokerException {
+        doThrow(new RemotingTimeoutException("Remoting Exception in 
Test")).when(remotingClient)
+            .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), 
any(InvokeCallback.class));
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new 
SendMessageRequestHeader(),
+                3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), 
defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(RemotingException.class);
+        } catch (RemotingException e) {
+            assertThat(e).hasMessage("Remoting Exception in Test");
+        }
+
+        doThrow(new InterruptedException("Interrupted Exception in 
Test")).when(remotingClient)
+            .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), 
any(InvokeCallback.class));
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new 
SendMessageRequestHeader(),
+                3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), 
defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(InterruptedException.class);
+        } catch (InterruptedException e) {
+            assertThat(e).hasMessage("Interrupted Exception in Test");
+        }
+    }
+
+    private RemotingCommand createSuccessResponse(RemotingCommand request) {
+        RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setOpaque(request.getOpaque());
+
+        SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) 
response.readCustomHeader();
+        responseHeader.setMsgId("123");
+        responseHeader.setQueueId(1);
+        responseHeader.setQueueOffset(123L);
+
+        response.addExtField(MessageConst.PROPERTY_MSG_REGION, "RegionHZ");
+        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, "true");
+        response.addExtField("queueId", 
String.valueOf(responseHeader.getQueueId()));
+        response.addExtField("msgId", responseHeader.getMsgId());
+        response.addExtField("queueOffset", 
String.valueOf(responseHeader.getQueueOffset()));
+        return response;
+    }
+
+    private SendMessageRequestHeader createSendMessageRequestHeader() {
+        SendMessageRequestHeader requestHeader = new 
SendMessageRequestHeader();
+        requestHeader.setBornTimestamp(System.currentTimeMillis());
+        requestHeader.setTopic(topic);
+        requestHeader.setProducerGroup(group);
+        requestHeader.setQueueId(1);
+        requestHeader.setMaxReconsumeTimes(10);
+        return requestHeader;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/032bb2b3/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
----------------------------------------------------------------------
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index 956cdd9..b7b07c6 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
 import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
@@ -34,7 +33,6 @@ import 
org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
-import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -42,11 +40,8 @@ import static org.mockito.Mockito.mock;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientInstanceTest {
-
-    @Mock
-    private static MQClientAPIImpl mqClientAPI;
     @InjectMocks
-    private static MQClientInstance mqClientInstance =  
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
+    private MQClientInstance mqClientInstance =  
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
 
     private String topic = "FooBar";
     private String group = "FooBarGroup";

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/032bb2b3/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
----------------------------------------------------------------------
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 46ca8dd..99c13fb 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -114,9 +114,7 @@ public class RemotingCommand {
     }
 
     public static RemotingCommand createResponseCommand(Class<? extends 
CommandCustomHeader> classHeader) {
-        RemotingCommand cmd = 
createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any 
response code", classHeader);
-
-        return cmd;
+        return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, 
"not set any response code", classHeader);
     }
 
     /**

Reply via email to