Repository: incubator-rocketmq
Updated Branches:
  refs/heads/TravisCI_Test 68278b37f -> a3aa00a2c (forced update)


[ROCKETMQ-52] Resolve infinite loop issue in rocketmq-client UT


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/a3aa00a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/a3aa00a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/a3aa00a2

Branch: refs/heads/TravisCI_Test
Commit: a3aa00a2ca3f92c6bc7d8babe542dd52723d2954
Parents: a3aff81
Author: yukon <[email protected]>
Authored: Mon Jan 23 11:05:01 2017 +0800
Committer: yukon <[email protected]>
Committed: Mon Jan 23 13:01:19 2017 +0800

----------------------------------------------------------------------
 .../consumer/DefaultMQPushConsumerTest.java     | 45 ++++++++++----------
 1 file changed, 23 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a3aa00a2/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 048e456..2e0af5a 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import org.apache.rocketmq.client.ClientConfig;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
@@ -35,7 +35,6 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
 import 
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
 import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -50,7 +49,6 @@ import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
@@ -58,7 +56,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Spy;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
@@ -69,17 +66,16 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class DefaultMQPushConsumerTest {
     private String consumerGroup;
     private String topic = "FooBar";
     private String brokerName = "BrokerA";
-    @Spy
-    private MQClientInstance mQClientFactory = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private MQClientInstance mQClientFactory;
     @Mock
     private MQClientAPIImpl mQClientAPIImpl;
     private PullAPIWrapper pullAPIWrapper;
@@ -89,7 +85,6 @@ public class DefaultMQPushConsumerTest {
     @Before
     public void init() throws Exception {
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
-        pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, 
consumerGroup, false));
         pushConsumer = new DefaultMQPushConsumer(consumerGroup);
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
         pushConsumer.setPullInterval(60 * 1000);
@@ -106,10 +101,10 @@ public class DefaultMQPushConsumerTest {
         Field field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
         field.setAccessible(true);
         field.set(pushConsumerImpl, rebalancePushImpl);
-
         pushConsumer.subscribe(topic, "*");
         pushConsumer.start();
 
+        mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
         field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
         field.setAccessible(true);
         field.set(pushConsumerImpl, mQClientFactory);
@@ -119,15 +114,17 @@ public class DefaultMQPushConsumerTest {
         field.setAccessible(true);
         field.set(mQClientFactory, mQClientAPIImpl);
 
+        pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, 
consumerGroup, false));
         field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
         field.setAccessible(true);
         field.set(pushConsumerImpl, pullAPIWrapper);
 
         
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
         mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
-        mQClientFactory.start();
 
-        doAnswer(new Answer() {
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), 
any(PullMessageRequestHeader.class),
+            anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class)))
+            .thenAnswer(new Answer<Object>() {
             @Override public Object answer(InvocationOnMock mock) throws 
Throwable {
                 PullMessageRequestHeader requestHeader = mock.getArgument(1);
                 MessageClientExt messageClientExt = new MessageClientExt();
@@ -140,17 +137,15 @@ public class DefaultMQPushConsumerTest {
                 messageClientExt.setStoreHost(new InetSocketAddress(8080));
                 PullResult pullResult = createPullResult(requestHeader, 
PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
                 ((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
-                return null;
+                return pullResult;
             }
-        }).when(mQClientAPIImpl).pullMessage(anyString(), 
any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class));
+        });
 
         doReturn(new FindBrokerResult("127.0.0.1:10911", 
false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean());
         
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(),
 anyString());
         Set<MessageQueue> messageQueueSet = new HashSet<>();
         messageQueueSet.add(createPullRequest().getMessageQueue());
         
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, 
messageQueueSet);
-        
doReturn(messageQueueSet).when(mQClientAPIImpl).lockBatchMQ(anyString(), 
any(LockBatchRequestBody.class), anyLong());
-
         
doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class));
     }
 
@@ -180,23 +175,26 @@ public class DefaultMQPushConsumerTest {
     }
 
     @Test
-    public void testPullMessage_SuccessWithOrderlyService() throws 
InterruptedException, RemotingException, MQBrokerException {
+    public void testPullMessage_SuccessWithOrderlyService() throws Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         final MessageExt[] messageExts = new MessageExt[1];
-        
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new 
ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new 
MessageListenerOrderly() {
+
+        MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
             @Override
             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
ConsumeOrderlyContext context) {
                 messageExts[0] = msgs.get(0);
                 countDownLatch.countDown();
                 return null;
             }
-        }));
+        };
+        pushConsumer.registerMessageListener(listenerOrderly);
+        
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new 
ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), 
listenerOrderly));
+        pushConsumer.getDefaultMQPushConsumerImpl().setConsumeOrderly(true);
         pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
-
         PullMessageService pullMessageService = 
mQClientFactory.getPullMessageService();
-        pullMessageService.executePullRequestImmediately(createPullRequest());
+        pullMessageService.executePullRequestLater(createPullRequest(), 100);
 
-        countDownLatch.await();
+        countDownLatch.await(10, TimeUnit.SECONDS);
         assertThat(messageExts[0].getTopic()).isEqualTo(topic);
         assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
     }
@@ -211,7 +209,10 @@ public class DefaultMQPushConsumerTest {
         messageQueue.setQueueId(0);
         messageQueue.setTopic(topic);
         pullRequest.setMessageQueue(messageQueue);
-        pullRequest.setProcessQueue(new ProcessQueue());
+        ProcessQueue processQueue = new ProcessQueue();
+        processQueue.setLocked(true);
+        processQueue.setLastLockTimestamp(System.currentTimeMillis());
+        pullRequest.setProcessQueue(processQueue);
 
         return pullRequest;
     }

Reply via email to