[ROCKETMQ-51] Add unit tests for ClientManageProcessor

Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b29c318c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b29c318c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b29c318c

Branch: refs/heads/ROCKETMQ-54
Commit: b29c318cdde225ef3a33a73e939e49e087766a28
Parents: 6729967
Author: yukon <[email protected]>
Authored: Sat Jan 21 23:13:01 2017 +0800
Committer: yukon <[email protected]>
Committed: Sat Jan 21 23:21:37 2017 +0800

----------------------------------------------------------------------
 .../processor/ClientManageProcessorTest.java    | 132 +++++++++++++++++++
 .../processor/PullMessageProcessorTest.java     |  38 +++++-
 .../processor/SendMessageProcessorTest.java     |  44 ++++++-
 3 files changed, 207 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b29c318c/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
new file mode 100644
index 0000000..147c732
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.HashMap;
+import java.util.UUID;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import 
org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static 
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ClientManageProcessorTest {
+    private ClientManageProcessor clientManageProcessor;
+    @Spy
+    private BrokerController brokerController = new BrokerController(new 
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new 
MessageStoreConfig());
+    @Mock
+    private ChannelHandlerContext handlerContext;
+    @Mock
+    private Channel channel;
+
+    private ClientChannelInfo clientChannelInfo;
+    private String clientId = UUID.randomUUID().toString();
+    private String group = "FooBarGroup";
+    private String topic = "FooBar";
+
+    @Before
+    public void init() {
+        when(handlerContext.channel()).thenReturn(channel);
+        clientManageProcessor = new ClientManageProcessor(brokerController);
+        clientChannelInfo = new ClientChannelInfo(channel, clientId, 
LanguageCode.JAVA, 100);
+        brokerController.getProducerManager().registerProducer(group, 
clientChannelInfo);
+
+        ConsumerData consumerData = createConsumerData(group, topic);
+        brokerController.getConsumerManager().registerConsumer(
+            consumerData.getGroupName(),
+            clientChannelInfo,
+            consumerData.getConsumeType(),
+            consumerData.getMessageModel(),
+            consumerData.getConsumeFromWhere(),
+            consumerData.getSubscriptionDataSet(),
+            false);
+    }
+
+    @Test
+    public void processRequest_UnRegisterProducer() throws Exception {
+        brokerController.getProducerManager().registerProducer(group, 
clientChannelInfo);
+        HashMap<Channel, ClientChannelInfo> channelMap = 
brokerController.getProducerManager().getGroupChannelTable().get(group);
+        assertThat(channelMap).isNotNull();
+        assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo);
+
+        RemotingCommand request = createUnRegisterProducerCommand();
+        RemotingCommand response = 
clientManageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        channelMap = 
brokerController.getProducerManager().getGroupChannelTable().get(group);
+        assertThat(channelMap).isNull();
+    }
+
+    @Test
+    public void processRequest_UnRegisterConsumer() throws 
RemotingCommandException {
+        ConsumerGroupInfo consumerGroupInfo = 
brokerController.getConsumerManager().getConsumerGroupInfo(group);
+        assertThat(consumerGroupInfo).isNotNull();
+
+        RemotingCommand request = createUnRegisterConsumerCommand();
+        RemotingCommand response = 
clientManageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        consumerGroupInfo = 
brokerController.getConsumerManager().getConsumerGroupInfo(group);
+        assertThat(consumerGroupInfo).isNull();
+    }
+
+    private RemotingCommand createUnRegisterProducerCommand() {
+        UnregisterClientRequestHeader requestHeader = new 
UnregisterClientRequestHeader();
+        requestHeader.setClientID(clientId);
+        requestHeader.setProducerGroup(group);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, 
requestHeader);
+        request.setLanguage(LanguageCode.JAVA);
+        request.setVersion(100);
+        request.makeCustomHeaderToNet();
+        return request;
+    }
+
+    private RemotingCommand createUnRegisterConsumerCommand() {
+        UnregisterClientRequestHeader requestHeader = new 
UnregisterClientRequestHeader();
+        requestHeader.setClientID(clientId);
+        requestHeader.setConsumerGroup(group);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, 
requestHeader);
+        request.setLanguage(LanguageCode.JAVA);
+        request.setVersion(100);
+        request.makeCustomHeaderToNet();
+        return request;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b29c318c/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index 9c3ec67..d3d9812 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -19,10 +19,14 @@ package org.apache.rocketmq.broker.processor;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -78,7 +82,7 @@ public class PullMessageProcessorTest {
         when(handlerContext.channel()).thenReturn(mockChannel);
         
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig());
         clientChannelInfo = new ClientChannelInfo(mockChannel);
-        ConsumerData consumerData = createConsumerData();
+        ConsumerData consumerData = createConsumerData(group, topic);
         brokerController.getConsumerManager().registerConsumer(
             consumerData.getGroupName(),
             clientChannelInfo,
@@ -131,6 +135,36 @@ public class PullMessageProcessorTest {
     }
 
     @Test
+    public void testProcessRequest_FoundWithHook() throws 
RemotingCommandException {
+        GetMessageResult getMessageResult = createGetMessageResult();
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), 
anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+        List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
+        final ConsumeMessageContext[] messageContext = new 
ConsumeMessageContext[1];
+        ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() {
+            @Override public String hookName() {
+                return "TestHook";
+            }
+
+            @Override public void consumeMessageBefore(ConsumeMessageContext 
context) {
+                messageContext[0] = context;
+            }
+
+            @Override public void consumeMessageAfter(ConsumeMessageContext 
context) {
+            }
+        };
+        consumeMessageHookList.add(consumeMessageHook);
+        
pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
+        final RemotingCommand request = 
createPullMsgCommand(RequestCode.PULL_MESSAGE);
+        RemotingCommand response = 
pullMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        assertThat(messageContext[0]).isNotNull();
+        assertThat(messageContext[0].getConsumerGroup()).isEqualTo(group);
+        assertThat(messageContext[0].getTopic()).isEqualTo(topic);
+        assertThat(messageContext[0].getQueueId()).isEqualTo(1);
+    }
+
+    @Test
     public void testProcessRequest_MsgWasRemoving() throws 
RemotingCommandException {
         GetMessageResult getMessageResult = createGetMessageResult();
         getMessageResult.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
@@ -170,7 +204,7 @@ public class PullMessageProcessorTest {
         return request;
     }
 
-    private ConsumerData createConsumerData() {
+    static ConsumerData createConsumerData(String group, String topic) {
         ConsumerData consumerData = new ConsumerData();
         
consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b29c318c/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 2cf8d45..02490a0 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -19,7 +19,11 @@ package org.apache.rocketmq.broker.processor;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -64,6 +68,9 @@ public class SendMessageProcessorTest {
     @Mock
     private MessageStore messageStore;
 
+    private String topic = "FooBar";
+    private String group = "FooBarGroup";
+
     @Before
     public void init() {
         brokerController.setMessageStore(messageStore);
@@ -72,6 +79,7 @@ public class SendMessageProcessorTest {
         when(mockChannel.remoteAddress()).thenReturn(new 
InetSocketAddress(1024));
         when(handlerContext.channel()).thenReturn(mockChannel);
         when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new 
MessageExt());
+        sendMessageProcessor = new SendMessageProcessor(brokerController);
     }
 
     @Test
@@ -80,6 +88,33 @@ public class SendMessageProcessorTest {
         assertPutResult(ResponseCode.SUCCESS);
     }
 
+    @Test
+    public void testProcessRequest_WithHook() throws RemotingCommandException {
+        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+        List<SendMessageHook> sendMessageHookList = new ArrayList<>();
+        final SendMessageContext[] sendMessageContext = new 
SendMessageContext[1];
+        SendMessageHook sendMessageHook = new SendMessageHook() {
+            @Override public String hookName() {
+                return null;
+            }
+
+            @Override public void sendMessageBefore(SendMessageContext 
context) {
+                sendMessageContext[0] = context;
+            }
+
+            @Override public void sendMessageAfter(SendMessageContext context) 
{
+
+            }
+        };
+        sendMessageHookList.add(sendMessageHook);
+        sendMessageProcessor.registerSendMessageHook(sendMessageHookList);
+        assertPutResult(ResponseCode.SUCCESS);
+        System.out.println(sendMessageContext[0]);
+        assertThat(sendMessageContext[0]).isNotNull();
+        assertThat(sendMessageContext[0].getTopic()).isEqualTo(topic);
+        assertThat(sendMessageContext[0].getProducerGroup()).isEqualTo(group);
+    }
+
 
     @Test
     public void testProcessRequest_FlushTimeOut() throws 
RemotingCommandException {
@@ -142,8 +177,8 @@ public class SendMessageProcessorTest {
 
     private RemotingCommand createSendMsgCommand(int requestCode) {
         SendMessageRequestHeader requestHeader = new 
SendMessageRequestHeader();
-        requestHeader.setProducerGroup("FooBar_PID");
-        requestHeader.setTopic("FooBar");
+        requestHeader.setProducerGroup(group);
+        requestHeader.setTopic(topic);
         requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
         requestHeader.setDefaultTopicQueueNums(3);
         requestHeader.setQueueId(1);
@@ -153,6 +188,7 @@ public class SendMessageProcessorTest {
         requestHeader.setReconsumeTimes(0);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(requestCode, requestHeader);
+        request.setBody(new byte[] {'a'});
         request.makeCustomHeaderToNet();
         return request;
     }
@@ -162,7 +198,7 @@ public class SendMessageProcessorTest {
 
         requestHeader.setMaxReconsumeTimes(3);
         requestHeader.setDelayLevel(4);
-        requestHeader.setGroup("FooBar_PID");
+        requestHeader.setGroup(group);
         requestHeader.setOffset(123L);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(requestCode, requestHeader);
@@ -179,8 +215,6 @@ public class SendMessageProcessorTest {
                 return null;
             }
         }).when(handlerContext).writeAndFlush(any(Object.class));
-
-        sendMessageProcessor = new SendMessageProcessor(brokerController);
         RemotingCommand responseToReturn = 
sendMessageProcessor.processRequest(handlerContext, request);
         if (responseToReturn != null) {
             assertThat(response[0]).isNull();

Reply via email to