This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 06ca14a50 [ISSUE #4791] Add elect master command for admin CLI (#4798)
06ca14a50 is described below

commit 06ca14a5081d067a1e61b5d716d3f17c16ef00d0
Author: mxsm <[email protected]>
AuthorDate: Thu Aug 11 10:06:46 2022 +0800

    [ISSUE #4791] Add elect master command for admin CLI (#4798)
---
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 31 ++++++++
 .../controller/ElectMasterRequestHeader.java       | 33 ++++++++
 .../impl/manager/ReplicasInfoManager.java          | 64 +++++++++++----
 .../processor/ControllerRequestProcessor.java      | 33 +++++---
 .../impl/manager/ReplicasInfoManagerTest.java      | 30 ++++++-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    | 20 +++--
 .../tools/admin/DefaultMQAdminExtImpl.java         | 10 ++-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    | 70 ++++++++++------
 .../rocketmq/tools/command/MQAdminStartup.java     |  2 +
 .../controller/ReElectMasterSubCommand.java        | 92 ++++++++++++++++++++++
 10 files changed, 330 insertions(+), 55 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c425dba29..7719b0ae6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -79,6 +79,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
 import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
@@ -185,6 +186,8 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHea
 import 
org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -3026,4 +3029,32 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
             throw new MQClientException(errResponse.getCode(), 
errResponse.getRemark());
         }
     }
+
+    public ElectMasterResponseHeader electMaster(String controllerAddr, String 
clusterName, String brokerName,
+        String brokerAddr) throws MQBrokerException, RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, InterruptedException, 
RemotingCommandException {
+
+        //get controller leader address
+        final GetMetaDataResponseHeader controllerMetaData = 
this.getControllerMetaData(controllerAddr);
+        assert controllerMetaData != null;
+        assert controllerMetaData.getControllerLeaderAddress() != null;
+        final String leaderAddress = 
controllerMetaData.getControllerLeaderAddress();
+        ElectMasterRequestHeader electRequestHeader = new 
ElectMasterRequestHeader(clusterName, brokerName, brokerAddr);
+
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, 
electRequestHeader);
+        final RemotingCommand response = 
this.remotingClient.invokeSync(leaderAddress, request, 3000);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                BrokerMemberGroup brokerMemberGroup = 
RemotingSerializable.decode(response.getBody(), BrokerMemberGroup.class);
+                ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) 
response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+                if (null != responseHeader) {
+                    responseHeader.setBrokerMemberGroup(brokerMemberGroup);
+                }
+                return responseHeader;
+            }
+            default:
+                break;
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
index d96355e66..13146a4fb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
@@ -17,11 +17,20 @@
 package org.apache.rocketmq.common.protocol.header.namesrv.controller;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class ElectMasterRequestHeader implements CommandCustomHeader {
+
+    @CFNotNull
+    private String clusterName;
+
+    @CFNotNull
     private String brokerName;
 
+    @CFNotNull
+    private String brokerAddress;
+
     public ElectMasterRequestHeader() {
     }
 
@@ -29,6 +38,12 @@ public class ElectMasterRequestHeader implements CommandCustomHeader {
         this.brokerName = brokerName;
     }
 
+    public ElectMasterRequestHeader(String clusterName, String brokerName, 
String brokerAddress) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerAddress = brokerAddress;
+    }
+
     public String getBrokerName() {
         return brokerName;
     }
@@ -37,10 +52,28 @@ public class ElectMasterRequestHeader implements CommandCustomHeader {
         this.brokerName = brokerName;
     }
 
+    public String getBrokerAddress() {
+        return brokerAddress;
+    }
+
+    public void setBrokerAddress(String brokerAddress) {
+        this.brokerAddress = brokerAddress;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
     @Override
     public String toString() {
         return "ElectMasterRequestHeader{" +
+            "clusterName='" + clusterName + '\'' +
             "brokerName='" + brokerName + '\'' +
+            "brokerAddress='" + brokerAddress + '\'' +
             '}';
     }
 
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index a53c7ca9d..c63ca0bf1 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.BiPredicate;
 import java.util.function.Predicate;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -67,8 +68,8 @@ public class ReplicasInfoManager {
     }
 
     public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
-        final AlterSyncStateSetRequestHeader request, final SyncStateSet 
syncStateSet,
-        final BiPredicate<String, String> brokerAlivePredicate) {
+            final AlterSyncStateSetRequestHeader request, final SyncStateSet 
syncStateSet,
+            final BiPredicate<String, String> brokerAlivePredicate) {
         final String brokerName = request.getBrokerName();
         final ControllerResult<AlterSyncStateSetResponseHeader> result = new 
ControllerResult<>(new AlterSyncStateSetResponseHeader());
         final AlterSyncStateSetResponseHeader response = result.getResponse();
@@ -90,7 +91,7 @@ public class ReplicasInfoManager {
             // Check master
             if 
(!syncStateInfo.getMasterAddress().equals(request.getMasterAddress())) {
                 String err = String.format("Rejecting alter syncStateSet 
request because the current leader is:{%s}, not {%s}",
-                    syncStateInfo.getMasterAddress(), 
request.getMasterAddress());
+                        syncStateInfo.getMasterAddress(), 
request.getMasterAddress());
                 log.error("{}", err);
                 
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_MASTER, err);
                 return result;
@@ -99,7 +100,7 @@ public class ReplicasInfoManager {
             // Check master epoch
             if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) {
                 String err = String.format("Rejecting alter syncStateSet 
request because the current master epoch is:{%d}, not {%d}",
-                    syncStateInfo.getMasterEpoch(), request.getMasterEpoch());
+                        syncStateInfo.getMasterEpoch(), 
request.getMasterEpoch());
                 log.error("{}", err);
                 
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH, err);
                 return result;
@@ -108,7 +109,7 @@ public class ReplicasInfoManager {
             // Check syncStateSet epoch
             if (syncStateSet.getSyncStateSetEpoch() != 
syncStateInfo.getSyncStateSetEpoch()) {
                 String err = String.format("Rejecting alter syncStateSet 
request because the current syncStateSet epoch is:{%d}, not {%d}",
-                    syncStateInfo.getSyncStateSetEpoch(), 
syncStateSet.getSyncStateSetEpoch());
+                        syncStateInfo.getSyncStateSetEpoch(), 
syncStateSet.getSyncStateSetEpoch());
                 log.error("{}", err);
                 
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH, 
err);
                 return result;
@@ -150,8 +151,9 @@ public class ReplicasInfoManager {
     }
 
     public ControllerResult<ElectMasterResponseHeader> electMaster(
-        final ElectMasterRequestHeader request, final BiPredicate<String, 
String> brokerAlivePredicate) {
+            final ElectMasterRequestHeader request, final BiPredicate<String, 
String> brokerAlivePredicate) {
         final String brokerName = request.getBrokerName();
+        final String assignBrokerAddress = request.getBrokerAddress();
         final ControllerResult<ElectMasterResponseHeader> result = new 
ControllerResult<>(new ElectMasterResponseHeader());
         if (isContainsBroker(brokerName)) {
             final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
@@ -160,16 +162,26 @@ public class ReplicasInfoManager {
             // First, check whether the master is still active
             final String oldMaster = syncStateInfo.getMasterAddress();
             if (StringUtils.isNoneEmpty(oldMaster) && 
brokerAlivePredicate.test(brokerInfo.getClusterName(), oldMaster)) {
-                String err = String.format("The old master %s is still alive, 
not need to elect new master for broker %s", oldMaster, 
brokerInfo.getBrokerName());
-                log.warn("{}", err);
-                
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
-                return result;
+
+                if (StringUtils.isBlank(assignBrokerAddress)) {
+                    String err = String.format("The old master %s is still 
alive, no need to elect new master for broker %s", oldMaster, 
brokerInfo.getBrokerName());
+                    log.warn("{}", err);
+                    
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+                    return result;
+                }
+
+                if (StringUtils.equals(oldMaster, assignBrokerAddress)) {
+                    String err = String.format("The Re-elect master is the 
same as the old master %s which is still alive, no need to elect new master for 
broker %s", oldMaster, brokerInfo.getBrokerName());
+                    log.warn("{}", err);
+                    
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+                    return result;
+                }
             }
 
             // Try elect a master in syncStateSet
             if (syncStateSet.size() > 1) {
-                boolean electSuccess = tryElectMaster(result, brokerName, 
syncStateSet, candidate ->
-                    !candidate.equals(syncStateInfo.getMasterAddress()) && 
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
+                boolean electSuccess = tryElectMaster(result, brokerName, 
assignBrokerAddress, syncStateSet, candidate ->
+                        !candidate.equals(syncStateInfo.getMasterAddress()) && 
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
                 if (electSuccess) {
                     return result;
                 }
@@ -177,8 +189,8 @@ public class ReplicasInfoManager {
 
             // Try elect a master in lagging replicas if 
enableElectUncleanMaster = true
             if (controllerConfig.isEnableElectUncleanMaster()) {
-                boolean electSuccess = tryElectMaster(result, brokerName, 
brokerInfo.getAllBroker(), candidate ->
-                    !candidate.equals(syncStateInfo.getMasterAddress()) && 
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
+                boolean electSuccess = tryElectMaster(result, brokerName, 
assignBrokerAddress, brokerInfo.getAllBroker(), candidate ->
+                        !candidate.equals(syncStateInfo.getMasterAddress()) && 
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
                 if (electSuccess) {
                     return result;
                 }
@@ -202,9 +214,28 @@ public class ReplicasInfoManager {
      * @return true if elect success
      */
     private boolean tryElectMaster(final 
ControllerResult<ElectMasterResponseHeader> result, final String brokerName,
-        final Set<String> candidates, final Predicate<String> filter) {
+                                   final String assignBrokerAddress, final 
Set<String> candidates, final Predicate<String> filter) {
         final int masterEpoch = 
this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
         final int syncStateSetEpoch = 
this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
+
+        //Re-elect the assigned broker as master
+        if (StringUtils.isNotBlank(assignBrokerAddress) && 
filter.test(assignBrokerAddress)) {
+            final ElectMasterResponseHeader response = result.getResponse();
+            response.setNewMasterAddress(assignBrokerAddress);
+            response.setMasterEpoch(masterEpoch + 1);
+            response.setSyncStateSetEpoch(syncStateSetEpoch);
+            BrokerMemberGroup brokerMemberGroup = 
buildBrokerMemberGroup(brokerName);
+            if (null != brokerMemberGroup) {
+                response.setBrokerMemberGroup(brokerMemberGroup);
+                result.setBody(brokerMemberGroup.encode());
+            }
+            final ElectMasterEvent event = new ElectMasterEvent(brokerName, 
assignBrokerAddress);
+            result.addEvent(event);
+            return true;
+        } else if (StringUtils.isNotBlank(assignBrokerAddress) && 
!filter.test(assignBrokerAddress)) {
+            return false;
+        }
+
         for (final String candidate : candidates) {
             if (filter.test(candidate)) {
                 final ElectMasterResponseHeader response = 
result.getResponse();
@@ -213,6 +244,7 @@ public class ReplicasInfoManager {
                 response.setSyncStateSetEpoch(syncStateSetEpoch);
                 BrokerMemberGroup brokerMemberGroup = 
buildBrokerMemberGroup(brokerName);
                 if (null != brokerMemberGroup) {
+                    response.setBrokerMemberGroup(brokerMemberGroup);
                     result.setBody(brokerMemberGroup.encode());
                 }
                 final ElectMasterEvent event = new 
ElectMasterEvent(brokerName, candidate);
@@ -237,7 +269,7 @@ public class ReplicasInfoManager {
     }
 
     public ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerBroker(
-        final RegisterBrokerToControllerRequestHeader request) {
+            final RegisterBrokerToControllerRequestHeader request) {
         final String brokerName = request.getBrokerName();
         final String brokerAddress = request.getBrokerAddress();
         final ControllerResult<RegisterBrokerToControllerResponseHeader> 
result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader());
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 12824d7c3..5b583b493 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -17,21 +17,25 @@
 package org.apache.rocketmq.controller.processor;
 
 import io.netty.channel.ChannelHandlerContext;
+
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
-import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
-import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
 import org.apache.rocketmq.controller.BrokerHeartbeatManager;
 import org.apache.rocketmq.controller.ControllerManager;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -69,9 +73,9 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
     public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
         if (ctx != null) {
             log.debug("Receive request, {} {} {}",
-                request.getCode(),
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                request);
+                    request.getCode(),
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                    request);
         }
         switch (request.getCode()) {
             case CONTROLLER_ALTER_SYNC_STATE_SET: {
@@ -84,10 +88,21 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                 break;
             }
             case CONTROLLER_ELECT_MASTER: {
-                final ElectMasterRequestHeader controllerRequest = 
(ElectMasterRequestHeader) 
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().electMaster(controllerRequest);
+                final ElectMasterRequestHeader electMasterRequest = 
(ElectMasterRequestHeader) 
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().electMaster(electMasterRequest);
                 if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    final RemotingCommand response = 
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    final ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) response.readCustomHeader();
+
+                    if (null != responseHeader) {
+                        if 
(StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
+                            
heartbeatManager.changeBrokerMetadata(electMasterRequest.getClusterName(), 
responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
+                        }
+                        if 
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+                            
this.controllerManager.notifyBrokerRoleChanged(responseHeader, 
electMasterRequest.getClusterName());
+                        }
+                    }
+                    return response;
                 }
                 break;
             }
@@ -99,7 +114,7 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                     final RegisterBrokerToControllerResponseHeader 
responseHeader = (RegisterBrokerToControllerResponseHeader) 
response.readCustomHeader();
                     if (responseHeader != null && responseHeader.getBrokerId() 
>= 0) {
                         
this.heartbeatManager.registerBroker(controllerRequest.getClusterName(), 
controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                            responseHeader.getBrokerId(), 
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel());
+                                responseHeader.getBrokerId(), 
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel());
                     }
                     return response;
                 }
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 09ed9fc70..ed47be2df 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
@@ -53,7 +54,8 @@ public class ReplicasInfoManagerTest {
         this.replicasInfoManager = new ReplicasInfoManager(config);
     }
 
-    public boolean registerNewBroker(String clusterName, String brokerName, 
String brokerAddress, boolean isFirstRegisteredBroker) {
+    public boolean registerNewBroker(String clusterName, String brokerName, 
String brokerAddress,
+        boolean isFirstRegisteredBroker) {
         // Register new broker
         final RegisterBrokerToControllerRequestHeader registerRequest =
             new RegisterBrokerToControllerRequestHeader(clusterName, 
brokerName, brokerAddress);
@@ -72,7 +74,8 @@ public class ReplicasInfoManagerTest {
         return true;
     }
 
-    private boolean alterNewInSyncSet(String brokerName, String masterAddress, 
int masterEpoch, Set<String> newSyncStateSet, int syncStateSetEpoch) {
+    private boolean alterNewInSyncSet(String brokerName, String masterAddress, 
int masterEpoch,
+        Set<String> newSyncStateSet, int syncStateSetEpoch) {
         final AlterSyncStateSetRequestHeader alterRequest =
             new AlterSyncStateSetRequestHeader(brokerName, masterAddress, 
masterEpoch);
         final ControllerResult<AlterSyncStateSetResponseHeader> result = 
this.replicasInfoManager.alterSyncStateSet(alterRequest, new 
SyncStateSet(newSyncStateSet, syncStateSetEpoch), (va1, va2) -> true);
@@ -113,6 +116,29 @@ public class ReplicasInfoManagerTest {
         assertEquals(response.getMasterEpoch(), 2);
         assertFalse(response.getNewMasterAddress().isEmpty());
         assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+
+        final Set<String> brokerSet = new HashSet<>();
+        brokerSet.add("127.0.0.1:9000");
+        brokerSet.add("127.0.0.1:9001");
+        brokerSet.add("127.0.0.1:9002");
+        final ElectMasterRequestHeader assignRequest = new 
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9000");
+        final ControllerResult<ElectMasterResponseHeader> cResult1 = 
this.replicasInfoManager.electMaster(assignRequest, (clusterName, 
brokerAddress) -> brokerAddress.contains("127.0.0.1:9000"));
+        assertEquals( cResult1.getResponseCode(), 
ResponseCode.CONTROLLER_INVALID_REQUEST);
+
+
+        final ElectMasterRequestHeader assignRequest1 = new 
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9001");
+        final ControllerResult<ElectMasterResponseHeader> cResult2 = 
this.replicasInfoManager.electMaster(assignRequest1, (clusterName, 
brokerAddress) -> brokerAddress.equals("127.0.0.1:9000"));
+        assertEquals( cResult2.getResponseCode(), 
ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
+
+        final ElectMasterRequestHeader assignRequest2 = new 
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9001");
+        final ControllerResult<ElectMasterResponseHeader> cResult3 = 
this.replicasInfoManager.electMaster(assignRequest2, (clusterName, 
brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"));
+        assertEquals( cResult3.getResponseCode(), ResponseCode.SUCCESS);
+        final ElectMasterResponseHeader response3 = cResult3.getResponse();
+        assertEquals(response3.getNewMasterAddress(),"127.0.0.1:9001");
+        assertEquals(response.getMasterEpoch(), 2);
+        assertFalse(response.getNewMasterAddress().isEmpty());
+        assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+
     }
 
     @Test
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 3356f94f7..0b550b56b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -53,6 +53,7 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -208,7 +209,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
 
     @Override
     public void updateGlobalWhiteAddrConfig(String addr,
-        String globalWhiteAddrs, String aclFileFullPath) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        String globalWhiteAddrs,
+        String aclFileFullPath) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
         defaultMQAdminExtImpl.updateGlobalWhiteAddrConfig(addr, 
globalWhiteAddrs, aclFileFullPath);
     }
 
@@ -333,7 +335,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public ProducerTableInfo getAllProducerInfo(final String brokerAddr) 
throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
+    public ProducerTableInfo getAllProducerInfo(
+        final String brokerAddr) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
         return defaultMQAdminExtImpl.getAllProducerInfo(brokerAddr);
     }
 
@@ -534,7 +537,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public boolean deleteExpiredCommitLogByAddr(String addr) throws 
RemotingConnectException, RemotingSendRequestException,
+    public boolean deleteExpiredCommitLogByAddr(
+        String addr) throws RemotingConnectException, 
RemotingSendRequestException,
         RemotingTimeoutException, MQClientException, InterruptedException {
         return defaultMQAdminExtImpl.deleteExpiredCommitLogByAddr(addr);
     }
@@ -787,7 +791,6 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         this.defaultMQAdminExtImpl.resetMasterFlushOffset(brokerAddr, 
masterFlushOffset);
     }
 
-
     public QueryResult queryMessageByUniqKey(String topic, String key, int 
maxNum, long begin, long end)
         throws MQClientException, InterruptedException {
 
@@ -808,7 +811,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public Map<String, Properties> getControllerConfig(List<String> 
controllerServers) throws InterruptedException, RemotingTimeoutException,
+    public Map<String, Properties> getControllerConfig(
+        List<String> controllerServers) throws InterruptedException, 
RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, 
MQClientException,
         UnsupportedEncodingException {
         return 
this.defaultMQAdminExtImpl.getControllerConfig(controllerServers);
@@ -819,4 +823,10 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         List<String> controllers) throws InterruptedException, 
RemotingConnectException, UnsupportedEncodingException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
MQBrokerException {
         this.defaultMQAdminExtImpl.updateControllerConfig(properties, 
controllers);
     }
+
+    @Override
+    public ElectMasterResponseHeader electMaster(String controllerAddr, String 
clusterName,
+        String brokerName, String brokerAddr) throws RemotingException, 
InterruptedException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.electMaster(controllerAddr, 
clusterName, brokerName, brokerAddr);
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index ee28c876e..3cea455a2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -60,6 +60,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
 import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -1793,6 +1794,12 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         
this.mqClientInstance.getMQClientAPIImpl().resetMasterFlushOffset(brokerAddr, 
masterFlushOffset);
     }
 
+    /**
+     * Forwards the elect-master request to the {@code MQClientAPIImpl}
+     * obtained from {@code mqClientInstance}, passing the arguments
+     * through unchanged, and returns the header that call produces.
+     */
+    @Override
+    public ElectMasterResponseHeader electMaster(String controllerAddr, String 
clusterName,
+        String brokerName, String brokerAddr) throws RemotingException, 
InterruptedException, MQBrokerException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().electMaster(controllerAddr, 
clusterName, brokerName, brokerAddr);
+    }
+
     @Override
     public GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, 
String groupName, String topicName,
         Boolean readable) throws RemotingException, InterruptedException, 
MQBrokerException {
@@ -1823,7 +1830,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         return 
this.mqClientInstance.getMQClientAPIImpl().getControllerConfig(controllerServers,
 timeoutMillis);
     }
 
-    @Override public void updateControllerConfig(Properties properties,
+    @Override
+    public void updateControllerConfig(Properties properties,
         List<String> controllers) throws InterruptedException, 
RemotingConnectException, UnsupportedEncodingException,
         RemotingSendRequestException, RemotingTimeoutException, 
MQClientException, MQBrokerException {
         
this.mqClientInstance.getMQClientAPIImpl().updateControllerConfig(properties, 
controllers, timeoutMillis);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 9fe6ea4fe..857f159b5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -99,7 +100,8 @@ public interface MQAdminExt extends MQAdmin {
         final String globalWhiteAddrs) throws RemotingException, 
MQBrokerException,
         InterruptedException, MQClientException;
 
-    void updateGlobalWhiteAddrConfig(final String addr, final String 
globalWhiteAddrs, String aclFileFullPath) throws RemotingException, 
MQBrokerException,
+    void updateGlobalWhiteAddrConfig(final String addr, final String 
globalWhiteAddrs,
+        String aclFileFullPath) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
     ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
@@ -113,14 +115,16 @@ public interface MQAdminExt extends MQAdmin {
         final SubscriptionGroupConfig config) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException;
 
-    SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, 
final String group) throws InterruptedException, RemotingException, 
MQClientException, MQBrokerException;
+    SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr,
+        final String group) throws InterruptedException, RemotingException, 
MQClientException, MQBrokerException;
 
     TopicStatsTable examineTopicStats(
         final String topic) throws RemotingException, MQClientException, 
InterruptedException,
         MQBrokerException;
 
-    TopicStatsTable examineTopicStats(String brokerAddr, final String topic) 
throws RemotingException, MQClientException, InterruptedException,
-            MQBrokerException;
+    TopicStatsTable examineTopicStats(String brokerAddr,
+        final String topic) throws RemotingException, MQClientException, 
InterruptedException,
+        MQBrokerException;
 
     AdminToolResult<TopicStatsTable> examineTopicStatsConcurrent(String topic);
 
@@ -162,7 +166,7 @@ public interface MQAdminExt extends MQAdmin {
         MQClientException, InterruptedException, MQBrokerException;
 
     ProducerTableInfo getAllProducerInfo(final String brokerAddr) throws 
RemotingException,
-            MQClientException, InterruptedException, MQBrokerException;
+        MQClientException, InterruptedException, MQBrokerException;
 
     List<String> getNameServerAddressList();
 
@@ -170,7 +174,7 @@ public interface MQAdminExt extends MQAdmin {
         RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQClientException;
 
     int addWritePermOfBroker(final String namesrvAddr, String brokerName) 
throws RemotingCommandException,
-            RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQClientException;
+        RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQClientException;
 
     void putKVConfig(final String namespace, final String key, final String 
value);
 
@@ -200,7 +204,8 @@ public interface MQAdminExt extends MQAdmin {
     void deleteSubscriptionGroup(final String addr, String groupName) throws 
RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
-    void deleteSubscriptionGroup(final String addr, String groupName, boolean 
removeOffset) throws RemotingException, MQBrokerException,
+    void deleteSubscriptionGroup(final String addr, String groupName,
+        boolean removeOffset) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
     void createAndUpdateKvConfig(String namespace, String key,
@@ -233,11 +238,13 @@ public interface MQAdminExt extends MQAdmin {
     GroupList queryTopicConsumeByWho(final String topic) throws 
RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, InterruptedException, MQBrokerException, 
RemotingException, MQClientException;
 
-    TopicList queryTopicsByConsumer(final String group) throws 
InterruptedException, MQBrokerException, RemotingException, MQClientException;
+    TopicList queryTopicsByConsumer(
+        final String group) throws InterruptedException, MQBrokerException, 
RemotingException, MQClientException;
 
     AdminToolResult<TopicList> queryTopicsByConsumerConcurrent(final String 
group);
 
-    SubscriptionData querySubscription(final String group, final String topic) 
throws InterruptedException, MQBrokerException, RemotingException, 
MQClientException;
+    SubscriptionData querySubscription(final String group,
+        final String topic) throws InterruptedException, MQBrokerException, 
RemotingException, MQClientException;
 
     List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
         final String group) throws InterruptedException, MQBrokerException,
@@ -266,7 +273,8 @@ public interface MQAdminExt extends MQAdmin {
     ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, 
final String clientId, final boolean jstack)
         throws RemotingException, MQClientException, InterruptedException;
 
-    ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, 
final String clientId, final boolean jstack, final boolean metrics)
+    ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, 
final String clientId, final boolean jstack,
+        final boolean metrics)
         throws RemotingException, MQClientException, InterruptedException;
 
     ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
@@ -311,7 +319,6 @@ public interface MQAdminExt extends MQAdmin {
         long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQBrokerException;
 
-
     TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
         long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQBrokerException;
@@ -368,8 +375,8 @@ public interface MQAdminExt extends MQAdmin {
     boolean resumeCheckHalfMessage(final String topic,
         final String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException;
 
-    void setMessageRequestMode(final String brokerAddr, final String topic, 
final String consumerGroup, final
-        MessageRequestMode mode, final int popWorkGroupSize, final long 
timeoutMillis)
+    void setMessageRequestMode(final String brokerAddr, final String topic, 
final String consumerGroup,
+        final MessageRequestMode mode, final int popWorkGroupSize, final long 
timeoutMillis)
         throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
         RemotingConnectException, MQClientException;
 
@@ -383,15 +390,16 @@ public interface MQAdminExt extends MQAdmin {
         throws RemotingException, InterruptedException, MQBrokerException;
 
     TopicConfig examineTopicConfig(final String addr,
-                                   final String topic) throws 
InterruptedException, MQBrokerException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException;
+        final String topic) throws InterruptedException, MQBrokerException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException;
 
-    void createStaticTopic(final String addr, final String defaultTopic, final 
TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final 
boolean force) throws RemotingException, InterruptedException, 
MQBrokerException;
+    void createStaticTopic(final String addr, final String defaultTopic, final 
TopicConfig topicConfig,
+        final TopicQueueMappingDetail mappingDetail,
+        final boolean force) throws RemotingException, InterruptedException, 
MQBrokerException;
 
     GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, String 
groupName, String topicName,
         Boolean readable)
         throws RemotingException, InterruptedException, MQBrokerException;
 
-
     MessageExt queryMessage(String clusterName,
         String topic,
         String msgId) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException;
@@ -399,21 +407,24 @@ public interface MQAdminExt extends MQAdmin {
     HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws 
RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, InterruptedException, MQBrokerException;
 
-    InSyncStateData getInSyncStateData(String controllerAddress, List<String> 
brokers) throws RemotingException, InterruptedException, MQBrokerException;
+    InSyncStateData getInSyncStateData(String controllerAddress,
+        List<String> brokers) throws RemotingException, InterruptedException, 
MQBrokerException;
 
-    EpochEntryCache getBrokerEpochCache(String brokerAddr) throws 
RemotingException, InterruptedException, MQBrokerException;
+    EpochEntryCache getBrokerEpochCache(
+        String brokerAddr) throws RemotingException, InterruptedException, 
MQBrokerException;
+
+    GetMetaDataResponseHeader getControllerMetaData(
+        String controllerAddr) throws RemotingException, InterruptedException, 
MQBrokerException;
 
-    GetMetaDataResponseHeader getControllerMetaData(String controllerAddr) 
throws RemotingException, InterruptedException, MQBrokerException;
     /**
      * Reset master flush offset in slave
      *
-     * @param brokerAddr slave broker address
+     * @param brokerAddr        slave broker address
      * @param masterFlushOffset master flush offset
      */
     void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset)
         throws InterruptedException, MQBrokerException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException;
 
-
     /**
      * Get controller config.
      * <br>
@@ -421,7 +432,8 @@ public interface MQAdminExt extends MQAdmin {
      *
      * @return The fetched controller config
      */
-    Map<String, Properties> getControllerConfig(List<String> 
controllerServers) throws InterruptedException, RemotingTimeoutException,
+    Map<String, Properties> getControllerConfig(
+        List<String> controllerServers) throws InterruptedException, 
RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, 
MQClientException, UnsupportedEncodingException;
 
     /**
@@ -433,4 +445,18 @@ public interface MQAdminExt extends MQAdmin {
         final List<String> controllers) throws InterruptedException, 
RemotingConnectException,
         UnsupportedEncodingException, RemotingSendRequestException, 
RemotingTimeoutException, MQClientException, MQBrokerException;
 
+    /**
+     * Manually trigger a master election for a broker via the controller.
+     *
+     * @param controllerAddr controller address
+     * @param clusterName    cluster name of the broker
+     * @param brokerName     broker name of the replicas to be manipulated
+     * @param brokerAddr     address of the broker that should become master
+     * @return the response header of the election request; the admin CLI
+     *         reads the new master address, master epoch, sync state set
+     *         epoch and broker member group from it
+     * @throws RemotingException
+     * @throws InterruptedException
+     * @throws MQBrokerException
+     */
+    ElectMasterResponseHeader electMaster(String controllerAddr, String 
clusterName, String brokerName,
+        String brokerAddr)
+        throws RemotingException, InterruptedException, MQBrokerException;
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java 
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index fc9226d8a..2673c6747 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -63,6 +63,7 @@ import 
org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
 import 
org.apache.rocketmq.tools.command.controller.GetControllerConfigSubCommand;
 import 
org.apache.rocketmq.tools.command.controller.GetControllerMetaDataSubCommand;
 import 
org.apache.rocketmq.tools.command.controller.UpdateControllerConfigSubCommand;
+import org.apache.rocketmq.tools.command.controller.ReElectMasterSubCommand;
 import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
 import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
 import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
@@ -264,6 +265,7 @@ public class MQAdminStartup {
 
         initCommand(new GetControllerConfigSubCommand());
         initCommand(new UpdateControllerConfigSubCommand());
+        initCommand(new ReElectMasterSubCommand());
     }
 
     private static void initLogback() throws JoranException {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
new file mode 100644
index 000000000..47e8dfadc
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.controller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * Admin CLI sub-command registered under the name {@code electMaster}.
+ * It sends an elect-master request for one broker to a controller through
+ * {@link DefaultMQAdminExt} and prints the returned header to stdout.
+ */
+public class ReElectMasterSubCommand implements SubCommand {
+
+    /**
+     * @return the command name, {@code "electMaster"}; note that it differs
+     *         from the class name
+     */
+    @Override
+    public String commandName() {
+        return "electMaster";
+    }
+
+    /**
+     * @return the one-line description of this command
+     */
+    @Override
+    public String commandDesc() {
+        return "Re-elect the specified broker as master";
+    }
+
+    /**
+     * Registers the four options of this command, all of them required and
+     * all taking an argument: {@code -a/--controllerAddress},
+     * {@code -b/--brokerAddress}, {@code -n/--brokerName} and
+     * {@code -c/--clusterName}.
+     *
+     * @param options the option set to add to
+     * @return the same {@code options} instance that was passed in
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("a", "controllerAddress", true, "The address 
of controller");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerAddress", true, "The address of the 
broker which requires to become master");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("n", "brokerName", true, "The broker name of the 
replicas that require to be manipulated");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "the clusterName of 
broker");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Runs the command: reads the four option values, starts an admin
+     * client, calls {@code electMaster} and prints the cluster name, broker
+     * name, new master address, master epoch and sync state set epoch,
+     * followed by one line per broker address when a member group with a
+     * non-null address map is returned.
+     *
+     * @param commandLine parsed command line holding the option values
+     * @param options     not read by this method
+     * @param rpcHook     passed to the {@link DefaultMQAdminExt} constructor
+     * @throws SubCommandException wraps any exception raised while starting
+     *                             the client, sending the request or
+     *                             printing the result
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook) throws SubCommandException {
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        // Instance name is the current time in millis, presumably to keep
+        // each CLI run's client instance distinct - TODO confirm.
+        
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        // All four options are registered as required above; values are
+        // trimmed before use. Options are looked up by String or char name.
+        String controllerAddress = commandLine.getOptionValue("a").trim();
+        String clusterName = commandLine.getOptionValue('c').trim();
+        String brokerName = commandLine.getOptionValue('n').trim();
+        String brokerAddress = commandLine.getOptionValue("b").trim();
+
+        try {
+            defaultMQAdminExt.start();
+            final ElectMasterResponseHeader metaData = 
defaultMQAdminExt.electMaster(controllerAddress, clusterName, brokerName, 
brokerAddress);
+            // Cluster and broker name are echoed from the command line;
+            // the remaining fields come from the response header.
+            System.out.printf("\n#ClusterName\t%s", clusterName);
+            System.out.printf("\n#BrokerName\t%s", brokerName);
+            System.out.printf("\n#BrokerMasterAddr\t%s", 
metaData.getNewMasterAddress());
+            System.out.printf("\n#MasterEpoch\t%s", metaData.getMasterEpoch());
+            System.out.printf("\n#SyncStateSetEpoch\t%s\n", 
metaData.getSyncStateSetEpoch());
+            BrokerMemberGroup brokerMemberGroup = 
metaData.getBrokerMemberGroup();
+            // The member group and its address map may both be null; print
+            // the per-broker lines only when both are present.
+            if (null != brokerMemberGroup && null != 
brokerMemberGroup.getBrokerAddrs()) {
+                brokerMemberGroup.getBrokerAddrs().forEach((key, value) -> 
System.out.printf("\t#Broker\t%d\t%s\n", key, value));
+            }
+        } catch (Exception e) {
+            // The original exception is kept as the cause.
+            throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
+        } finally {
+            // Shut the admin client down on both success and failure.
+            defaultMQAdminExt.shutdown();
+        }
+
+    }
+}

Reply via email to