This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4a8e0d5b85 [ISSUE #7351] Allow mqadmin to operate slave nodes
4a8e0d5b85 is described below
commit 4a8e0d5b851d1f9573cda79b7d2e42ee498809da
Author: guyinyou <[email protected]>
AuthorDate: Wed Sep 13 16:08:03 2023 +0800
[ISSUE #7351] Allow mqadmin to operate slave nodes
Co-authored-by: guyinyou <[email protected]>
---
.../broker/processor/AdminBrokerProcessor.java | 12 ---
.../broker/processor/AdminBrokerProcessorTest.java | 106 ---------------------
2 files changed, 118 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 8fbcd3c94f..9e48431be2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -406,9 +406,6 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
private synchronized RemotingCommand
updateAndCreateTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- if (validateSlave(response)) {
- return response;
- }
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader)
request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
@@ -519,9 +516,6 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- if (validateSlave(response)) {
- return response;
- }
DeleteTopicRequestHeader requestHeader =
(DeleteTopicRequestHeader)
request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
@@ -1413,9 +1407,6 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
private RemotingCommand
updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand
request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- if (validateSlave(response)) {
- return response;
- }
LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup
called by {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
@@ -1480,9 +1471,6 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- if (validateSlave(response)) {
- return response;
- }
DeleteSubscriptionGroupRequestHeader requestHeader =
(DeleteSubscriptionGroupRequestHeader)
request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 9d17011b61..ec252cecea 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -76,7 +76,6 @@ import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
-import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.stats.BrokerStats;
@@ -250,32 +249,6 @@ public class AdminBrokerProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
- @Test
- public void testUpdateAndCreateTopicOnSlaveInRocksdb() throws Exception {
- if (notToBeExecuted()) {
- return;
- }
- initRocksdbTopicManager();
- testUpdateAndCreateTopicOnSlave();
- }
-
- @Test
- public void testUpdateAndCreateTopicOnSlave() throws Exception {
- // setup
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
- defaultMessageStore = mock(DefaultMessageStore.class);
-
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
- // test on slave
- String topic = "TEST_CREATE_TOPIC";
- RemotingCommand request = buildCreateTopicRequest(topic);
- RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or
subscription group from slave broker, " +
- "please execute it from master broker.");
- }
-
@Test
public void testDeleteTopicInRocksdb() throws Exception {
if (notToBeExecuted()) {
@@ -301,31 +274,6 @@ public class AdminBrokerProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
- @Test
- public void testDeleteTopicOnSlaveInRocksdb() throws Exception {
- if (notToBeExecuted()) {
- return;
- }
- initRocksdbTopicManager();
- testDeleteTopicOnSlave();
- }
-
- @Test
- public void testDeleteTopicOnSlave() throws Exception {
- // setup
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
- defaultMessageStore = mock(DefaultMessageStore.class);
-
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
- String topic = "TEST_DELETE_TOPIC";
- RemotingCommand request = buildDeleteTopicRequest(topic);
- RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or
subscription group from slave broker, " +
- "please execute it from master broker.");
- }
-
@Test
public void testDeleteWithPopRetryTopic() throws Exception {
String topic = "topicA";
@@ -538,36 +486,6 @@ public class AdminBrokerProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
- @Test
- public void testUpdateAndCreateSubscriptionGroupOnSlaveInRocksdb() throws
Exception {
- initRocksdbSubscriptionManager();
- testUpdateAndCreateSubscriptionGroupOnSlave();
- }
-
- @Test
- public void testUpdateAndCreateSubscriptionGroupOnSlave() throws
RemotingCommandException {
- // Setup
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
- defaultMessageStore = mock(DefaultMessageStore.class);
-
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
- // Test
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP,
null);
- SubscriptionGroupConfig subscriptionGroupConfig = new
SubscriptionGroupConfig();
- subscriptionGroupConfig.setBrokerId(1);
- subscriptionGroupConfig.setGroupName("groupId");
- subscriptionGroupConfig.setConsumeEnable(Boolean.TRUE);
- subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.TRUE);
- subscriptionGroupConfig.setRetryMaxTimes(111);
- subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.TRUE);
-
request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes());
- RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or
subscription group from slave broker, " +
- "please execute it from master broker.");
- }
-
@Test
public void testGetAllSubscriptionGroupInRocksdb() throws Exception {
initRocksdbSubscriptionManager();
@@ -596,30 +514,6 @@ public class AdminBrokerProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
- @Test
- public void testDeleteSubscriptionGroupOnSlaveInRocksdb() throws Exception
{
- initRocksdbSubscriptionManager();
- testDeleteSubscriptionGroupOnSlave();
- }
-
- @Test
- public void testDeleteSubscriptionGroupOnSlave() throws
RemotingCommandException {
- // Setup
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
- defaultMessageStore = mock(DefaultMessageStore.class);
-
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
- // Test
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP,
null);
- request.addExtField("groupName", "GID-Group-Name");
- request.addExtField("removeOffset", "true");
- RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or
subscription group from slave broker, " +
- "please execute it from master broker.");
- }
-
@Test
public void testGetTopicStatsInfo() throws RemotingCommandException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, null);