This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c9c8a0a61 Add unit test for ConsumerProcessor (#5352)
c9c8a0a61 is described below

commit c9c8a0a61aab9664b93a6d8c40016fdf7fb249b1
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Oct 19 17:37:28 2022 +0800

    Add unit test for ConsumerProcessor (#5352)
---
 .../proxy/processor/ConsumerProcessorTest.java     | 67 ++++++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index dc7e969e7..a7b254c7c 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -17,15 +17,19 @@
 
 package org.apache.rocketmq.proxy.processor;
 
+import com.google.common.collect.Sets;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.consumer.PopResult;
 import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
@@ -34,6 +38,7 @@ import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
@@ -45,12 +50,14 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -58,6 +65,7 @@ public class ConsumerProcessorTest extends BaseProcessorTest {
 
     private static final String CONSUMER_GROUP = "consumerGroup";
     private static final String TOPIC = "topic";
+    private static final String CLIENT_ID = "clientId";
 
     private ConsumerProcessor consumerProcessor;
 
@@ -173,4 +181,63 @@ public class ConsumerProcessorTest extends BaseProcessorTest {
         assertEquals(1000, 
requestHeaderArgumentCaptor.getValue().getInvisibleTime().longValue());
         assertEquals(handle.getReceiptHandle(), 
requestHeaderArgumentCaptor.getValue().getExtraInfo());
     }
+
+    @Test // Happy path: both stubbed lock calls succeed, so the merged result must equal the requested set.
+    public void testLockBatch() throws Throwable {
+        Set<MessageQueue> mqSet = new HashSet<>();
+        MessageQueue mq1 = new MessageQueue(TOPIC, "broker1", 0);
+        AddressableMessageQueue addressableMessageQueue1 = new 
AddressableMessageQueue(mq1, "127.0.0.1");
+        MessageQueue mq2 = new MessageQueue(TOPIC, "broker2", 0);
+        AddressableMessageQueue addressableMessageQueue2 = new 
AddressableMessageQueue(mq2, "127.0.0.1");
+        mqSet.add(mq1);
+        mqSet.add(mq2); // request covers one queue on "broker1" and one on "broker2"
+        
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i 
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0], 
"127.0.0.1")); // route stub: wraps whichever queue it receives with a fixed address
+        when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue1), any(), anyLong())) // NOTE(review): eq(...) presumably relies on AddressableMessageQueue.equals - confirm
+            
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1))); // lock call for mq1's queue reports mq1 as locked
+        when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue2), any(), anyLong()))
+            
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq2))); // lock call for mq2's queue reports mq2 as locked
+        Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null, 
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+            .get(); // wait for the processor's future to complete
+        assertThat(result).isEqualTo(mqSet); // both queues are expected in the result
+    }
+
+    @Test // Partial success: one stubbed lock call returns an empty set, so only the other queue is expected back.
+    public void testLockBatchPartialSuccess() throws Throwable {
+        Set<MessageQueue> mqSet = new HashSet<>();
+        MessageQueue mq1 = new MessageQueue(TOPIC, "broker1", 0);
+        AddressableMessageQueue addressableMessageQueue1 = new 
AddressableMessageQueue(mq1, "127.0.0.1");
+        MessageQueue mq2 = new MessageQueue(TOPIC, "broker2", 0);
+        AddressableMessageQueue addressableMessageQueue2 = new 
AddressableMessageQueue(mq2, "127.0.0.1");
+        mqSet.add(mq1);
+        mqSet.add(mq2); // request covers one queue on "broker1" and one on "broker2"
+        
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i 
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0], 
"127.0.0.1")); // route stub: wraps whichever queue it receives with a fixed address
+        when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue1), any(), anyLong()))
+            
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1))); // lock call for mq1's queue reports mq1 as locked
+        when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue2), any(), anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(Sets.newHashSet())); // lock call for mq2's queue completes normally but locks nothing
+        Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null, 
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+            .get(); // wait for the processor's future to complete
+        assertThat(result).isEqualTo(Sets.newHashSet(mq1)); // only mq1 is expected in the result
+    }
+
+    @Test // Partial failure: one stubbed lock call fails exceptionally; the test expects the overall call to still return the other queue.
+    public void testLockBatchPartialSuccessWithException() throws Throwable {
+        Set<MessageQueue> mqSet = new HashSet<>();
+        MessageQueue mq1 = new MessageQueue(TOPIC, "broker1", 0);
+        AddressableMessageQueue addressableMessageQueue1 = new 
AddressableMessageQueue(mq1, "127.0.0.1");
+        MessageQueue mq2 = new MessageQueue(TOPIC, "broker2", 0);
+        AddressableMessageQueue addressableMessageQueue2 = new 
AddressableMessageQueue(mq2, "127.0.0.1");
+        mqSet.add(mq1);
+        mqSet.add(mq2); // request covers one queue on "broker1" and one on "broker2"
+        
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i 
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0], 
"127.0.0.1")); // route stub: wraps whichever queue it receives with a fixed address
+        when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue1), any(), anyLong()))
+            
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1))); // lock call for mq1's queue reports mq1 as locked
+        CompletableFuture<Set<MessageQueue>> future = new 
CompletableFuture<>();
+        future.completeExceptionally(new MQBrokerException(1, "err")); // pre-failed future standing in for a broker-side error
+        when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue2), any(), anyLong()))
+            .thenReturn(future); // lock call for mq2's queue fails with the exception above
+        Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null, 
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+            .get(); // get() must return normally here; a propagated failure would throw and fail the test
+        assertThat(result).isEqualTo(Sets.newHashSet(mq1)); // only mq1 is expected in the result
+    }
 }
\ No newline at end of file

Reply via email to