This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4a8e0d5b85 [ISSUE #7351] Allow mqadmin to operate slave nodes
4a8e0d5b85 is described below

commit 4a8e0d5b851d1f9573cda79b7d2e42ee498809da
Author: guyinyou <[email protected]>
AuthorDate: Wed Sep 13 16:08:03 2023 +0800

    [ISSUE #7351] Allow mqadmin to operate slave nodes
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../broker/processor/AdminBrokerProcessor.java     |  12 ---
 .../broker/processor/AdminBrokerProcessorTest.java | 106 ---------------------
 2 files changed, 118 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 8fbcd3c94f..9e48431be2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -406,9 +406,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private synchronized RemotingCommand 
updateAndCreateTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-        if (validateSlave(response)) {
-            return response;
-        }
         final CreateTopicRequestHeader requestHeader =
             (CreateTopicRequestHeader) 
request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
 
@@ -519,9 +516,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-        if (validateSlave(response)) {
-            return response;
-        }
         DeleteTopicRequestHeader requestHeader =
             (DeleteTopicRequestHeader) 
request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
 
@@ -1413,9 +1407,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand 
updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand 
request)
         throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-        if (validateSlave(response)) {
-            return response;
-        }
 
         LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup 
called by {}",
             RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
@@ -1480,9 +1471,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-        if (validateSlave(response)) {
-            return response;
-        }
         DeleteSubscriptionGroupRequestHeader requestHeader =
             (DeleteSubscriptionGroupRequestHeader) 
request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 9d17011b61..ec252cecea 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -76,7 +76,6 @@ import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
-import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
 import org.apache.rocketmq.store.stats.BrokerStats;
@@ -250,32 +249,6 @@ public class AdminBrokerProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
-    @Test
-    public void testUpdateAndCreateTopicOnSlaveInRocksdb() throws Exception {
-        if (notToBeExecuted()) {
-            return;
-        }
-        initRocksdbTopicManager();
-        testUpdateAndCreateTopicOnSlave();
-    }
-
-    @Test
-    public void testUpdateAndCreateTopicOnSlave() throws Exception {
-        // setup
-        MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
-        when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
-        defaultMessageStore = mock(DefaultMessageStore.class);
-        
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
-        // test on slave
-        String topic = "TEST_CREATE_TOPIC";
-        RemotingCommand request = buildCreateTopicRequest(topic);
-        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
-        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
-        assertThat(response.getRemark()).isEqualTo("Can't modify topic or 
subscription group from slave broker, " +
-            "please execute it from master broker.");
-    }
-
     @Test
     public void testDeleteTopicInRocksdb() throws Exception {
         if (notToBeExecuted()) {
@@ -301,31 +274,6 @@ public class AdminBrokerProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
-    @Test
-    public void testDeleteTopicOnSlaveInRocksdb() throws Exception {
-        if (notToBeExecuted()) {
-            return;
-        }
-        initRocksdbTopicManager();
-        testDeleteTopicOnSlave();
-    }
-
-    @Test
-    public void testDeleteTopicOnSlave() throws Exception {
-        // setup
-        MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
-        when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
-        defaultMessageStore = mock(DefaultMessageStore.class);
-        
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
-        String topic = "TEST_DELETE_TOPIC";
-        RemotingCommand request = buildDeleteTopicRequest(topic);
-        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
-        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
-        assertThat(response.getRemark()).isEqualTo("Can't modify topic or 
subscription group from slave broker, " +
-            "please execute it from master broker.");
-    }
-
     @Test
     public void testDeleteWithPopRetryTopic() throws Exception {
         String topic = "topicA";
@@ -538,36 +486,6 @@ public class AdminBrokerProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
-    @Test
-    public void testUpdateAndCreateSubscriptionGroupOnSlaveInRocksdb() throws 
Exception {
-        initRocksdbSubscriptionManager();
-        testUpdateAndCreateSubscriptionGroupOnSlave();
-    }
-
-    @Test
-    public void testUpdateAndCreateSubscriptionGroupOnSlave() throws 
RemotingCommandException {
-        // Setup
-        MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
-        when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
-        defaultMessageStore = mock(DefaultMessageStore.class);
-        
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
-        // Test
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP,
 null);
-        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
-        subscriptionGroupConfig.setBrokerId(1);
-        subscriptionGroupConfig.setGroupName("groupId");
-        subscriptionGroupConfig.setConsumeEnable(Boolean.TRUE);
-        subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.TRUE);
-        subscriptionGroupConfig.setRetryMaxTimes(111);
-        subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.TRUE);
-        
request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes());
-        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
-        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
-        assertThat(response.getRemark()).isEqualTo("Can't modify topic or 
subscription group from slave broker, " +
-            "please execute it from master broker.");
-    }
-
     @Test
     public void testGetAllSubscriptionGroupInRocksdb() throws Exception {
         initRocksdbSubscriptionManager();
@@ -596,30 +514,6 @@ public class AdminBrokerProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
-    @Test
-    public void testDeleteSubscriptionGroupOnSlaveInRocksdb() throws Exception 
{
-        initRocksdbSubscriptionManager();
-        testDeleteSubscriptionGroupOnSlave();
-    }
-
-    @Test
-    public void testDeleteSubscriptionGroupOnSlave() throws 
RemotingCommandException {
-        // Setup
-        MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
-        when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
-        defaultMessageStore = mock(DefaultMessageStore.class);
-        
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
-        // Test
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, 
null);
-        request.addExtField("groupName", "GID-Group-Name");
-        request.addExtField("removeOffset", "true");
-        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
-        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
-        assertThat(response.getRemark()).isEqualTo("Can't modify topic or 
subscription group from slave broker, " +
-            "please execute it from master broker.");
-    }
-
     @Test
     public void testGetTopicStatsInfo() throws RemotingCommandException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, null);

Reply via email to