This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 73d0c33f4f add tests for ConsumerManageProcessor (#8401)
73d0c33f4f is described below
commit 73d0c33f4f1ab5e2b57afa4afc6203000aebeabd
Author: Tan Xiang <[email protected]>
AuthorDate: Thu Jul 18 14:21:09 2024 +0800
add tests for ConsumerManageProcessor (#8401)
---
.../processor/ConsumerManageProcessorTest.java | 186 ++++++++++++++++++++-
1 file changed, 185 insertions(+), 1 deletion(-)
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
index c94591d381..6b3c2578af 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
@@ -16,19 +16,36 @@
*/
package org.apache.rocketmq.broker.processor;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.remoting.protocol.statictopic.LogicQueueMappingItem;
+import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingContext;
+import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.rpc.RpcClient;
+import org.apache.rocketmq.remoting.rpc.RpcException;
+import org.apache.rocketmq.remoting.rpc.RpcResponse;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
@@ -38,7 +55,17 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -50,12 +77,24 @@ public class ConsumerManageProcessorTest {
private BrokerController brokerController = new BrokerController(new
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new
MessageStoreConfig());
@Mock
private MessageStore messageStore;
+ @Mock
+ private Channel channel;
+ @Mock
+ private ConsumerOffsetManager consumerOffsetManager;
+ @Mock
+ private BrokerOuterAPI brokerOuterAPI;
+ @Mock
+ private RpcClient rpcClient;
+ @Mock
+ private Future<RpcResponse> responseFuture;
+ @Mock
+ private TopicQueueMappingContext mappingContext;
private String topic = "FooBar";
private String group = "FooBarGroup";
@Before
- public void init() {
+ public void init() throws RpcException {
brokerController.setMessageStore(messageStore);
TopicConfigManager topicConfigManager = new
TopicConfigManager(brokerController);
topicConfigManager.getTopicConfigTable().put(topic, new
TopicConfig(topic));
@@ -64,6 +103,12 @@ public class ConsumerManageProcessorTest {
subscriptionGroupManager.getSubscriptionGroupTable().put(group, new
SubscriptionGroupConfig());
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
consumerManageProcessor = new
ConsumerManageProcessor(brokerController);
+ when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
+ when(brokerOuterAPI.getRpcClient()).thenReturn(rpcClient);
+ when(rpcClient.invoke(any(),anyLong())).thenReturn(responseFuture);
+ TopicQueueMappingDetail topicQueueMappingDetail = new
TopicQueueMappingDetail();
+ topicQueueMappingDetail.setBname("BrokerA");
+
when(mappingContext.getMappingDetail()).thenReturn(topicQueueMappingDetail);
}
@Test
@@ -82,6 +127,145 @@ public class ConsumerManageProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
}
+ @Test // Sends the same request twice: once with hasOffsetReset stubbed to true, once to false.
+ public void testUpdateConsumerOffset() throws RemotingCommandException {
+ // Make the controller return the mocked ConsumerOffsetManager so hasOffsetReset can be stubbed.
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+ // First pass: hasOffsetReset(...) answers true.
when(consumerOffsetManager.hasOffsetReset(anyString(),anyString(),anyInt())).thenReturn(true);
+ RemotingCommand request = buildUpdateConsumerOffsetRequest(group,
topic, 0, 0);
+ RemotingCommand response =
consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ // Second pass: hasOffsetReset(...) answers false; SUCCESS is still the expected code.
when(consumerOffsetManager.hasOffsetReset(anyString(),anyString(),anyInt())).thenReturn(false);
+ response = consumerManageProcessor.processRequest(handlerContext,
request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test // Issues GET_CONSUMER_LIST_BY_GROUP three times while the consumer table is filled in step by step.
+ public void testGetConsumerListByGroup() throws RemotingCommandException {
+ GetConsumerListByGroupRequestHeader requestHeader = new
GetConsumerListByGroupRequestHeader();
+ requestHeader.setConsumerGroup(group);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); // this test has not yet put a ConsumerGroupInfo for the group
+ // Register a ConsumerGroupInfo with no channels attached; SYSTEM_ERROR is still expected.
+ brokerController.getConsumerManager().getConsumerTable().put(group,new
ConsumerGroupInfo(group));
+ response = consumerManageProcessor.processRequest(handlerContext,
request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ // Attach the mocked channel to the group's channel table; the request is now expected to succeed.
+ ConsumerGroupInfo consumerGroupInfo =
+
this.brokerController.getConsumerManager().getConsumerGroupInfo(
+ requestHeader.getConsumerGroup());
+ consumerGroupInfo.getChannelInfoTable().put(channel,new
ClientChannelInfo(channel));
+ response = consumerManageProcessor.processRequest(handlerContext,
request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test // Re-sends one QUERY_CONSUMER_OFFSET request through seven stubbing stages, checking the response code after each.
+ public void testQueryConsumerOffset() throws RemotingCommandException,
ExecutionException, InterruptedException {
+ RemotingCommand request = buildQueryConsumerOffsetRequest(group,
topic, 0, true);
+ RemotingCommand response =
consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND); // stage 1: sent before any offset-manager stubbing
+ // Stage 2: the mocked ConsumerOffsetManager reports offset 0 -> SUCCESS.
+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+
when(consumerOffsetManager.queryOffset(anyString(),anyString(),anyInt())).thenReturn(0L);
+ response = consumerManageProcessor.processRequest(handlerContext,
request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ // Stage 3: queryOffset now returns -1; messageStore min offset (-1) and in-memory check (true) are stubbed -> SUCCESS.
+
when(consumerOffsetManager.queryOffset(anyString(),anyString(),anyInt())).thenReturn(-1L);
+
when(messageStore.getMinOffsetInQueue(anyString(),anyInt())).thenReturn(-1L);
+
when(messageStore.checkInMemByConsumeOffset(anyString(),anyInt(),anyLong(),anyInt())).thenReturn(true);
+ response = consumerManageProcessor.processRequest(handlerContext,
request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ // Stage 4: a mocked TopicQueueMappingManager hands back mappingContext (isLeader() not stubbed yet) -> NOT_LEADER_FOR_QUEUE.
+ TopicQueueMappingManager topicQueueMappingManager =
mock(TopicQueueMappingManager.class);
+
when(brokerController.getTopicQueueMappingManager()).thenReturn(topicQueueMappingManager);
+
when(topicQueueMappingManager.buildTopicQueueMappingContext(any(QueryConsumerOffsetRequestHeader.class))).thenReturn(mappingContext);
+ response = consumerManageProcessor.processRequest(handlerContext,
request);
+ assertThat(response).isNotNull();
+
assertThat(response.getCode()).isEqualTo(ResponseCode.NOT_LEADER_FOR_QUEUE);
+ // Stage 5: one mapping item on "BrokerC" serves as leader and current item, and isLeader() is stubbed true -> SUCCESS.
+ List<LogicQueueMappingItem> items = new ArrayList<>();
+ LogicQueueMappingItem item1 = createLogicQueueMappingItem("BrokerC",
0, 0L, 0L);
+ items.add(item1);
+ when(mappingContext.getMappingItemList()).thenReturn(items);
+ when(mappingContext.getLeaderItem()).thenReturn(item1);
+ when(mappingContext.getCurrentItem()).thenReturn(item1);
+ when(mappingContext.isLeader()).thenReturn(true);
+ response = consumerManageProcessor.processRequest(handlerContext,
request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ // Stage 6: add a second item on "BrokerA"; the mocked future yields an RpcResponse whose header carries offset 0 -> SUCCESS.
+ LogicQueueMappingItem item2 = createLogicQueueMappingItem("BrokerA",
0, 0L, 0L);
+ items.add(item2);
+ QueryConsumerOffsetResponseHeader queryConsumerOffsetResponseHeader =
new QueryConsumerOffsetResponseHeader();
+ queryConsumerOffsetResponseHeader.setOffset(0L);
+ RpcResponse rpcResponse = new
RpcResponse(ResponseCode.SUCCESS,queryConsumerOffsetResponseHeader,null);
+ when(responseFuture.get()).thenReturn(rpcResponse);
+ response = consumerManageProcessor.processRequest(handlerContext,
request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ // Stage 7: the same header object is changed to offset -1 and wrapped in a fresh RpcResponse -> QUERY_NOT_FOUND.
+ queryConsumerOffsetResponseHeader.setOffset(-1L);
+ rpcResponse = new
RpcResponse(ResponseCode.SUCCESS,queryConsumerOffsetResponseHeader,null);
+ when(responseFuture.get()).thenReturn(rpcResponse);
+ response = consumerManageProcessor.processRequest(handlerContext,
request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND);
+ }
+
+ @Test // Calls rewriteRequestForStaticTopic directly with the mocked mappingContext, before and after stubbing it as leader.
+ public void testRewriteRequestForStaticTopic() throws RpcException,
ExecutionException, InterruptedException {
+ UpdateConsumerOffsetRequestHeader requestHeader = new
UpdateConsumerOffsetRequestHeader();
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(0);
+ requestHeader.setCommitOffset(0L);
+ // First call: mappingContext.isLeader() has not been stubbed yet; NOT_LEADER_FOR_QUEUE is expected.
+ RemotingCommand response =
consumerManageProcessor.rewriteRequestForStaticTopic(requestHeader,
mappingContext);
+ assertThat(response).isNotNull();
+
assertThat(response.getCode()).isEqualTo(ResponseCode.NOT_LEADER_FOR_QUEUE);
+ // Second call: one "BrokerC" mapping item, isLeader() stubbed true, and the mocked future returns a SUCCESS RpcResponse.
+ List<LogicQueueMappingItem> items = new ArrayList<>();
+ LogicQueueMappingItem item = createLogicQueueMappingItem("BrokerC", 0,
0L, 0L);
+ items.add(item);
+ when(mappingContext.getMappingItemList()).thenReturn(items);
+ when(mappingContext.isLeader()).thenReturn(true);
+ RpcResponse rpcResponse = new RpcResponse(ResponseCode.SUCCESS,new
UpdateConsumerOffsetResponseHeader(),null);
+ when(responseFuture.get()).thenReturn(rpcResponse);
+ response =
consumerManageProcessor.rewriteRequestForStaticTopic(requestHeader,
mappingContext);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ public RemotingCommand buildQueryConsumerOffsetRequest(String group,
String topic, int queueId,boolean setZeroIfNotFound) { // Test helper: builds a QUERY_CONSUMER_OFFSET command from the given arguments.
+ QueryConsumerOffsetRequestHeader requestHeader = new
QueryConsumerOffsetRequestHeader();
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(queueId);
+ requestHeader.setSetZeroIfNotFound(setZeroIfNotFound);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET,
requestHeader);
+ request.makeCustomHeaderToNet(); // NOTE(review): presumably serializes the custom header into the command's fields - confirm against RemotingCommand
+ return request;
+ }
+
+ public LogicQueueMappingItem createLogicQueueMappingItem(String
brokerName, int queueId, long startOffset, long logicOffset) { // Test helper: returns a new LogicQueueMappingItem with these four fields set.
+ LogicQueueMappingItem item = new LogicQueueMappingItem();
+ item.setBname(brokerName);
+ item.setQueueId(queueId);
+ item.setStartOffset(startOffset);
+ item.setLogicOffset(logicOffset);
+ return item;
+ }
+
private RemotingCommand buildUpdateConsumerOffsetRequest(String group,
String topic, int queueId, long offset) {
UpdateConsumerOffsetRequestHeader requestHeader = new
UpdateConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(group);