This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2d1e3143c7 [ISSUE #8372] Add more test coverage for AdminBrokerProcessor
2d1e3143c7 is described below
commit 2d1e3143c7dc60aca805e6689c5364825e3020d4
Author: Tan Xiang <[email protected]>
AuthorDate: Wed Jul 17 16:08:20 2024 +0800
[ISSUE #8372] Add more test coverage for AdminBrokerProcessor
---
.../broker/processor/AdminBrokerProcessor.java | 2 +-
.../broker/processor/AdminBrokerProcessorTest.java | 471 ++++++++++++++++++++-
2 files changed, 454 insertions(+), 19 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 1b29ff173c..c5419a62df 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1941,7 +1941,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
String content =
this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager().encode();
- if (content != null && content.length() > 0) {
+ if (content != null && !content.isEmpty()) {
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index e66703e565..04324043fb 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -20,21 +20,6 @@ import com.alibaba.fastjson.JSON;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.auth.authentication.enums.UserType;
import
org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager;
import org.apache.rocketmq.auth.authentication.model.Subject;
@@ -45,8 +30,10 @@ import org.apache.rocketmq.auth.authorization.model.Acl;
import org.apache.rocketmq.auth.authorization.model.Environment;
import org.apache.rocketmq.auth.authorization.model.Resource;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
@@ -55,11 +42,13 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.TopicQueueId;
import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.constant.FIleReadaheadMode;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -67,13 +56,20 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.AclInfo;
+import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
@@ -82,8 +78,11 @@ import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
@@ -91,6 +90,13 @@ import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.ResetMasterFlushOffsetHeader;
+import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader;
@@ -98,12 +104,17 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.stats.BrokerStats;
+import org.apache.rocketmq.store.timer.TimerCheckpoint;
+import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.apache.rocketmq.store.timer.TimerMetrics;
+import org.apache.rocketmq.store.util.LibC;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -112,6 +123,26 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -170,11 +201,32 @@ public class AdminBrokerProcessorTest {
@Mock
private AuthorizationMetadataManager authorizationMetadataManager;
+ @Mock
+ private TimerMessageStore timerMessageStore;
+
+ @Mock
+ private TimerMetrics timerMetrics;
+
+ @Mock
+ private MessageStoreConfig messageStoreConfig;
+
+ @Mock
+ private CommitLog commitLog;
+
+ @Mock
+ private Broker2Client broker2Client;
+
+ @Mock
+ private ClientChannelInfo clientChannelInfo;
+
@Before
public void init() throws Exception {
brokerController.setMessageStore(messageStore);
brokerController.setAuthenticationMetadataManager(authenticationMetadataManager);
brokerController.setAuthorizationMetadataManager(authorizationMetadataManager);
+ Field field = BrokerController.class.getDeclaredField("broker2Client");
+ field.setAccessible(true);
+ field.set(brokerController, broker2Client);
//doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor();
@@ -280,6 +332,31 @@ public class AdminBrokerProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
+ @Test
+ public void testUpdateAndCreateTopicList() throws RemotingCommandException
{
+ List<String> systemTopicList = new ArrayList<>(systemTopicSet);
+ RemotingCommand request = buildCreateTopicListRequest(systemTopicList);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ assertThat(response.getRemark()).isEqualTo("The topic[" +
systemTopicList.get(0) + "] is conflict with system topic.");
+
+ List<String> inValidTopicList = new ArrayList<>();
+ inValidTopicList.add("");
+ request = buildCreateTopicListRequest(inValidTopicList);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+ List<String> topicList = new ArrayList<>();
+ topicList.add("TEST_CREATE_TOPIC");
+ topicList.add("TEST_CREATE_TOPIC1");
+ request = buildCreateTopicListRequest(topicList);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ //test no changes
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
@Test
public void testDeleteTopicInRocksdb() throws Exception {
if (notToBeExecuted()) {
@@ -815,7 +892,6 @@ public class AdminBrokerProcessorTest {
request.setBody(JSON.toJSONBytes(aclInfo));
RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-
}
@Test
@@ -833,7 +909,6 @@ public class AdminBrokerProcessorTest {
request.setBody(JSON.toJSONBytes(aclInfo));
RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-
}
@Test
@@ -893,6 +968,349 @@ public class AdminBrokerProcessorTest {
assertThat(aclInfoData.get(0).getPolicies().get(0).getEntries().get(0).getDecision()).isEqualTo("Allow");
}
+ @Test
+ public void testGetTimeCheckPoint() throws RemotingCommandException {
+ when(this.brokerController.getTimerCheckpoint()).thenReturn(null);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ assertThat(response.getRemark()).isEqualTo("The checkpoint is null");
+
+ when(this.brokerController.getTimerCheckpoint()).thenReturn(new
TimerCheckpoint());
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+ @Test
+ public void testGetTimeMetrics() throws RemotingCommandException,
IOException {
+
when(this.brokerController.getMessageStore().getTimerMessageStore()).thenReturn(null);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_METRICS, null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+
when(this.brokerController.getMessageStore().getTimerMessageStore()).thenReturn(timerMessageStore);
+
when(this.timerMessageStore.getTimerMetrics()).thenReturn(timerMetrics);
+ when(this.timerMetrics.encode()).thenReturn(new
TimerMetrics.TimerMetricsSerializeWrapper().toJson(false));
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testUpdateColdDataFlowCtrGroupConfig() throws
RemotingCommandException {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_COLD_DATA_FLOW_CTR_CONFIG,
null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ request.setBody("consumerGroup1=1".getBytes());
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ request.setBody("".getBytes());
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testRemoveColdDataFlowCtrGroupConfig() throws
RemotingCommandException {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.REMOVE_COLD_DATA_FLOW_CTR_CONFIG,
null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ request.setBody("consumerGroup1".getBytes());
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetColdDataFlowCtrInfo() throws RemotingCommandException {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_COLD_DATA_FLOW_CTR_INFO,
null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testSetCommitLogReadAheadMode() throws
RemotingCommandException {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.SET_COMMITLOG_READ_MODE, null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+ HashMap<String, String> extfields = new HashMap<>();
+ extfields.put(FIleReadaheadMode.READ_AHEAD_MODE,
String.valueOf(LibC.MADV_DONTNEED));
+ request.setExtFields(extfields);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+ extfields.clear();
+ extfields.put(FIleReadaheadMode.READ_AHEAD_MODE,
String.valueOf(LibC.MADV_NORMAL));
+ request.setExtFields(extfields);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ this.brokerController.setMessageStore(defaultMessageStore);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+
when(this.defaultMessageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+ when(this.defaultMessageStore.getCommitLog()).thenReturn(commitLog);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetUnknownCmdResponse() throws RemotingCommandException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(10000,
null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+
assertThat(response.getCode()).isEqualTo(ResponseCode.REQUEST_CODE_NOT_SUPPORTED);
+ }
+
+ @Test
+ public void testGetAllMessageRequestMode() throws RemotingCommandException
{
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_MESSAGE_REQUEST_MODE,
null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testResetOffset() throws RemotingCommandException {
+ ResetOffsetRequestHeader requestHeader =
+ createRequestHeader("topic","group",-1,false,-1,-1);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET,
requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
+
+
this.brokerController.getTopicConfigManager().getTopicConfigTable().put("topic",
new TopicConfig("topic"));
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+
+
this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put("group",
new SubscriptionGroupConfig());
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ requestHeader.setQueueId(0);
+ request =
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET,
requestHeader);
+ request.makeCustomHeaderToNet();
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ requestHeader.setOffset(2L);
+ request =
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET,
requestHeader);
+ request.makeCustomHeaderToNet();
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ }
+
+ @Test
+ public void testGetConsumerStatus() throws RemotingCommandException {
+ GetConsumerStatusRequestHeader requestHeader = new
GetConsumerStatusRequestHeader();
+ requestHeader.setGroup("group");
+ requestHeader.setTopic("topic");
+ requestHeader.setClientAddr("");
+ RemotingCommand request = RemotingCommand
+
.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS,
requestHeader);
+ RemotingCommand responseCommand =
RemotingCommand.createResponseCommand(null);
+ responseCommand.setCode(ResponseCode.SUCCESS);
+
when(broker2Client.getConsumeStatus(anyString(),anyString(),anyString())).thenReturn(responseCommand);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testQueryTopicConsumeByWho() throws RemotingCommandException {
+ QueryTopicConsumeByWhoRequestHeader requestHeader = new
QueryTopicConsumeByWhoRequestHeader();
+ requestHeader.setTopic("topic");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO,
requestHeader);
+ request.makeCustomHeaderToNet();
+ HashSet<String> groups = new HashSet<>();
+ groups.add("group");
+
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+
when(consumerManager.queryTopicConsumeByWho(anyString())).thenReturn(groups);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(RemotingSerializable.decode(response.getBody(),
GroupList.class)
+ .getGroupList().contains("group"))
+ .isEqualTo(groups.contains("group"));
+ }
+
+ @Test
+ public void testQueryTopicByConsumer() throws RemotingCommandException {
+ QueryTopicsByConsumerRequestHeader requestHeader = new
QueryTopicsByConsumerRequestHeader();
+ requestHeader.setGroup("group");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPICS_BY_CONSUMER,
requestHeader);
+ request.makeCustomHeaderToNet();
+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testQuerySubscriptionByConsumer() throws
RemotingCommandException {
+ QuerySubscriptionByConsumerRequestHeader requestHeader = new
QuerySubscriptionByConsumerRequestHeader();
+ requestHeader.setGroup("group");
+ requestHeader.setTopic("topic");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_SUBSCRIPTION_BY_CONSUMER,
requestHeader);
+ request.makeCustomHeaderToNet();
+
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+
when(consumerManager.findSubscriptionData(anyString(),anyString())).thenReturn(null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetSystemTopicListFromBroker() throws
RemotingCommandException {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER,
null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testCleanExpiredConsumeQueue() throws RemotingCommandException
{
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE,
null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testDeleteExpiredCommitLog() throws RemotingCommandException {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.DELETE_EXPIRED_COMMITLOG,
null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testCleanUnusedTopic() throws RemotingCommandException {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetConsumerRunningInfo() throws RemotingCommandException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+
when(consumerManager.findChannel(anyString(),anyString())).thenReturn(null);
+ GetConsumerRunningInfoRequestHeader requestHeader = new
GetConsumerRunningInfoRequestHeader();
+ requestHeader.setClientId("client");
+ requestHeader.setConsumerGroup("group");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO,
requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+
when(consumerManager.findChannel(anyString(),anyString())).thenReturn(clientChannelInfo);
+
when(clientChannelInfo.getVersion()).thenReturn(MQVersion.Version.V3_0_0_SNAPSHOT.ordinal());
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+
when(clientChannelInfo.getVersion()).thenReturn(MQVersion.Version.V5_2_3.ordinal());
+ when(brokerController.getBroker2Client()).thenReturn(broker2Client);
+ when(clientChannelInfo.getChannel()).thenReturn(channel);
+ RemotingCommand responseCommand =
RemotingCommand.createResponseCommand(null);
+ responseCommand.setCode(ResponseCode.SUCCESS);
+
when(broker2Client.callClient(any(Channel.class),any(RemotingCommand.class))).thenReturn(responseCommand);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+
when(broker2Client.callClient(any(Channel.class),any(RemotingCommand.class))).thenThrow(new
RemotingTimeoutException("timeout"));
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+
assertThat(response.getCode()).isEqualTo(ResponseCode.CONSUME_MSG_TIMEOUT);
+ }
+
+ @Test
+ public void testQueryCorrectionOffset() throws RemotingCommandException {
+ Map<Integer, Long> correctionOffsetMap = new HashMap<>();
+ correctionOffsetMap.put(0, 100L);
+ correctionOffsetMap.put(1, 200L);
+ Map<Integer, Long> compareOffsetMap = new HashMap<>();
+ compareOffsetMap.put(0, 80L);
+ compareOffsetMap.put(1, 300L);
+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+
when(consumerOffsetManager.queryMinOffsetInAllGroup(anyString(),anyString())).thenReturn(correctionOffsetMap);
+
when(consumerOffsetManager.queryOffset(anyString(),anyString())).thenReturn(compareOffsetMap);
+ QueryCorrectionOffsetHeader queryCorrectionOffsetHeader = new
QueryCorrectionOffsetHeader();
+ queryCorrectionOffsetHeader.setTopic("topic");
+ queryCorrectionOffsetHeader.setCompareGroup("group");
+ queryCorrectionOffsetHeader.setFilterGroups("");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET,
queryCorrectionOffsetHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ QueryCorrectionOffsetBody body =
RemotingSerializable.decode(response.getBody(),
QueryCorrectionOffsetBody.class);
+ Map<Integer, Long> correctionOffsets = body.getCorrectionOffsets();
+ assertThat(correctionOffsets.get(0)).isEqualTo(Long.MAX_VALUE);
+ assertThat(correctionOffsets.get(1)).isEqualTo(200L);
+ }
+
+ @Test
+ public void testNotifyMinBrokerIdChange() throws RemotingCommandException {
+ NotifyMinBrokerIdChangeRequestHeader requestHeader = new
NotifyMinBrokerIdChangeRequestHeader();
+ requestHeader.setMinBrokerId(1L);
+ requestHeader.setMinBrokerAddr("127.0.0.1:10912");
+ requestHeader.setOfflineBrokerAddr("127.0.0.1:10911");
+ requestHeader.setHaBrokerAddr("");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_MIN_BROKER_ID_CHANGE,
requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testUpdateBrokerHaInfo() throws RemotingCommandException {
+ ExchangeHAInfoResponseHeader requestHeader = new
ExchangeHAInfoResponseHeader();
+ requestHeader.setMasterAddress("127.0.0.1:10911");
+ requestHeader.setMasterFlushOffset(0L);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.EXCHANGE_BROKER_HA_INFO,
requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+ requestHeader.setMasterHaAddress("127.0.0.1:10912");
+ request =
RemotingCommand.createRequestCommand(RequestCode.EXCHANGE_BROKER_HA_INFO,
requestHeader);
+ request.makeCustomHeaderToNet();
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ when(messageStore.getMasterFlushedOffset()).thenReturn(0L);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetBrokerHaStatus() throws RemotingCommandException {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_HA_STATUS,null);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+ when(messageStore.getHARuntimeInfo()).thenReturn(new HARuntimeInfo());
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testResetMasterFlushOffset() throws RemotingCommandException {
+ ResetMasterFlushOffsetHeader requestHeader = new
ResetMasterFlushOffsetHeader();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.RESET_MASTER_FLUSH_OFFSET,requestHeader);
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ requestHeader.setMasterFlushOffset(0L);
+ request.makeCustomHeaderToNet();
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ private ResetOffsetRequestHeader createRequestHeader(String topic,String
group,long timestamp,boolean force,long offset,int queueId) {
+ ResetOffsetRequestHeader requestHeader = new
ResetOffsetRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setGroup(group);
+ requestHeader.setTimestamp(timestamp);
+ requestHeader.setForce(force);
+ requestHeader.setOffset(offset);
+ requestHeader.setQueueId(queueId);
+ return requestHeader;
+ }
+
private RemotingCommand buildCreateTopicRequest(String topic) {
CreateTopicRequestHeader requestHeader = new
CreateTopicRequestHeader();
requestHeader.setTopic(topic);
@@ -900,12 +1318,29 @@ public class AdminBrokerProcessorTest {
requestHeader.setReadQueueNums(8);
requestHeader.setWriteQueueNums(8);
requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
-
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC,
requestHeader);
request.makeCustomHeaderToNet();
return request;
}
+ private RemotingCommand buildCreateTopicListRequest(List<String>
topicList) {
+ List<TopicConfig> topicConfigList = new ArrayList<>();
+ for (String topic:topicList) {
+ TopicConfig topicConfig = new TopicConfig(topic);
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setWriteQueueNums(8);
+ topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
+ topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+ topicConfig.setTopicSysFlag(0);
+ topicConfig.setOrder(false);
+ topicConfigList.add(topicConfig);
+ }
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST,
null);
+ CreateTopicListRequestBody createTopicListRequestBody = new
CreateTopicListRequestBody(topicConfigList);
+ request.setBody(createTopicListRequestBody.encode());
+ return request;
+ }
+
private RemotingCommand buildDeleteTopicRequest(String topic) {
DeleteTopicRequestHeader requestHeader = new
DeleteTopicRequestHeader();
requestHeader.setTopic(topic);