This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c9c8a0a61 Add unit test for ConsumerProcessor (#5352)
c9c8a0a61 is described below
commit c9c8a0a61aab9664b93a6d8c40016fdf7fb249b1
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Oct 19 17:37:28 2022 +0800
Add unit test for ConsumerProcessor (#5352)
---
.../proxy/processor/ConsumerProcessorTest.java | 67 ++++++++++++++++++++++
1 file changed, 67 insertions(+)
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index dc7e969e7..a7b254c7c 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -17,15 +17,19 @@
package org.apache.rocketmq.proxy.processor;
+import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
@@ -34,6 +38,7 @@ import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
import
org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
@@ -45,12 +50,14 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -58,6 +65,7 @@ public class ConsumerProcessorTest extends BaseProcessorTest {
private static final String CONSUMER_GROUP = "consumerGroup";
private static final String TOPIC = "topic";
+ private static final String CLIENT_ID = "clientId";
private ConsumerProcessor consumerProcessor;
@@ -173,4 +181,63 @@ public class ConsumerProcessorTest extends
BaseProcessorTest {
assertEquals(1000,
requestHeaderArgumentCaptor.getValue().getInvisibleTime().longValue());
assertEquals(handle.getReceiptHandle(),
requestHeaderArgumentCaptor.getValue().getExtraInfo());
}
+
+ @Test
+ public void testLockBatch() throws Throwable {
+ Set<MessageQueue> mqSet = new HashSet<>();
+ MessageQueue mq1 = new MessageQueue(TOPIC, "broker1", 0);
+ AddressableMessageQueue addressableMessageQueue1 = new
AddressableMessageQueue(mq1, "127.0.0.1");
+ MessageQueue mq2 = new MessageQueue(TOPIC, "broker2", 0);
+ AddressableMessageQueue addressableMessageQueue2 = new
AddressableMessageQueue(mq2, "127.0.0.1");
+ mqSet.add(mq1);
+ mqSet.add(mq2);
+
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0],
"127.0.0.1"));
+ when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue1), any(), anyLong()))
+
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1)));
+ when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue2), any(), anyLong()))
+
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq2)));
+ Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null,
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+ .get();
+ assertThat(result).isEqualTo(mqSet);
+ }
+
+ @Test
+ public void testLockBatchPartialSuccess() throws Throwable {
+ Set<MessageQueue> mqSet = new HashSet<>();
+ MessageQueue mq1 = new MessageQueue(TOPIC, "broker1", 0);
+ AddressableMessageQueue addressableMessageQueue1 = new
AddressableMessageQueue(mq1, "127.0.0.1");
+ MessageQueue mq2 = new MessageQueue(TOPIC, "broker2", 0);
+ AddressableMessageQueue addressableMessageQueue2 = new
AddressableMessageQueue(mq2, "127.0.0.1");
+ mqSet.add(mq1);
+ mqSet.add(mq2);
+
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0],
"127.0.0.1"));
+ when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue1), any(), anyLong()))
+
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1)));
+ when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue2), any(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(Sets.newHashSet()));
+ Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null,
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+ .get();
+ assertThat(result).isEqualTo(Sets.newHashSet(mq1));
+ }
+
+ @Test
+ public void testLockBatchPartialSuccessWithException() throws Throwable {
+ Set<MessageQueue> mqSet = new HashSet<>();
+ MessageQueue mq1 = new MessageQueue(TOPIC, "broker1", 0);
+ AddressableMessageQueue addressableMessageQueue1 = new
AddressableMessageQueue(mq1, "127.0.0.1");
+ MessageQueue mq2 = new MessageQueue(TOPIC, "broker2", 0);
+ AddressableMessageQueue addressableMessageQueue2 = new
AddressableMessageQueue(mq2, "127.0.0.1");
+ mqSet.add(mq1);
+ mqSet.add(mq2);
+
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0],
"127.0.0.1"));
+ when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue1), any(), anyLong()))
+
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1)));
+ CompletableFuture<Set<MessageQueue>> future = new
CompletableFuture<>();
+ future.completeExceptionally(new MQBrokerException(1, "err"));
+ when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue2), any(), anyLong()))
+ .thenReturn(future);
+ Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null,
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+ .get();
+ assertThat(result).isEqualTo(Sets.newHashSet(mq1));
+ }
}
\ No newline at end of file