This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller 
by this push:
     new f905da411 [Summer of code] Let broker send heartbeat to controller 
(#4341)
f905da411 is described below

commit f905da411a71eecba9f896db52227a7b3081b6a6
Author: hzh0425 <[email protected]>
AuthorDate: Fri May 20 17:35:07 2022 +0800

    [Summer of code] Let broker send heartbeat to controller (#4341)
    
    * let broker send heartbeat to controller
    
    * code remview
    
    * code review
    
    * fix bug
    
    * add state in replicasmanager
    
    * add brokerId when getReplicasInfo
    
    * code review
---
 .../apache/rocketmq/broker/BrokerController.java   | 72 +++++++++++++++-------
 .../broker/hacontroller/ReplicasManager.java       | 70 +++++++++++++++++----
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 39 +++++++++++-
 .../org/apache/rocketmq/common/BrokerConfig.java   | 50 ++++++++++++---
 .../common/protocol/body/SyncStateSet.java         |  2 +-
 .../controller/GetReplicaInfoRequestHeader.java    | 18 +++++-
 .../controller/GetReplicaInfoResponseHeader.java   | 11 ++++
 .../rocketmq/container/InnerBrokerController.java  |  2 +-
 .../org/apache/rocketmq/controller/Controller.java |  5 ++
 .../rocketmq/controller/ControllerManager.java     | 18 +++---
 .../controller/impl/DledgerController.java         |  5 ++
 .../controller/impl/manager/BrokerInfo.java        |  5 +-
 .../impl/manager/ReplicasInfoManager.java          |  5 ++
 .../apache/rocketmq/store/DefaultMessageStore.java |  4 +-
 .../rocketmq/store/config/MessageStoreConfig.java  | 14 -----
 .../store/ha/autoswitch/AutoSwitchHATest.java      |  4 +-
 .../test/autoswitchrole/AutoSwitchRoleBase.java    |  2 +-
 .../AutoSwitchRoleIntegrationTest.java             |  2 +-
 18 files changed, 249 insertions(+), 79 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 2aa307343..cb2a4cc32 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.AccessValidator;
 import org.apache.rocketmq.broker.client.ClientHousekeepingService;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
@@ -603,7 +604,7 @@ public class BrokerController {
             }
         }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
 
-        if (!messageStoreConfig.isEnableDLegerCommitLog() && 
!messageStoreConfig.isDuplicationEnable() && 
!messageStoreConfig.isStartupControllerMode()) {
+        if (!messageStoreConfig.isEnableDLegerCommitLog() && 
!messageStoreConfig.isDuplicationEnable() && 
!brokerConfig.isStartupControllerMode()) {
             if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                 if (this.messageStoreConfig.getHaMasterAddress() != null && 
this.messageStoreConfig.getHaMasterAddress().length() >= HA_ADDRESS_MIN_LENGTH) 
{
                     
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
@@ -639,7 +640,7 @@ public class BrokerController {
             }
         }
 
-        if (this.messageStoreConfig.isStartupControllerMode()) {
+        if (this.brokerConfig.isStartupControllerMode()) {
             this.updateMasterHAServerAddrPeriodically = true;
         }
     }
@@ -711,7 +712,7 @@ public class BrokerController {
                 MessageStorePluginContext context = new 
MessageStorePluginContext(this, messageStoreConfig, brokerStatsManager, 
messageArrivingListener);
                 this.messageStore = MessageStoreFactory.build(context, 
defaultMessageStore);
                 this.messageStore.getDispatcherList().addFirst(new 
CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
-                if (this.messageStoreConfig.isStartupControllerMode()) {
+                if (this.brokerConfig.isStartupControllerMode()) {
                     this.replicasManager = new ReplicasManager(this);
                 }
             } catch (IOException e) {
@@ -1452,7 +1453,7 @@ public class BrokerController {
 
         startBasicService();
 
-        if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() 
&& !this.messageStoreConfig.isDuplicationEnable() && 
!this.messageStoreConfig.isStartupControllerMode()) {
+        if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() 
&& !this.messageStoreConfig.isDuplicationEnable() && 
!this.brokerConfig.isStartupControllerMode()) {
             changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == 
MixAll.MASTER_ID);
             this.registerBrokerAll(true, false, true);
         }
@@ -1490,7 +1491,7 @@ public class BrokerController {
             }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), 
TimeUnit.MILLISECONDS));
         }
 
-        if (this.messageStoreConfig.isStartupControllerMode()) {
+        if (this.brokerConfig.isStartupControllerMode()) {
             scheduleSendHeartbeat();
         }
 
@@ -1600,6 +1601,10 @@ public class BrokerController {
             BrokerController.LOG.info("BrokerController#doResterBrokerAll: 
broker has shutdown, no need to register any more.");
             return;
         }
+        Long heartbeatTimeoutMillis = 
(this.brokerConfig.isEnableSlaveActingMaster() ||
+            (this.brokerConfig.isStartupControllerMode() && 
!this.brokerConfig.isControllerDeployedStandAlone())) ?
+            this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null;
+
         List<RegisterBrokerResult> registerBrokerResultList = 
this.brokerOuterAPI.registerBrokerAll(
             this.brokerConfig.getBrokerClusterName(),
             this.getBrokerAddr(),
@@ -1612,30 +1617,51 @@ public class BrokerController {
             this.brokerConfig.getRegisterBrokerTimeoutMills(),
             this.brokerConfig.isEnableSlaveActingMaster(),
             this.brokerConfig.isCompressedRegister(),
-            (this.brokerConfig.isEnableSlaveActingMaster() || 
this.messageStoreConfig.isStartupControllerMode()) ? 
this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
+            heartbeatTimeoutMillis,
             this.getBrokerIdentity());
 
         handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
     }
 
     protected void sendHeartbeat() {
-        if (this.brokerConfig.isCompatibleWithOldNameSrv()) {
-            this.brokerOuterAPI.sendHeartbeatViaDataVersion(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.brokerConfig.getSendHeartbeatTimeoutMillis(),
-                this.getTopicConfigManager().getDataVersion(),
-                this.brokerConfig.isInBrokerContainer());
-        } else {
-            this.brokerOuterAPI.sendHeartbeat(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.brokerConfig.getSendHeartbeatTimeoutMillis(),
-                this.brokerConfig.isInBrokerContainer());
+        boolean shouldSendHeartbeatToController = 
this.brokerConfig.isStartupControllerMode() && 
this.brokerConfig.isControllerDeployedStandAlone();
+        if (shouldSendHeartbeatToController) {
+            final List<String> controllerAddresses = 
this.replicasManager.getControllerAddresses();
+            for (String controllerAddress : controllerAddresses) {
+                if (StringUtils.isNotEmpty(controllerAddress)) {
+                    this.brokerOuterAPI.sendHeartbeatToController(
+                        controllerAddress,
+                        this.brokerConfig.getBrokerClusterName(),
+                        this.getBrokerAddr(),
+                        this.brokerConfig.getBrokerName(),
+                        this.brokerConfig.getBrokerId(),
+                        this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                        this.brokerConfig.isInBrokerContainer()
+                    );
+                }
+            }
+        }
+
+        boolean shouldSendHeartbeatToNameSrv = 
this.brokerConfig.isEnableSlaveActingMaster() || 
!shouldSendHeartbeatToController;
+        if (shouldSendHeartbeatToNameSrv) {
+            if (this.brokerConfig.isCompatibleWithOldNameSrv()) {
+                this.brokerOuterAPI.sendHeartbeatViaDataVersion(
+                    this.brokerConfig.getBrokerClusterName(),
+                    this.getBrokerAddr(),
+                    this.brokerConfig.getBrokerName(),
+                    this.brokerConfig.getBrokerId(),
+                    this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                    this.getTopicConfigManager().getDataVersion(),
+                    this.brokerConfig.isInBrokerContainer());
+            } else {
+                this.brokerOuterAPI.sendHeartbeat(
+                    this.brokerConfig.getBrokerClusterName(),
+                    this.getBrokerAddr(),
+                    this.brokerConfig.getBrokerName(),
+                    this.brokerConfig.getBrokerId(),
+                    this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                    this.brokerConfig.isInBrokerContainer());
+            }
         }
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index fd09239ab..b59995ae2 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -63,6 +63,7 @@ public class ReplicasManager {
     private final List<String> controllerAddresses;
 
     private volatile String controllerLeaderAddress = "";
+    private volatile State state = State.INITIAL;
 
     private ScheduledFuture<?> checkSyncStateSetTaskFuture;
     private ScheduledFuture<?> slaveSyncFuture;
@@ -76,7 +77,7 @@ public class ReplicasManager {
         this.brokerController = brokerController;
         this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
         this.scheduledService = Executors.newScheduledThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ScheduledService_", 
brokerController.getBrokerIdentity()));
-        this.executorService = Executors.newFixedThreadPool(2, new 
ThreadFactoryImpl("ReplicasManager_ExecutorService_", 
brokerController.getBrokerIdentity()));
+        this.executorService = Executors.newFixedThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ExecutorService_", 
brokerController.getBrokerIdentity()));
         this.haService = (AutoSwitchHAService) 
brokerController.getMessageStore().getHaService();
         this.brokerConfig = brokerController.getBrokerConfig();
         final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
@@ -89,19 +90,56 @@ public class ReplicasManager {
         this.haService.setLocalAddress(this.localAddress);
     }
 
+    enum State {
+        INITIAL,
+        FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
+        RUNNING,
+    }
+
     public void start() {
-        if (!schedulingSyncControllerMetadata()) {
-            return;
+        if (!startBasicService()) {
+            LOGGER.error("Failed to start replicasManager");
+            this.executorService.submit(() -> {
+                int tryTimes = 1;
+                while (!startBasicService()) {
+                    tryTimes++;
+                    LOGGER.error("Failed to start replicasManager, try 
times:{}, current state:{}, try it again", tryTimes, this.state);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+                LOGGER.info("Start replicasManager success, try times:{}", 
tryTimes);
+            });
+        }
+    }
+
+    private boolean startBasicService() {
+        if (this.state == State.INITIAL) {
+            if (schedulingSyncControllerMetadata()) {
+                LOGGER.info("First time sync controller metadata success");
+                this.state = State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE;
+            } else {
+                return false;
+            }
         }
 
-        if (!registerBroker()) {
-            return;
+        if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
+            if (registerBroker()) {
+                LOGGER.info("First time register broker success");
+                this.state = State.RUNNING;
+            } else {
+                return false;
+            }
         }
 
         schedulingSyncBrokerMetadata();
+        return true;
     }
 
     public void shutdown() {
+        this.state = State.INITIAL;
+        this.executorService.shutdown();
         this.scheduledService.shutdown();
     }
 
@@ -143,7 +181,7 @@ public class ReplicasManager {
         }
     }
 
-    public void changeToSlave(final String newMasterAddress, final int 
newMasterEpoch) {
+    public void changeToSlave(final String newMasterAddress, final int 
newMasterEpoch, long brokerId) {
         synchronized (this) {
             if (newMasterEpoch > this.masterEpoch) {
                 LOGGER.info("Begin to change to slave, brokerName={}, 
replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), 
this.localAddress, this.brokerConfig.getBrokerId());
@@ -156,6 +194,7 @@ public class ReplicasManager {
                 // Change config
                 
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SLAVE);
                 this.brokerController.changeSpecialServiceStatus(false);
+                this.brokerConfig.setBrokerId(brokerId);
 
                 // Handle the slave synchronise
                 handleSlaveSynchronize(BrokerRole.SLAVE);
@@ -214,12 +253,11 @@ public class ReplicasManager {
         try {
             final BrokerRegisterResponseHeader registerResponse = 
this.brokerOuterAPI.registerBroker(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), 
this.localAddress);
             final String newMasterAddress = 
registerResponse.getMasterAddress();
-            this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
             if (StringUtils.isNoneEmpty(newMasterAddress)) {
                 if (StringUtils.equals(newMasterAddress, this.localAddress)) {
                     changeToMaster(registerResponse.getMasterEpoch(), 
registerResponse.getSyncStateSetEpoch());
                 } else {
-                    changeToSlave(newMasterAddress, 
registerResponse.getMasterEpoch());
+                    changeToSlave(newMasterAddress, 
registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
                 }
             }
             return true;
@@ -235,18 +273,24 @@ public class ReplicasManager {
     private void schedulingSyncBrokerMetadata() {
         this.scheduledService.scheduleAtFixedRate(() -> {
             try {
-                final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result 
= this.brokerOuterAPI.getReplicaInfo(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerName());
+                final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result 
= this.brokerOuterAPI.getReplicaInfo(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerName(), this.localAddress);
                 final GetReplicaInfoResponseHeader info = result.getObject1();
                 final SyncStateSet syncStateSet = result.getObject2();
                 final String newMasterAddress = info.getMasterAddress();
                 final int newMasterEpoch = info.getMasterEpoch();
+                final long brokerId = info.getBrokerId();
                 synchronized (this) {
                     // Check if master changed
                     if (StringUtils.isNoneEmpty(newMasterAddress) && 
!StringUtils.equals(this.masterAddress, newMasterAddress) && newMasterEpoch > 
this.masterEpoch) {
                         if (StringUtils.equals(newMasterAddress, 
this.localAddress)) {
                             changeToMaster(newMasterEpoch, 
syncStateSet.getSyncStateSetEpoch());
                         } else {
-                            changeToSlave(newMasterAddress, newMasterEpoch);
+                            if (brokerId > 0) {
+                                changeToSlave(newMasterAddress, 
newMasterEpoch, brokerId);
+                            } else if (brokerId < 0) {
+                                // If the brokerId is no existed, we should 
try register again.
+                                registerBroker();
+                            }
                         }
                     } else {
                         // Check if sync state set changed
@@ -340,7 +384,7 @@ public class ReplicasManager {
     }
 
     public SyncStateSet getSyncStateSet() {
-        return new SyncStateSet(new HashSet<>(this.syncStateSet), 
this.syncStateSetEpoch);
+        return new SyncStateSet(this.syncStateSet, this.syncStateSetEpoch);
     }
 
     public String getLocalAddress() {
@@ -354,4 +398,8 @@ public class ReplicasManager {
     public int getMasterEpoch() {
         return masterEpoch;
     }
+
+    public List<String> getControllerAddresses() {
+        return controllerAddresses;
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 60701913a..dbede0763 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.producer.SendResult;
@@ -1062,8 +1063,8 @@ public class BrokerOuterAPI {
      * Get broker replica info
      */
     public Pair<GetReplicaInfoResponseHeader, SyncStateSet> 
getReplicaInfo(final String controllerAddress,
-        final String brokerName) throws Exception {
-        final GetReplicaInfoRequestHeader requestHeader = new 
GetReplicaInfoRequestHeader(brokerName);
+        final String brokerName, final String brokerAddress) throws Exception {
+        final GetReplicaInfoRequestHeader requestHeader = new 
GetReplicaInfoRequestHeader(brokerName, brokerAddress);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, 
requestHeader);
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
         assert response != null;
@@ -1080,4 +1081,38 @@ public class BrokerOuterAPI {
         }
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
+
+    /**
+     * Send heartbeat to controller
+     */
+    public void sendHeartbeatToController(final String controllerAddress,
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final Long brokerId,
+        final int timeoutMills,
+        final boolean isInBrokerContainer) {
+        if (StringUtils.isEmpty(controllerAddress)) {
+            return;
+        }
+
+        final BrokerHeartbeatRequestHeader requestHeader = new 
BrokerHeartbeatRequestHeader();
+        requestHeader.setClusterName(clusterName);
+        requestHeader.setBrokerAddr(brokerAddr);
+        requestHeader.setBrokerName(brokerName);
+
+        brokerOuterExecutor.execute(new AbstractBrokerRunnable(new 
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
+            @Override
+            public void run2() {
+                RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, 
requestHeader);
+
+                try {
+                    
BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, 
timeoutMills);
+                } catch (Exception e) {
+                    LOGGER.error("Error happen when send heartbeat to 
controller {}", controllerAddress, e);
+                }
+            }
+        });
+    }
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 898cd3fe0..26d05e1d7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -293,10 +293,24 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private boolean lockInStrictMode = false;
 
-    private String controllerAddr = "";
-
     private boolean compatibleWithOldNameSrv = true;
 
+    /**
+     * Is startup controller mode, which support auto switch broker's role.
+     */
+    private boolean startupControllerMode = false;
+
+    /**
+     * Whether the controller is deployed independently
+     */
+    private boolean isControllerDeployedStandAlone = false;
+
+    /**
+     * If isControllerDeployedStandAlone = false, controllerAddr should be the 
addresses of all name-srv which running the controller instance.
+     * If isControllerDeployedStandAlone = true, controllerAddr should be 
controller's address.
+     */
+    private String controllerAddr = "";
+
     private long replicasManagerSyncBrokerMetadataPeriod = 5 * 1000;
 
     private long replicasManagerCheckSyncStateSetPeriod = 8 * 1000;
@@ -1263,14 +1277,6 @@ public class BrokerConfig extends BrokerIdentity {
         this.lockInStrictMode = lockInStrictMode;
     }
 
-    public String getControllerAddr() {
-        return controllerAddr;
-    }
-
-    public void setControllerAddr(String controllerAddr) {
-        this.controllerAddr = controllerAddr;
-    }
-
     public boolean isIsolateLogEnable() {
         return isolateLogEnable;
     }
@@ -1287,6 +1293,30 @@ public class BrokerConfig extends BrokerIdentity {
         this.compatibleWithOldNameSrv = compatibleWithOldNameSrv;
     }
 
+    public boolean isStartupControllerMode() {
+        return startupControllerMode;
+    }
+
+    public void setStartupControllerMode(boolean startupControllerMode) {
+        this.startupControllerMode = startupControllerMode;
+    }
+
+    public boolean isControllerDeployedStandAlone() {
+        return isControllerDeployedStandAlone;
+    }
+
+    public void setControllerDeployedStandAlone(boolean 
controllerDeployedStandAlone) {
+        isControllerDeployedStandAlone = controllerDeployedStandAlone;
+    }
+
+    public String getControllerAddr() {
+        return controllerAddr;
+    }
+
+    public void setControllerAddr(String controllerAddr) {
+        this.controllerAddr = controllerAddr;
+    }
+
     public long getReplicasManagerSyncBrokerMetadataPeriod() {
         return replicasManagerSyncBrokerMetadataPeriod;
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SyncStateSet.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SyncStateSet.java
index 19e44b5f7..1dccf1edd 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SyncStateSet.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SyncStateSet.java
@@ -26,7 +26,7 @@ public class SyncStateSet extends RemotingSerializable {
     private int syncStateSetEpoch;
 
     public SyncStateSet(Set<String> syncStateSet, int syncStateSetEpoch) {
-        this.syncStateSet = syncStateSet;
+        this.syncStateSet = new HashSet<>(syncStateSet);
         this.syncStateSetEpoch = syncStateSetEpoch;
     }
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
index 1e8fd9d05..b881ea178 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
@@ -21,6 +21,7 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class GetReplicaInfoRequestHeader implements CommandCustomHeader {
     private String brokerName;
+    private String brokerAddress;
 
     public GetReplicaInfoRequestHeader() {
     }
@@ -29,6 +30,11 @@ public class GetReplicaInfoRequestHeader implements 
CommandCustomHeader {
         this.brokerName = brokerName;
     }
 
+    public GetReplicaInfoRequestHeader(String brokerName, String 
brokerAddress) {
+        this.brokerName = brokerName;
+        this.brokerAddress = brokerAddress;
+    }
+
     public String getBrokerName() {
         return brokerName;
     }
@@ -37,10 +43,18 @@ public class GetReplicaInfoRequestHeader implements 
CommandCustomHeader {
         this.brokerName = brokerName;
     }
 
-    @Override
-    public String toString() {
+    public String getBrokerAddress() {
+        return brokerAddress;
+    }
+
+    public void setBrokerAddress(String brokerAddress) {
+        this.brokerAddress = brokerAddress;
+    }
+
+    @Override public String toString() {
         return "GetReplicaInfoRequestHeader{" +
             "brokerName='" + brokerName + '\'' +
+            ", brokerAddress='" + brokerAddress + '\'' +
             '}';
     }
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
index 1a9cb2316..72d4c48c0 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
@@ -22,6 +22,8 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 public class GetReplicaInfoResponseHeader implements CommandCustomHeader {
     private String masterAddress;
     private int masterEpoch;
+    // BrokerId for current replicas.
+    private long brokerId = -1L;
 
     public GetReplicaInfoResponseHeader() {
     }
@@ -42,10 +44,19 @@ public class GetReplicaInfoResponseHeader implements 
CommandCustomHeader {
         this.masterEpoch = masterEpoch;
     }
 
+    public long getBrokerId() {
+        return brokerId;
+    }
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
     @Override public String toString() {
         return "GetReplicaInfoResponseHeader{" +
             "masterAddress='" + masterAddress + '\'' +
             ", masterEpoch=" + masterEpoch +
+            ", brokerId=" + brokerId +
             '}';
     }
 
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
 
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index 48863ce1b..70294e848 100644
--- 
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++ 
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -100,7 +100,7 @@ public class InnerBrokerController extends BrokerController 
{
             }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), 
TimeUnit.MILLISECONDS));
         }
 
-        if (this.messageStoreConfig.isStartupControllerMode()) {
+        if (this.brokerConfig.isStartupControllerMode()) {
             scheduleSendHeartbeat();
         }
 
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java 
b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index ec45a8009..fca8f5565 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -50,6 +50,11 @@ public interface Controller {
      */
     void stopScheduling();
 
+    /**
+     * Whether this controller is in leader state.
+     */
+    boolean isLeaderState();
+
     /**
      * Alter ISR of broker replicas.
      *
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 7b835c5c0..1221709de 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -101,14 +101,18 @@ public class ControllerManager {
             @Override
             public void onBrokerInactive(String brokerName, String 
brokerAddress, long brokerId) {
                 if (brokerId == MixAll.MASTER_ID) {
-                    final CompletableFuture<RemotingCommand> future = 
controller.electMaster(new ElectMasterRequestHeader(brokerName));
-                    try {
-                        final RemotingCommand response = future.get(5, 
TimeUnit.SECONDS);
-                        final ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) response.readCustomHeader();
-                        if (responseHeader != null) {
-                            log.info("Broker{}'s master shutdown, elect a new 
master:{}", brokerName, responseHeader);
+                    if (controller.isLeaderState()) {
+                        final CompletableFuture<RemotingCommand> future = 
controller.electMaster(new ElectMasterRequestHeader(brokerName));
+                        try {
+                            final RemotingCommand response = future.get(5, 
TimeUnit.SECONDS);
+                            final ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) response.readCustomHeader();
+                            if (responseHeader != null) {
+                                log.info("Broker{}'s master shutdown, elect a 
new master:{}", brokerName, responseHeader);
+                            }
+                        } catch (Exception ignored) {
                         }
-                    } catch (Exception ignored) {
+                    } else {
+                        log.info("Broker{}' master shutdown", brokerName);
                     }
                 }
             }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
index d057e705d..8a4428f3b 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
@@ -124,6 +124,11 @@ public class DledgerController implements Controller {
         }
     }
 
+    @Override
+    public boolean isLeaderState() {
+        return this.roleHandler.isLeaderState();
+    }
+
     @Override
     public CompletableFuture<RemotingCommand> 
alterSyncStateSet(AlterSyncStateSetRequestHeader request,
         final SyncStateSet syncStateSet) {
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
index 1b0242e0d..f819ff1b3 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
@@ -63,6 +63,9 @@ public class BrokerInfo {
     }
 
     public Long getBrokerId(final String address) {
-        return this.brokerIdTable.get(address);
+        if (this.brokerIdTable.containsKey(address)) {
+            return this.brokerIdTable.get(address);
+        }
+        return -1L;
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 85ac2734f..3f991df4e 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.BiPredicate;
 import java.util.function.Predicate;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.namesrv.ControllerConfig;
@@ -262,9 +263,13 @@ public class ReplicasInfoManager {
         if (isContainsBroker(brokerName)) {
             // If exist broker metadata, just return metadata
             final InSyncReplicasInfo replicasInfo = 
this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerInfo brokerInfo = 
this.replicaInfoTable.get(brokerName);
             final String masterAddress = replicasInfo.getMasterAddress();
             response.setMasterAddress(masterAddress);
             response.setMasterEpoch(replicasInfo.getMasterEpoch());
+            if (StringUtils.isNotEmpty(request.getBrokerAddress())) {
+                
response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress()));
+            }
             result.setBody(new SyncStateSet(replicasInfo.getSyncStateSet(), 
replicasInfo.getSyncStateSetEpoch()).encode());
             return result;
         }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index afbf4dc58..f2fca86bd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -191,7 +191,7 @@ public class DefaultMessageStore implements MessageStore {
         if (!messageStoreConfig.isEnableDLegerCommitLog() && 
!this.messageStoreConfig.isDuplicationEnable()) {
             this.haService = 
ServiceProvider.loadClass(ServiceProvider.HA_SERVICE_ID, HAService.class);
             if (null == this.haService) {
-                if (this.messageStoreConfig.isStartupControllerMode()) {
+                if (brokerConfig.isStartupControllerMode()) {
                     this.haService = new AutoSwitchHAService();
                     LOGGER.warn("Load AutoSwitch HA Service: {}", 
AutoSwitchHAService.class.getSimpleName());
                 } else {
@@ -1952,7 +1952,7 @@ public class DefaultMessageStore implements MessageStore {
                     destroyMappedFileIntervalForcibly, cleanAtOnce, 
deleteFileBatchMax);
                 if (deleteCount > 0) {
                     // If in the controller mode, we should notify the 
AutoSwitchHaService to truncateEpochFile
-                    if 
(DefaultMessageStore.this.messageStoreConfig.isStartupControllerMode()) {
+                    if 
(DefaultMessageStore.this.brokerConfig.isStartupControllerMode()) {
                         if (DefaultMessageStore.this.haService instanceof 
AutoSwitchHAService) {
                             final long minPhyOffset = getMinPhyOffset();
                             ((AutoSwitchHAService) 
DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 99b88d521..a182e07cf 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -253,12 +253,6 @@ public class MessageStoreConfig {
     @ImportantField
     private boolean enableAutoInSyncReplicas = false;
 
-    /**
-     * Whether startup controller mode
-     */
-    @ImportantField
-    private boolean startupControllerMode = false;
-
     /**
      * Enable or not ha flow control
      */
@@ -1174,14 +1168,6 @@ public class MessageStoreConfig {
         this.enableAutoInSyncReplicas = enableAutoInSyncReplicas;
     }
 
-    public boolean isStartupControllerMode() {
-        return startupControllerMode;
-    }
-
-    public void setStartupControllerMode(boolean startupControllerMode) {
-        this.startupControllerMode = startupControllerMode;
-    }
-
     public boolean isHaFlowControlEnable() {
         return haFlowControlEnable;
     }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 436dd1eed..8f74fbbb0 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -79,7 +79,6 @@ public class AutoSwitchHATest {
         storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + 
"broker1" + File.separator + "EpochFileCache");
         storeConfig1.setTotalReplicas(3);
         storeConfig1.setInSyncReplicas(2);
-        storeConfig1.setStartupControllerMode(true);
         buildMessageStoreConfig(storeConfig1, mappedFileSize);
         this.store1HaAddress = "127.0.0.1:10912";
 
@@ -91,7 +90,6 @@ public class AutoSwitchHATest {
         storeConfig2.setHaListenPort(10943);
         storeConfig2.setTotalReplicas(3);
         storeConfig2.setInSyncReplicas(2);
-        storeConfig2.setStartupControllerMode(true);
         buildMessageStoreConfig(storeConfig2, mappedFileSize);
         this.store2HaAddress = "127.0.0.1:10943";
 
@@ -106,7 +104,6 @@ public class AutoSwitchHATest {
         storeConfig3.setHaListenPort(10980);
         storeConfig3.setTotalReplicas(3);
         storeConfig3.setInSyncReplicas(2);
-        storeConfig3.setStartupControllerMode(true);
         buildMessageStoreConfig(storeConfig3, mappedFileSize);
         messageStore3 = buildMessageStore(storeConfig3, 3L);
         this.store3HaAddress = "127.0.0.1:10980";
@@ -309,6 +306,7 @@ public class AutoSwitchHATest {
         long brokerId) throws Exception {
         BrokerConfig brokerConfig = new BrokerConfig();
         brokerConfig.setBrokerId(brokerId);
+        brokerConfig.setStartupControllerMode(true);
         return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, 
null, brokerConfig);
     }
 
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
 
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index 4db66a718..0703c65a7 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -82,6 +82,7 @@ public class AutoSwitchRoleBase {
         brokerConfig.setControllerAddr(namesrvAddress);
         brokerConfig.setReplicasManagerSyncBrokerMetadataPeriod(2 * 1000);
         brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(4 * 1000);
+        brokerConfig.setStartupControllerMode(true);
 
         final NettyServerConfig nettyServerConfig = new NettyServerConfig();
         nettyServerConfig.setListenPort(nettyListenPort);
@@ -110,7 +111,6 @@ public class AutoSwitchRoleBase {
         storeConfig.setStorePathEpochFile(storePathRootDir + File.separator + 
brokerName + File.separator + "EpochFileCache");
         storeConfig.setTotalReplicas(3);
         storeConfig.setInSyncReplicas(2);
-        storeConfig.setStartupControllerMode(true);
 
         storeConfig.setMappedFileSizeCommitLog(mappedFileSize);
         storeConfig.setMappedFileSizeConsumeQueue(1024 * 1024);
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
 
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index 2279e77a4..d2219a870 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -115,7 +115,7 @@ public class AutoSwitchRoleIntegrationTest extends 
AutoSwitchRoleBase {
         // Let master shutdown
         brokerController1.shutdown();
         this.brokerList.remove(this.brokerController1);
-        Thread.sleep(5000);
+        Thread.sleep(6000);
 
         // The slave should change to master
         assertTrue(brokerController2.getReplicasManager().isMasterState());

Reply via email to