Repository: incubator-rocketmq
Updated Branches:
  refs/heads/ROCKETMQ-51 [created] d87223f8c


[ROCKETMQ-51] Add unit tests for SendMessageProcessor


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b636c370
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b636c370
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b636c370

Branch: refs/heads/ROCKETMQ-51
Commit: b636c370e82efc42f867dcc58d94921b40499167
Parents: 4291348
Author: yukon <yu...@apache.org>
Authored: Fri Jan 20 10:50:43 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Sat Jan 21 16:50:24 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/BrokerControllerTest.java   |  20 +-
 .../processor/SendMessageProcessorTest.java     | 200 +++++++++++++++++++
 2 files changed, 205 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b636c370/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 4a19c0c..86b9c4e 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -21,37 +21,27 @@ import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class BrokerControllerTest {
-    private static final int RESTART_NUM = 3;
-    protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
+import static org.assertj.core.api.Assertions.assertThat;
 
+public class BrokerControllerTest {
     /**
      * Tests if the controller can be properly stopped and started.
      *
      * @throws Exception If fails.
      */
     @Test
-    public void testRestart() throws Exception {
-
-        for (int i = 0; i < RESTART_NUM; i++) {
+    public void testBrokerRestart() throws Exception {
+        for (int i = 0; i < 2; i++) {
             BrokerController brokerController = new BrokerController(//
                 new BrokerConfig(), //
                 new NettyServerConfig(), //
                 new NettyClientConfig(), //
                 new MessageStoreConfig());
-            boolean initResult = brokerController.initialize();
-            Assert.assertTrue(initResult);
-            logger.info("Broker is initialized " + initResult);
+            // A bare assertThat(...) verifies nothing; isTrue() is what fails the test on a failed init.
+            assertThat(brokerController.initialize()).isTrue();
             brokerController.start();
-            logger.info("Broker is started");
-
             brokerController.shutdown();
-            logger.info("Broker is stopped");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b636c370/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
new file mode 100644
index 0000000..d41d03a
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.net.InetSocketAddress;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import 
org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link SendMessageProcessor#processRequest}, run against a mocked
+ * {@link MessageStore} and a mocked {@link ChannelHandlerContext}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SendMessageProcessorTest {
+    private SendMessageProcessor sendMessageProcessor;
+    @Mock
+    private ChannelHandlerContext handlerContext;
+    @Spy
+    private BrokerController brokerController = new BrokerController(
+        new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+    @Mock
+    private MessageStore messageStore;
+
+    /**
+     * Installs the mocked store on the spied controller and stubs the channel so that
+     * {@code handlerContext.channel().remoteAddress()} returns a socket address.
+     */
+    @Before
+    public void init() {
+        brokerController.setMessageStore(messageStore);
+        when(messageStore.now()).thenReturn(System.currentTimeMillis());
+        Channel mockChannel = mock(Channel.class);
+        when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
+        when(handlerContext.channel()).thenReturn(mockChannel);
+        when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
+    }
+
+    @Test
+    public void testProcessRequest() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.PUT_OK, AppendMessageStatus.PUT_OK);
+        assertPutResult(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testProcessRequest_FlushTimeOut() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.FLUSH_DISK_TIMEOUT, AppendMessageStatus.UNKNOWN_ERROR);
+        assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT);
+    }
+
+    @Test
+    public void testProcessRequest_MessageIllegal() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.MESSAGE_ILLEGAL, AppendMessageStatus.UNKNOWN_ERROR);
+        assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
+    }
+
+    @Test
+    public void testProcessRequest_CreateMappedFileFailed() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.CREATE_MAPEDFILE_FAILED, AppendMessageStatus.UNKNOWN_ERROR);
+        assertPutResult(ResponseCode.SYSTEM_ERROR);
+    }
+
+    @Test
+    public void testProcessRequest_FlushSlaveTimeout() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, AppendMessageStatus.UNKNOWN_ERROR);
+        assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT);
+    }
+
+    @Test
+    public void testProcessRequest_PageCacheBusy() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.OS_PAGECACHE_BUSY, AppendMessageStatus.UNKNOWN_ERROR);
+        assertPutResult(ResponseCode.SYSTEM_ERROR);
+    }
+
+    @Test
+    public void testProcessRequest_PropertiesTooLong() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, AppendMessageStatus.UNKNOWN_ERROR);
+        assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
+    }
+
+    @Test
+    public void testProcessRequest_ServiceNotAvailable() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.SERVICE_NOT_AVAILABLE, AppendMessageStatus.UNKNOWN_ERROR);
+        assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE);
+    }
+
+    @Test
+    public void testProcessRequest_SlaveNotAvailable() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.SLAVE_NOT_AVAILABLE, AppendMessageStatus.UNKNOWN_ERROR);
+        assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE);
+    }
+
+    @Test
+    public void testProcessRequest_WithMsgBack() throws RemotingCommandException {
+        stubPutMessage(PutMessageStatus.PUT_OK, AppendMessageStatus.PUT_OK);
+        final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK);
+
+        sendMessageProcessor = new SendMessageProcessor(brokerController);
+        final RemotingCommand response = sendMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    /**
+     * Makes the mocked store answer every {@code putMessage} call with the given outcome.
+     *
+     * @param putStatus status carried by the returned {@link PutMessageResult}
+     * @param appendStatus status carried by the nested {@link AppendMessageResult}
+     */
+    private void stubPutMessage(PutMessageStatus putStatus, AppendMessageStatus appendStatus) {
+        when(messageStore.putMessage(any(MessageExtBrokerInner.class)))
+            .thenReturn(new PutMessageResult(putStatus, new AppendMessageResult(appendStatus)));
+    }
+
+    /**
+     * Builds a request with the given code from a populated {@link SendMessageRequestHeader},
+     * copying a subset of the header values into the command's ext fields.
+     *
+     * @param requestCode request code to put on the command
+     * @return the assembled request command
+     */
+    private RemotingCommand createSendMsgCommand(int requestCode) {
+        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+        requestHeader.setProducerGroup("FooBar_PID");
+        requestHeader.setTopic("FooBar");
+        requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
+        requestHeader.setDefaultTopicQueueNums(3);
+        requestHeader.setQueueId(1);
+        requestHeader.setSysFlag(0);
+        requestHeader.setBornTimestamp(System.currentTimeMillis());
+        requestHeader.setFlag(124);
+        requestHeader.setReconsumeTimes(0);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
+        request.addExtField("queueId", String.valueOf(requestHeader.getQueueId()));
+        request.addExtField("topic", String.valueOf(requestHeader.getTopic()));
+        request.addExtField("defaultTopicQueueNums",
+            String.valueOf(requestHeader.getDefaultTopicQueueNums()));
+        request.addExtField("defaultTopic", requestHeader.getDefaultTopic());
+        request.addExtField("sysFlag", String.valueOf(requestHeader.getSysFlag()));
+        request.addExtField("flag", String.valueOf(requestHeader.getFlag()));
+        request.addExtField("bornTimestamp", String.valueOf(requestHeader.getBornTimestamp()));
+        return request;
+    }
+
+    /**
+     * Builds a request with the given code from a populated
+     * {@link ConsumerSendMsgBackRequestHeader}, copying group, offset and delay level
+     * into the command's ext fields.
+     *
+     * @param requestCode request code to put on the command
+     * @return the assembled request command
+     */
+    private RemotingCommand createSendMsgBackCommand(int requestCode) {
+        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
+
+        requestHeader.setMaxReconsumeTimes(3);
+        requestHeader.setDelayLevel(4);
+        requestHeader.setGroup("FooBar_PID");
+        requestHeader.setOffset(123L);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
+        request.addExtField("group", requestHeader.getGroup());
+        request.addExtField("offset", String.valueOf(requestHeader.getOffset()));
+        request.addExtField("delayLevel", String.valueOf(requestHeader.getDelayLevel()));
+        return request;
+    }
+
+    /**
+     * Sends a {@code SEND_MESSAGE} request through a fresh processor and checks the response.
+     * The response is taken from the return value of {@code processRequest} or, when that is
+     * {@code null}, from the argument captured on {@code handlerContext.writeAndFlush}.
+     *
+     * @param responseCode response code the processor is expected to produce
+     * @throws RemotingCommandException if the processor rejects the request
+     */
+    private void assertPutResult(int responseCode) throws RemotingCommandException {
+        final RemotingCommand request = createSendMsgCommand(RequestCode.SEND_MESSAGE);
+        final RemotingCommand[] response = new RemotingCommand[1];
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                response[0] = invocation.getArgument(0);
+                return null;
+            }
+        }).when(handlerContext).writeAndFlush(any(Object.class));
+
+        sendMessageProcessor = new SendMessageProcessor(brokerController);
+        RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
+        if (responseToReturn != null) {
+            // A returned response must not also have been written to the channel.
+            assertThat(response[0]).isNull();
+            response[0] = responseToReturn;
+        }
+        // Fail with a clear assertion, not a NullPointerException, when no response was produced.
+        assertThat(response[0]).isNotNull();
+        assertThat(response[0].getCode()).isEqualTo(responseCode);
+        assertThat(response[0].getOpaque()).isEqualTo(request.getOpaque());
+    }
+}
\ No newline at end of file

Reply via email to