This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2f482e2ff9 [ISSUE #8517] Add more test coverage for PullMessageService 
(#8542)
2f482e2ff9 is described below

commit 2f482e2ff95cb3c1917d6758aa7b5b9811137ae4
Author: yx9o <[email protected]>
AuthorDate: Fri Aug 16 14:31:03 2024 +0800

    [ISSUE #8517] Add more test coverage for PullMessageService (#8542)
---
 .../impl/consumer/PullMessageServiceTest.java      | 139 +++++++++++++++++++++
 1 file changed, 139 insertions(+)

diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullMessageServiceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullMessageServiceTest.java
new file mode 100644
index 0000000000..73fa4e95d6
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullMessageServiceTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullMessageServiceTest {
+
+    @Mock
+    private MQClientInstance mQClientFactory;
+
+    @Mock
+    private ScheduledExecutorService executorService;
+
+    private PullMessageService pullMessageService;
+
+    private final long defaultTimeout = 3000L;
+
+    private final String defaultGroup = "defaultGroup";
+
+    @Before
+    public void init() throws Exception {
+        pullMessageService = new PullMessageService(mQClientFactory);
+        FieldUtils.writeDeclaredField(pullMessageService, 
"scheduledExecutorService", executorService, true);
+        pullMessageService.start();
+    }
+
+    @Test
+    public void testProcessPullResult() {
+        PopRequest popRequest = mock(PopRequest.class);
+        pullMessageService.executePopPullRequestLater(popRequest, 
defaultTimeout);
+        pullMessageService.makeStop();
+        pullMessageService.executePopPullRequestLater(popRequest, 
defaultTimeout);
+        verify(executorService, times(1))
+                .schedule(any(Runnable.class),
+                        eq(defaultTimeout),
+                        eq(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testExecutePopPullRequestImmediately() throws 
IllegalAccessException, InterruptedException {
+        PopRequest popRequest = mock(PopRequest.class);
+        LinkedBlockingQueue<MessageRequest> messageRequestQueue = 
mock(LinkedBlockingQueue.class);
+        FieldUtils.writeDeclaredField(pullMessageService, 
"messageRequestQueue", messageRequestQueue, true);
+        pullMessageService.executePopPullRequestImmediately(popRequest);
+        verify(messageRequestQueue, times(1)).put(any(PopRequest.class));
+    }
+
+    @Test
+    public void testExecuteTaskLater() {
+        Runnable runnable = mock(Runnable.class);
+        pullMessageService.executeTaskLater(runnable, defaultTimeout);
+        pullMessageService.makeStop();
+        pullMessageService.executeTaskLater(runnable, defaultTimeout);
+        verify(executorService, times(1))
+                .schedule(any(Runnable.class),
+                        eq(defaultTimeout),
+                        eq(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testExecuteTask() {
+        Runnable runnable = mock(Runnable.class);
+        pullMessageService.executeTask(runnable);
+        pullMessageService.makeStop();
+        pullMessageService.executeTask(runnable);
+        verify(executorService, times(1)).execute(any(Runnable.class));
+    }
+
+    @Test
+    public void testGetScheduledExecutorService() {
+        assertEquals(executorService, 
pullMessageService.getScheduledExecutorService());
+    }
+
+    @Test
+    public void testRun() throws InterruptedException, IllegalAccessException {
+        LinkedBlockingQueue<MessageRequest> messageRequestQueue = new 
LinkedBlockingQueue<>();
+        PopRequest popRequest = mock(PopRequest.class);
+        
when(popRequest.getMessageRequestMode()).thenReturn(MessageRequestMode.POP);
+        when(popRequest.getConsumerGroup()).thenReturn(defaultGroup);
+        messageRequestQueue.put(popRequest);
+        DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = 
mock(DefaultMQPushConsumerImpl.class);
+        
when(mQClientFactory.selectConsumer(any())).thenReturn(defaultMQPushConsumerImpl);
+        FieldUtils.writeDeclaredField(pullMessageService, 
"messageRequestQueue", messageRequestQueue, true);
+        new Thread(() -> pullMessageService.run()).start();
+        TimeUnit.SECONDS.sleep(1);
+        pullMessageService.makeStop();
+        verify(mQClientFactory, times(1)).selectConsumer(eq(defaultGroup));
+        verify(defaultMQPushConsumerImpl).popMessage(any(PopRequest.class));
+    }
+
+    @Test
+    public void testRunWithNullConsumer() throws InterruptedException, 
IllegalAccessException {
+        LinkedBlockingQueue<MessageRequest> messageRequestQueue = new 
LinkedBlockingQueue<>();
+        PopRequest popRequest = mock(PopRequest.class);
+        
when(popRequest.getMessageRequestMode()).thenReturn(MessageRequestMode.POP);
+        when(popRequest.getConsumerGroup()).thenReturn(defaultGroup);
+        messageRequestQueue.put(popRequest);
+        FieldUtils.writeDeclaredField(pullMessageService, 
"messageRequestQueue", messageRequestQueue, true);
+        new Thread(() -> pullMessageService.run()).start();
+        TimeUnit.SECONDS.sleep(1);
+        pullMessageService.makeStop();
+        verify(mQClientFactory, times(1)).selectConsumer(eq(defaultGroup));
+    }
+}

Reply via email to