This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a948f67b4d [ISSUE #8796] Add more test coverage for 
PopLongPollingService (#8797)
a948f67b4d is described below

commit a948f67b4dd711d74171e5d3559f6be7bd0c60e1
Author: yx9o <[email protected]>
AuthorDate: Wed Oct 9 09:58:52 2024 +0800

    [ISSUE #8796] Add more test coverage for PopLongPollingService (#8797)
---
 .../longpolling/PopLongPollingServiceTest.java     | 220 +++++++++++++++++++++
 1 file changed, 220 insertions(+)

diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
new file mode 100644
index 0000000000..6527beeb68
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.longpolling;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.MessageFilter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PopLongPollingServiceTest {
+    
+    @Mock
+    private BrokerController brokerController;
+    
+    @Mock
+    private NettyRequestProcessor processor;
+    
+    @Mock
+    private ChannelHandlerContext ctx;
+    
+    @Mock
+    private ExecutorService pullMessageExecutor;
+    
+    private PopLongPollingService popLongPollingService;
+    
+    private final String defaultTopic = "defaultTopic";
+    
+    @Before
+    public void init() {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setPopPollingMapSize(100);
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        popLongPollingService = spy(new 
PopLongPollingService(brokerController, processor, true));
+    }
+    
+    @Test
+    public void testNotifyMessageArrivingWithRetryTopic() {
+        int queueId = 0;
+        
doNothing().when(popLongPollingService).notifyMessageArrivingWithRetryTopic(defaultTopic,
 queueId, null, 0L, null, null);
+        
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, 
queueId);
+        verify(popLongPollingService, 
times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, null, 0L, 
null, null);
+    }
+    
+    @Test
+    public void testNotifyMessageArriving() {
+        int queueId = 0;
+        Long tagsCode = 123L;
+        long msgStoreTime = System.currentTimeMillis();
+        byte[] filterBitMap = new byte[]{0x01};
+        Map<String, String> properties = new ConcurrentHashMap<>();
+        
doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic, 
queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+        
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, 
queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+        verify(popLongPollingService).notifyMessageArriving(defaultTopic, 
queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+    }
+    
+    @Test
+    public void testNotifyMessageArrivingValidRequest() throws Exception {
+        String cid = "CID_1";
+        int queueId = 0;
+        ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap 
= new ConcurrentHashMap<>();
+        ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
+        cids.put(cid, (byte) 1);
+        topicCidMap.put(defaultTopic, cids);
+        popLongPollingService = new PopLongPollingService(brokerController, 
processor, true);
+        ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> 
pollingMap =
+                new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
+        Channel channel = mock(Channel.class);
+        when(channel.isActive()).thenReturn(true);
+        PopRequest popRequest = mock(PopRequest.class);
+        MessageFilter messageFilter = mock(MessageFilter.class);
+        SubscriptionData subscriptionData = mock(SubscriptionData.class);
+        when(popRequest.getMessageFilter()).thenReturn(messageFilter);
+        when(popRequest.getSubscriptionData()).thenReturn(subscriptionData);
+        when(popRequest.getChannel()).thenReturn(channel);
+        String pollingKey = KeyBuilder.buildPollingKey(defaultTopic, cid, 
queueId);
+        ConcurrentSkipListSet popRequests = mock(ConcurrentSkipListSet.class);
+        when(popRequests.pollLast()).thenReturn(popRequest);
+        pollingMap.put(pollingKey, popRequests);
+        FieldUtils.writeDeclaredField(popLongPollingService, "topicCidMap", 
topicCidMap, true);
+        FieldUtils.writeDeclaredField(popLongPollingService, "pollingMap", 
pollingMap, true);
+        boolean actual = 
popLongPollingService.notifyMessageArriving(defaultTopic, queueId, cid, null, 
0, null, null);
+        assertFalse(actual);
+    }
+    
+    @Test
+    public void testWakeUpNullRequest() {
+        assertFalse(popLongPollingService.wakeUp(null));
+    }
+    
+    @Test
+    public void testWakeUpIncompleteRequest() {
+        PopRequest request = mock(PopRequest.class);
+        when(request.complete()).thenReturn(false);
+        assertFalse(popLongPollingService.wakeUp(request));
+    }
+    
+    @Test
+    public void testWakeUpInactiveChannel() {
+        PopRequest request = mock(PopRequest.class);
+        when(request.complete()).thenReturn(true);
+        when(request.getCtx()).thenReturn(ctx);
+        Channel channel = mock(Channel.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(channel.isActive()).thenReturn(true);
+        
when(brokerController.getPullMessageExecutor()).thenReturn(pullMessageExecutor);
+        assertTrue(popLongPollingService.wakeUp(request));
+    }
+    
+    @Test
+    public void testWakeUpValidRequestWithException() throws Exception {
+        PopRequest request = mock(PopRequest.class);
+        when(request.complete()).thenReturn(true);
+        when(request.getCtx()).thenReturn(ctx);
+        Channel channel = mock(Channel.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(request.getChannel()).thenReturn(channel);
+        when(channel.isActive()).thenReturn(true);
+        
when(brokerController.getPullMessageExecutor()).thenReturn(pullMessageExecutor);
+        when(processor.processRequest(any(), any())).thenThrow(new 
RuntimeException("Test Exception"));
+        assertTrue(popLongPollingService.wakeUp(request));
+        ArgumentCaptor<Runnable> captor = 
ArgumentCaptor.forClass(Runnable.class);
+        verify(pullMessageExecutor).submit(captor.capture());
+        captor.getValue().run();
+        verify(processor).processRequest(any(), any());
+    }
+    
+    @Test
+    public void testPollingNotPolling() {
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        RemotingCommand remotingCommand = mock(RemotingCommand.class);
+        PollingHeader requestHeader = mock(PollingHeader.class);
+        SubscriptionData subscriptionData = mock(SubscriptionData.class);
+        MessageFilter messageFilter = mock(MessageFilter.class);
+        when(requestHeader.getPollTime()).thenReturn(0L);
+        PollingResult result = popLongPollingService.polling(ctx, 
remotingCommand, requestHeader, subscriptionData, messageFilter);
+        assertEquals(PollingResult.NOT_POLLING, result);
+    }
+    
+    @Test
+    public void testPollingServicePollingTimeout() throws 
IllegalAccessException {
+        String cid = "CID_1";
+        popLongPollingService = new PopLongPollingService(brokerController, 
processor, true);
+        popLongPollingService.shutdown();
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        RemotingCommand remotingCommand = mock(RemotingCommand.class);
+        PollingHeader requestHeader = mock(PollingHeader.class);
+        SubscriptionData subscriptionData = mock(SubscriptionData.class);
+        MessageFilter messageFilter = mock(MessageFilter.class);
+        when(requestHeader.getPollTime()).thenReturn(1000L);
+        when(requestHeader.getTopic()).thenReturn(defaultTopic);
+        when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup");
+        ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap 
= new ConcurrentHashMap<>();
+        ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
+        cids.put(cid, (byte) 1);
+        topicCidMap.put(defaultTopic, cids);
+        FieldUtils.writeDeclaredField(popLongPollingService, "topicCidMap", 
topicCidMap, true);
+        PollingResult result = popLongPollingService.polling(ctx, 
remotingCommand, requestHeader, subscriptionData, messageFilter);
+        assertEquals(PollingResult.POLLING_TIMEOUT, result);
+    }
+    
+    @Test
+    public void testPollingPollingSuc() {
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        RemotingCommand remotingCommand = mock(RemotingCommand.class);
+        PollingHeader requestHeader = mock(PollingHeader.class);
+        SubscriptionData subscriptionData = mock(SubscriptionData.class);
+        MessageFilter messageFilter = mock(MessageFilter.class);
+        when(requestHeader.getPollTime()).thenReturn(1000L);
+        
when(requestHeader.getBornTime()).thenReturn(System.currentTimeMillis());
+        when(requestHeader.getTopic()).thenReturn("topic");
+        when(requestHeader.getConsumerGroup()).thenReturn("cid");
+        when(requestHeader.getQueueId()).thenReturn(0);
+        PollingResult result = popLongPollingService.polling(ctx, 
remotingCommand, requestHeader, subscriptionData, messageFilter);
+        assertEquals(PollingResult.POLLING_SUC, result);
+    }
+}

Reply via email to