This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 88e00547f5254a6b7fd3031a00fd2d640b84228f
Author: TheR1sing3un <[email protected]>
AuthorDate: Fri Feb 3 23:18:47 2023 +0800

    feat(controller): refactor broker's information recording core from ip 
address to broker id
    
    1. refactor broker's information recording core from ip address to broker id
---
 .../org/apache/rocketmq/common/BrokerAddrInfo.java |  43 ++--
 .../controller/BrokerHeartbeatManager.java         |   4 +-
 .../apache/rocketmq/controller/BrokerLiveInfo.java |  11 +-
 .../rocketmq/controller/elect/ElectPolicy.java     |   9 +-
 .../controller/elect/impl/DefaultElectPolicy.java  |  56 ++---
 .../BrokerLiveInfoGetter.java}                     |  18 +-
 .../BrokerValidPredicate.java}                     |  20 +-
 .../impl/DefaultBrokerHeartbeatManager.java        |  15 +-
 .../impl/event/AlterSyncStateSetEvent.java         |   6 +-
 .../controller/impl/event/ApplyBrokerIdEvent.java  |  21 +-
 .../impl/event/CleanBrokerDataEvent.java           |  20 +-
 .../controller/impl/event/ElectMasterEvent.java    |  24 +--
 .../controller/impl/manager/BrokerReplicaInfo.java |  40 ++--
 .../impl/manager/ReplicasInfoManager.java          | 225 ++++++++++-----------
 .../controller/impl/manager/SyncStateInfo.java     |  30 +--
 .../namesrv/routeinfo/RouteInfoManager.java        |  65 +++++-
 .../remoting/protocol/body/BrokerReplicasInfo.java |  48 +++--
 .../remoting/protocol/body/SyncStateSet.java       |   8 +-
 .../controller/AlterSyncStateSetRequestHeader.java |  22 +-
 .../CleanControllerBrokerDataRequestHeader.java    |  14 +-
 .../controller/ElectMasterRequestHeader.java       |  34 ++--
 .../controller/ElectMasterResponseHeader.java      |  24 +--
 .../controller/GetReplicaInfoResponseHeader.java   |  20 +-
 .../namesrv/BrokerHeartbeatRequestHeader.java      |   2 +-
 24 files changed, 436 insertions(+), 343 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java
index cd122c83a..09dd36d0c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java
@@ -16,27 +16,35 @@
  */
 package org.apache.rocketmq.common;
 
+import java.util.Objects;
+
 public class BrokerAddrInfo {
     private final String clusterName;
-    private final String brokerAddr;
 
-    private int hash;
+    private final String brokerName;
+
+    private final Long brokerId;
 
-    public BrokerAddrInfo(String clusterName, String brokerAddr) {
+    public BrokerAddrInfo(String clusterName, String brokerName, Long 
brokerId) {
         this.clusterName = clusterName;
-        this.brokerAddr = brokerAddr;
+        this.brokerName = brokerName;
+        this.brokerId = brokerId;
     }
 
     public String getClusterName() {
         return clusterName;
     }
 
-    public String getBrokerAddr() {
-        return brokerAddr;
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
     }
 
     public boolean isEmpty() {
-        return clusterName.isEmpty() && brokerAddr.isEmpty();
+        return clusterName.isEmpty() && brokerName.isEmpty() && brokerId == 
null;
     }
 
     @Override
@@ -50,29 +58,22 @@ public class BrokerAddrInfo {
 
         if (obj instanceof BrokerAddrInfo) {
             BrokerAddrInfo addr = (BrokerAddrInfo) obj;
-            return clusterName.equals(addr.clusterName) && 
brokerAddr.equals(addr.brokerAddr);
+            return clusterName.equals(addr.clusterName) && 
brokerName.equals(addr.brokerName) && brokerId == addr.brokerId;
         }
         return false;
     }
 
     @Override
     public int hashCode() {
-        int h = hash;
-        if (h == 0 && clusterName.length() + brokerAddr.length() > 0) {
-            for (int i = 0; i < clusterName.length(); i++) {
-                h = 31 * h + clusterName.charAt(i);
-            }
-            h = 31 * h + '_';
-            for (int i = 0; i < brokerAddr.length(); i++) {
-                h = 31 * h + brokerAddr.charAt(i);
-            }
-            hash = h;
-        }
-        return h;
+        return Objects.hash(this.clusterName, this.brokerName, this.brokerId);
     }
 
     @Override
     public String toString() {
-        return "BrokerAddrInfo [clusterName=" + clusterName + ", brokerAddr=" 
+ brokerAddr + "]";
+        return "BrokerAddrInfo{" +
+                "clusterName='" + clusterName + '\'' +
+                ", brokerName='" + brokerName + '\'' +
+                ", brokerId=" + brokerId +
+                '}';
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index 7d9b78e8c..81e3cf31c 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -50,12 +50,12 @@ public interface BrokerHeartbeatManager {
     /**
      * Get broker live information by clusterName and brokerAddr
      */
-    BrokerLiveInfo getBrokerLiveInfo(String clusterName, String brokerAddr);
+    BrokerLiveInfo getBrokerLiveInfo(String clusterName, String brokerName, 
Long brokerId);
 
     /**
      * Check whether broker active
      */
-    boolean isBrokerActive(final String clusterName, final String brokerAddr);
+    boolean isBrokerActive(final String clusterName, final String brokerName, 
final Long brokerId);
 
     interface BrokerLifecycleListener {
         /**
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java 
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
index eb33b98a6..9fdd6c937 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
@@ -21,9 +21,9 @@ import io.netty.channel.Channel;
 public class BrokerLiveInfo {
     private final String brokerName;
 
-    private final String brokerAddr;
+    private String brokerAddr;
     private long heartbeatTimeoutMillis;
-    private final Channel channel;
+    private Channel channel;
     private long brokerId;
     private long lastUpdateTimestamp;
     private int epoch;
@@ -141,4 +141,11 @@ public class BrokerLiveInfo {
         return confirmOffset;
     }
 
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
index aba8f5538..8e4e75a2c 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
@@ -24,13 +24,14 @@ public interface ElectPolicy {
     /**
      * elect a master
      *
-     * @param clusterName       the brokerGroup belongs
+     * @param clusterName       the broker group belongs to
+     * @param brokerName        the broker group name
      * @param syncStateBrokers  all broker replicas in syncStateSet
      * @param allReplicaBrokers all broker replicas
      * @param oldMaster         old master
-     * @param brokerAddr  broker address(can be used as prefer or assigned in 
some elect policy)
-     * @return new master's brokerAddr
+     * @param brokerId          broker id(can be used as prefer or assigned in 
some elect policy)
+     * @return new master's broker id
      */
-    String elect(String clusterName, Set<String> syncStateBrokers, Set<String> 
allReplicaBrokers, String oldMaster, String brokerAddr);
+    Long elect(String clusterName, String brokerName, Set<Long> 
syncStateBrokers, Set<Long> allReplicaBrokers, Long oldMaster, Long brokerId);
 
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
index 00cac1627..e7423675a 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.controller.elect.impl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.controller.elect.ElectPolicy;
 import org.apache.rocketmq.controller.BrokerLiveInfo;
+import org.apache.rocketmq.controller.helper.BrokerLiveInfoGetter;
+import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
 
 import java.util.Comparator;
 import java.util.Set;
@@ -29,12 +31,12 @@ import java.util.stream.Collectors;
 
 public class DefaultElectPolicy implements ElectPolicy {
 
-    // <clusterName, brokerAddr>, Used to judge whether a broker
+    // <clusterName, brokerName, brokerAddr>, Used to judge whether a broker
     // has preliminary qualification to be selected as master
-    private BiPredicate<String, String> validPredicate;
+    private BrokerValidPredicate validPredicate;
 
-    // <clusterName, brokerAddr, BrokerLiveInfo>, Used to obtain the 
BrokerLiveInfo information of a broker
-    private BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter;
+    // <clusterName, brokerName, brokerAddr, BrokerLiveInfo>, Used to obtain 
the BrokerLiveInfo information of a broker
+    private BrokerLiveInfoGetter brokerLiveInfoGetter;
 
     // Sort in descending order according to<epoch, offset>, and sort in 
ascending order according to priority
     private final Comparator<BrokerLiveInfo> comparator = (o1, o2) -> {
@@ -46,9 +48,9 @@ public class DefaultElectPolicy implements ElectPolicy {
         }
     };
 
-    public DefaultElectPolicy(BiPredicate<String, String> validPredicate, 
BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter) {
+    public DefaultElectPolicy(BrokerValidPredicate validPredicate, 
BrokerLiveInfoGetter brokerLiveInfoGetter) {
         this.validPredicate = validPredicate;
-        this.additionalInfoGetter = additionalInfoGetter;
+        this.brokerLiveInfoGetter = brokerLiveInfoGetter;
     }
 
     public DefaultElectPolicy() {
@@ -66,50 +68,50 @@ public class DefaultElectPolicy implements ElectPolicy {
      * @param clusterName       the brokerGroup belongs
      * @param syncStateBrokers  all broker replicas in syncStateSet
      * @param allReplicaBrokers all broker replicas
-     * @param oldMaster         old master
-     * @param preferBrokerAddr  the broker prefer to be elected
+     * @param oldMaster         old master's broker id
+     * @param preferBrokerId    the broker id prefer to be elected
      * @return master elected by our own policy
      */
     @Override
-    public String elect(String clusterName, Set<String> syncStateBrokers, 
Set<String> allReplicaBrokers, String oldMaster, String preferBrokerAddr) {
-        String newMaster = null;
+    public Long elect(String clusterName, String brokerName, Set<Long> 
syncStateBrokers, Set<Long> allReplicaBrokers, Long oldMaster, Long 
preferBrokerId) {
+        Long newMaster = null;
         // try to elect in syncStateBrokers
         if (syncStateBrokers != null) {
-            newMaster = tryElect(clusterName, syncStateBrokers, oldMaster, 
preferBrokerAddr);
+            newMaster = tryElect(clusterName, brokerName, syncStateBrokers, 
oldMaster, preferBrokerId);
         }
-        if (StringUtils.isNotEmpty(newMaster)) {
+        if (newMaster != null) {
             return newMaster;
         }
 
         // try to elect in all allReplicaBrokers
         if (allReplicaBrokers != null) {
-            newMaster = tryElect(clusterName, allReplicaBrokers, oldMaster, 
preferBrokerAddr);
+            newMaster = tryElect(clusterName, brokerName, allReplicaBrokers, 
oldMaster, preferBrokerId);
         }
         return newMaster;
     }
 
 
-    private String tryElect(String clusterName, Set<String> brokers, String 
oldMaster, String preferBrokerAddr) {
+    private Long tryElect(String clusterName, String brokerName, Set<Long> 
brokers, Long oldMaster, Long preferBrokerId) {
         if (this.validPredicate != null) {
-            brokers = brokers.stream().filter(brokerAddr -> 
this.validPredicate.test(clusterName, brokerAddr)).collect(Collectors.toSet());
+            brokers = brokers.stream().filter(brokerAddr -> 
this.validPredicate.check(clusterName, brokerName, 
brokerAddr)).collect(Collectors.toSet());
         }
         if (!brokers.isEmpty()) {
             // if old master is still valid, and preferBrokerAddr is blank or 
is equals to oldMaster
-            if (brokers.contains(oldMaster) && 
(StringUtils.isBlank(preferBrokerAddr) || preferBrokerAddr.equals(oldMaster))) {
+            if (brokers.contains(oldMaster) && (preferBrokerId == null || 
preferBrokerId == oldMaster)) {
                 return oldMaster;
             }
 
             // if preferBrokerAddr is valid, we choose it, otherwise we choose 
nothing
-            if (StringUtils.isNotBlank(preferBrokerAddr)) {
-                return brokers.contains(preferBrokerAddr) ? preferBrokerAddr : 
null;
+            if (preferBrokerId != null) {
+                return brokers.contains(preferBrokerId) ? preferBrokerId : 
null;
             }
 
-            if (this.additionalInfoGetter != null) {
+            if (this.brokerLiveInfoGetter != null) {
                 // sort brokerLiveInfos by (epoch,maxOffset)
                 TreeSet<BrokerLiveInfo> brokerLiveInfos = new 
TreeSet<>(this.comparator);
-                brokers.forEach(brokerAddr -> 
brokerLiveInfos.add(this.additionalInfoGetter.apply(clusterName, brokerAddr)));
+                brokers.forEach(brokerAddr -> 
brokerLiveInfos.add(this.brokerLiveInfoGetter.get(clusterName, brokerName, 
brokerAddr)));
                 if (brokerLiveInfos.size() >= 1) {
-                    return brokerLiveInfos.first().getBrokerAddr();
+                    return brokerLiveInfos.first().getBrokerId();
                 }
             }
             // elect random
@@ -123,15 +125,15 @@ public class DefaultElectPolicy implements ElectPolicy {
         return additionalInfoGetter;
     }
 
-    public void setAdditionalInfoGetter(BiFunction<String, String, 
BrokerLiveInfo> additionalInfoGetter) {
-        this.additionalInfoGetter = additionalInfoGetter;
+    public void setBrokerLiveInfoGetter(BrokerLiveInfoGetter 
brokerLiveInfoGetter) {
+        this.brokerLiveInfoGetter = brokerLiveInfoGetter;
     }
 
-    public BiPredicate<String, String> getValidPredicate() {
-        return validPredicate;
+    public void setValidPredicate(BrokerValidPredicate validPredicate) {
+        this.validPredicate = validPredicate;
     }
 
-    public void setValidPredicate(BiPredicate<String, String> validPredicate) {
-        this.validPredicate = validPredicate;
+    public BrokerLiveInfoGetter getBrokerLiveInfoGetter() {
+        return brokerLiveInfoGetter;
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLiveInfoGetter.java
similarity index 56%
copy from 
controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
copy to 
controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLiveInfoGetter.java
index aba8f5538..4a302cdbd 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLiveInfoGetter.java
@@ -14,23 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.controller.elect;
 
+package org.apache.rocketmq.controller.helper;
 
-import java.util.Set;
+import org.apache.rocketmq.controller.BrokerLiveInfo;
 
-public interface ElectPolicy {
+public interface BrokerLiveInfoGetter {
 
-    /**
-     * elect a master
-     *
-     * @param clusterName       the brokerGroup belongs
-     * @param syncStateBrokers  all broker replicas in syncStateSet
-     * @param allReplicaBrokers all broker replicas
-     * @param oldMaster         old master
-     * @param brokerAddr  broker address(can be used as prefer or assigned in 
some elect policy)
-     * @return new master's brokerAddr
-     */
-    String elect(String clusterName, Set<String> syncStateBrokers, Set<String> 
allReplicaBrokers, String oldMaster, String brokerAddr);
+    BrokerLiveInfo get(String clusterName, String brokerName, Long brokerId);
 
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerValidPredicate.java
similarity index 55%
copy from 
controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
copy to 
controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerValidPredicate.java
index aba8f5538..d8c6a2f65 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerValidPredicate.java
@@ -14,23 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.controller.elect;
+package org.apache.rocketmq.controller.helper;
 
+public interface BrokerValidPredicate {
 
-import java.util.Set;
-
-public interface ElectPolicy {
-
-    /**
-     * elect a master
-     *
-     * @param clusterName       the brokerGroup belongs
-     * @param syncStateBrokers  all broker replicas in syncStateSet
-     * @param allReplicaBrokers all broker replicas
-     * @param oldMaster         old master
-     * @param brokerAddr  broker address(can be used as prefer or assigned in 
some elect policy)
-     * @return new master's brokerAddr
-     */
-    String elect(String clusterName, Set<String> syncStateBrokers, Set<String> 
allReplicaBrokers, String oldMaster, String brokerAddr);
-
+    boolean check(String clusterName, String brokerName, Long brokerId);
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 2a5610c56..3045da85e 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -99,9 +99,10 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
         this.brokerLifecycleListeners.add(listener);
     }
 
-    @Override public void onBrokerHeartbeat(String clusterName, String 
brokerName, String brokerAddr, Long brokerId,
+    @Override
+    public void onBrokerHeartbeat(String clusterName, String brokerName, 
String brokerAddr, Long brokerId,
         Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, 
Long confirmOffset, Integer electionPriority) {
-        BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
+        BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerName, 
brokerId);
         BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
         int realEpoch = Optional.ofNullable(epoch).orElse(-1);
         long realBrokerId = Optional.ofNullable(brokerId).orElse(-1L);
@@ -126,6 +127,8 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
             prev.setHeartbeatTimeoutMillis(realTimeoutMillis);
             prev.setElectionPriority(realElectionPriority);
             prev.setBrokerId(realBrokerId);
+            prev.setBrokerAddr(brokerAddr);
+            prev.setChannel(channel);
             if (realEpoch > prev.getEpoch() || realEpoch == prev.getEpoch() && 
realMaxOffset > prev.getMaxOffset()) {
                 prev.setEpoch(realEpoch);
                 prev.setMaxOffset(realMaxOffset);
@@ -153,13 +156,13 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
     }
 
     @Override
-    public BrokerLiveInfo getBrokerLiveInfo(String clusterName, String 
brokerAddr) {
-        return this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, 
brokerAddr));
+    public BrokerLiveInfo getBrokerLiveInfo(String clusterName, String 
brokerName, Long brokerId) {
+        return this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, 
brokerName, brokerId));
     }
 
     @Override
-    public boolean isBrokerActive(String clusterName, String brokerAddr) {
-        final BrokerLiveInfo info = this.brokerLiveTable.get(new 
BrokerAddrInfo(clusterName, brokerAddr));
+    public boolean isBrokerActive(String clusterName, String brokerName, Long 
brokerId) {
+        final BrokerLiveInfo info = this.brokerLiveTable.get(new 
BrokerAddrInfo(clusterName, brokerName, brokerId));
         if (info != null) {
             long last = info.getLastUpdateTimestamp();
             long timeoutMillis = info.getHeartbeatTimeoutMillis();
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
index 2342e0e99..6af44b722 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
@@ -26,9 +26,9 @@ import java.util.Set;
 public class AlterSyncStateSetEvent implements EventMessage {
 
     private final String brokerName;
-    private final Set<String/*Address*/> newSyncStateSet;
+    private final Set<Long/*BrokerId*/> newSyncStateSet;
 
-    public AlterSyncStateSetEvent(String brokerName, Set<String> 
newSyncStateSet) {
+    public AlterSyncStateSetEvent(String brokerName, Set<Long> 
newSyncStateSet) {
         this.brokerName = brokerName;
         this.newSyncStateSet = new HashSet<>(newSyncStateSet);
     }
@@ -42,7 +42,7 @@ public class AlterSyncStateSetEvent implements EventMessage {
         return brokerName;
     }
 
-    public Set<String> getNewSyncStateSet() {
+    public Set<Long> getNewSyncStateSet() {
         return new HashSet<>(newSyncStateSet);
     }
 
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
index c4934d7c0..a0bf001c4 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
@@ -24,13 +24,17 @@ public class ApplyBrokerIdEvent implements EventMessage {
     private final String clusterName;
     private final String brokerName;
     private final String brokerAddress;
+
+    private final String registerCheckCode;
+
     private final long newBrokerId;
 
-    public ApplyBrokerIdEvent(String clusterName, String brokerName, String 
brokerAddress, long newBrokerId) {
+    public ApplyBrokerIdEvent(String clusterName, String brokerName, String 
brokerAddress, long newBrokerId, String registerCheckCode) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
         this.brokerAddress = brokerAddress;
         this.newBrokerId = newBrokerId;
+        this.registerCheckCode = registerCheckCode;
     }
 
     @Override
@@ -54,13 +58,18 @@ public class ApplyBrokerIdEvent implements EventMessage {
         return clusterName;
     }
 
+    public String getRegisterCheckCode() {
+        return registerCheckCode;
+    }
+
     @Override
     public String toString() {
         return "ApplyBrokerIdEvent{" +
-            "clusterName='" + clusterName + '\'' +
-            ", brokerName='" + brokerName + '\'' +
-            ", brokerAddress='" + brokerAddress + '\'' +
-            ", newBrokerId=" + newBrokerId +
-            '}';
+                "clusterName='" + clusterName + '\'' +
+                ", brokerName='" + brokerName + '\'' +
+                ", brokerAddress='" + brokerAddress + '\'' +
+                ", registerCheckCode='" + registerCheckCode + '\'' +
+                ", newBrokerId=" + newBrokerId +
+                '}';
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
index 4678f90c4..e639e27e4 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
@@ -23,11 +23,11 @@ public class CleanBrokerDataEvent implements EventMessage {
 
     private String brokerName;
 
-    private Set<String> brokerAddressSet;
+    private Set<Long> brokerIdSetToClean;
 
-    public CleanBrokerDataEvent(String brokerName, Set<String> 
brokerAddressSet) {
+    public CleanBrokerDataEvent(String brokerName, Set<Long> 
brokerIdSetToClean) {
         this.brokerName = brokerName;
-        this.brokerAddressSet = brokerAddressSet;
+        this.brokerIdSetToClean = brokerIdSetToClean;
     }
 
     public String getBrokerName() {
@@ -38,12 +38,12 @@ public class CleanBrokerDataEvent implements EventMessage {
         this.brokerName = brokerName;
     }
 
-    public Set<String> getBrokerAddressSet() {
-        return brokerAddressSet;
+    public void setBrokerIdSetToClean(Set<Long> brokerIdSetToClean) {
+        this.brokerIdSetToClean = brokerIdSetToClean;
     }
 
-    public void setBrokerAddressSet(Set<String> brokerAddressSet) {
-        this.brokerAddressSet = brokerAddressSet;
+    public Set<Long> getBrokerIdSetToClean() {
+        return brokerIdSetToClean;
     }
 
     /**
@@ -57,8 +57,8 @@ public class CleanBrokerDataEvent implements EventMessage {
     @Override
     public String toString() {
         return "CleanBrokerDataEvent{" +
-            "brokerName='" + brokerName + '\'' +
-            ", brokerAddressSet=" + brokerAddressSet +
-            '}';
+                "brokerName='" + brokerName + '\'' +
+                ", brokerIdSetToClean=" + brokerIdSetToClean +
+                '}';
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
index 970b5d8cd..71a56bdce 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
@@ -24,20 +24,20 @@ public class ElectMasterEvent implements EventMessage {
     // Mark whether a new master was elected.
     private final boolean newMasterElected;
     private final String brokerName;
-    private final String newMasterAddress;
+    private final Long newMasterBrokerId;
 
     public ElectMasterEvent(boolean newMasterElected, String brokerName) {
-        this(newMasterElected, brokerName, "");
+        this(newMasterElected, brokerName, null);
     }
 
-    public ElectMasterEvent(String brokerName, String newMasterAddress) {
-        this(true, brokerName, newMasterAddress);
+    public ElectMasterEvent(String brokerName, Long newMasterBrokerId) {
+        this(true, brokerName, newMasterBrokerId);
     }
 
-    public ElectMasterEvent(boolean newMasterElected, String brokerName, 
String newMasterAddress) {
+    public ElectMasterEvent(boolean newMasterElected, String brokerName, Long 
newMasterBrokerId) {
         this.newMasterElected = newMasterElected;
         this.brokerName = brokerName;
-        this.newMasterAddress = newMasterAddress;
+        this.newMasterBrokerId = newMasterBrokerId;
     }
 
     @Override
@@ -53,16 +53,16 @@ public class ElectMasterEvent implements EventMessage {
         return brokerName;
     }
 
-    public String getNewMasterAddress() {
-        return newMasterAddress;
+    public Long getNewMasterBrokerId() {
+        return newMasterBrokerId;
     }
 
     @Override
     public String toString() {
         return "ElectMasterEvent{" +
-            "newMasterElected=" + newMasterElected +
-            ", brokerName='" + brokerName + '\'' +
-            ", newMasterAddress='" + newMasterAddress + '\'' +
-            '}';
+                "newMasterElected=" + newMasterElected +
+                ", brokerName='" + brokerName + '\'' +
+                ", newMasterBrokerId=" + newMasterBrokerId +
+                '}';
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
index e2a68a544..bc60c8b54 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
@@ -21,26 +21,30 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
 
 /**
  * Broker replicas info, mapping from brokerAddress to {brokerId, 
brokerHaAddress}.
  */
 public class BrokerReplicaInfo {
     private final String clusterName;
+
     private final String brokerName;
+
     // Start from 1
     private final AtomicLong nextAssignBrokerId;
-    private final HashMap<String/*Address*/, Long/*brokerId*/> brokerIdTable;
+
+    private final HashMap<Long/*brokerId*/, Pair<String/*ipAddress*/, 
String/*registerCheckCode*/>> brokerIdInfo;
 
     public BrokerReplicaInfo(String clusterName, String brokerName) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
-        this.brokerIdTable = new HashMap<>();
         this.nextAssignBrokerId = new AtomicLong(MixAll.FIRST_SLAVE_ID);
+        this.brokerIdInfo = new HashMap<>();
     }
 
-    public void removeBrokerAddress(final String address) {
-        this.brokerIdTable.remove(address);
+    public void removeBrokerId(final Long brokerId) {
+        this.brokerIdInfo.remove(brokerId);
     }
 
     public long newBrokerId() {
@@ -55,26 +59,30 @@ public class BrokerReplicaInfo {
         return brokerName;
     }
 
-    public void addBroker(final String address, final Long brokerId) {
-        this.brokerIdTable.put(address, brokerId);
+    public void addBroker(final Long brokerId, final String ipAddress, final 
String registerCheckCode) {
+        this.brokerIdInfo.put(brokerId, new Pair<>(ipAddress, 
registerCheckCode));
     }
 
-    public boolean isBrokerExist(final String address) {
-        return this.brokerIdTable.containsKey(address);
+    public boolean isBrokerExist(final Long brokerId) {
+        return this.brokerIdInfo.containsKey(brokerId);
     }
 
-    public Set<String> getAllBroker() {
-        return new HashSet<>(this.brokerIdTable.keySet());
+    public Set<Long> getAllBroker() {
+        return new HashSet<>(this.brokerIdInfo.keySet());
     }
 
-    public HashMap<String, Long> getBrokerIdTable() {
-        return new HashMap<>(this.brokerIdTable);
+    public HashMap<Long, String> getBrokerIdTable() {
+        HashMap<Long/*brokerId*/, String/*address*/> map = new 
HashMap<>(this.brokerIdInfo.size());
+        this.brokerIdInfo.forEach((id, pair) -> {
+            map.put(id, pair.getObject1());
+        });
+        return map;
     }
 
-    public Long getBrokerId(final String address) {
-        if (this.brokerIdTable.containsKey(address)) {
-            return this.brokerIdTable.get(address);
+    public String getBrokerAddress(final Long brokerId) {
+        if (this.brokerIdInfo.containsKey(brokerId)) {
+            return this.brokerIdInfo.get(brokerId).getObject1();
         }
-        return -1L;
+        return null;
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index dc0339d0c..c915131bf 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -25,11 +25,13 @@ import java.util.Set;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.controller.elect.ElectPolicy;
+import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
 import org.apache.rocketmq.controller.impl.event.AlterSyncStateSetEvent;
 import org.apache.rocketmq.controller.impl.event.ApplyBrokerIdEvent;
 import org.apache.rocketmq.controller.impl.event.CleanBrokerDataEvent;
@@ -71,92 +73,92 @@ public class ReplicasInfoManager {
     }
 
     public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
-        final AlterSyncStateSetRequestHeader request, final SyncStateSet 
syncStateSet,
-        final BiPredicate<String, String> brokerAlivePredicate) {
+            final AlterSyncStateSetRequestHeader request, final SyncStateSet 
syncStateSet,
+            final BrokerValidPredicate brokerAlivePredicate) {
         final String brokerName = request.getBrokerName();
         final ControllerResult<AlterSyncStateSetResponseHeader> result = new 
ControllerResult<>(new AlterSyncStateSetResponseHeader());
         final AlterSyncStateSetResponseHeader response = result.getResponse();
 
-        if (isContainsBroker(brokerName)) {
-            final Set<String> newSyncStateSet = syncStateSet.getSyncStateSet();
-            final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
-            final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
+        if (!isContainsBroker(brokerName)) {
+            
result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, 
"Broker metadata is not existed");
+            return result;
+        }
+        final Set<Long> newSyncStateSet = syncStateSet.getSyncStateSet();
+        final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
+        final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
 
-            // Check whether the oldSyncStateSet is equal with newSyncStateSet
-            final Set<String> oldSyncStateSet = 
syncStateInfo.getSyncStateSet();
-            if (oldSyncStateSet.size() == newSyncStateSet.size() && 
oldSyncStateSet.containsAll(newSyncStateSet)) {
-                String err = "The newSyncStateSet is equal with 
oldSyncStateSet, no needed to update syncStateSet";
-                LOGGER.warn("{}", err);
-                
result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, 
err);
-                return result;
-            }
+        // Check whether the oldSyncStateSet is equal with newSyncStateSet
+        final Set<Long> oldSyncStateSet = syncStateInfo.getSyncStateSet();
+        if (oldSyncStateSet.size() == newSyncStateSet.size() && 
oldSyncStateSet.containsAll(newSyncStateSet)) {
+            String err = "The newSyncStateSet is equal with oldSyncStateSet, 
no needed to update syncStateSet";
+            LOGGER.warn("{}", err);
+            
result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, 
err);
+            return result;
+        }
 
-            // Check master
-            if 
(!syncStateInfo.getMasterAddress().equals(request.getMasterAddress())) {
-                String err = String.format("Rejecting alter syncStateSet 
request because the current leader is:{%s}, not {%s}",
-                    syncStateInfo.getMasterAddress(), 
request.getMasterAddress());
-                LOGGER.error("{}", err);
-                
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_MASTER, err);
-                return result;
-            }
+        // Check master
+        if 
(!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) {
+            String err = String.format("Rejecting alter syncStateSet request 
because the current leader is:{%s}, not {%s}",
+                    syncStateInfo.getMasterBrokerId(), 
request.getMasterBrokerId());
+            LOGGER.error("{}", err);
+            result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_MASTER, 
err);
+            return result;
+        }
 
-            // Check master epoch
-            if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) {
-                String err = String.format("Rejecting alter syncStateSet 
request because the current master epoch is:{%d}, not {%d}",
+        // Check master epoch
+        if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) {
+            String err = String.format("Rejecting alter syncStateSet request 
because the current master epoch is:{%d}, not {%d}",
                     syncStateInfo.getMasterEpoch(), request.getMasterEpoch());
-                LOGGER.error("{}", err);
-                
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH, err);
-                return result;
-            }
+            LOGGER.error("{}", err);
+            
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH, err);
+            return result;
+        }
 
-            // Check syncStateSet epoch
-            if (syncStateSet.getSyncStateSetEpoch() != 
syncStateInfo.getSyncStateSetEpoch()) {
-                String err = String.format("Rejecting alter syncStateSet 
request because the current syncStateSet epoch is:{%d}, not {%d}",
+        // Check syncStateSet epoch
+        if (syncStateSet.getSyncStateSetEpoch() != 
syncStateInfo.getSyncStateSetEpoch()) {
+            String err = String.format("Rejecting alter syncStateSet request 
because the current syncStateSet epoch is:{%d}, not {%d}",
                     syncStateInfo.getSyncStateSetEpoch(), 
syncStateSet.getSyncStateSetEpoch());
+            LOGGER.error("{}", err);
+            
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH, 
err);
+            return result;
+        }
+
+        // Check newSyncStateSet correctness
+        for (Long replica : newSyncStateSet) {
+            if (!brokerReplicaInfo.isBrokerExist(replica)) {
+                String err = String.format("Rejecting alter syncStateSet 
request because the replicas {%s} don't exist", replica);
                 LOGGER.error("{}", err);
-                
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH, 
err);
+                
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REPLICAS, err);
                 return result;
             }
-
-            // Check newSyncStateSet correctness
-            for (String replicas : newSyncStateSet) {
-                if (!brokerReplicaInfo.isBrokerExist(replicas)) {
-                    String err = String.format("Rejecting alter syncStateSet 
request because the replicas {%s} don't exist", replicas);
-                    LOGGER.error("{}", err);
-                    
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REPLICAS, err);
-                    return result;
-                }
-                if 
(!brokerAlivePredicate.test(brokerReplicaInfo.getClusterName(), replicas)) {
-                    String err = String.format("Rejecting alter syncStateSet 
request because the replicas {%s} don't alive", replicas);
-                    LOGGER.error(err);
-                    
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NOT_ALIVE, err);
-                    return result;
-                }
-            }
-
-            if (!newSyncStateSet.contains(syncStateInfo.getMasterAddress())) {
-                String err = String.format("Rejecting alter syncStateSet 
request because the newSyncStateSet don't contains origin leader {%s}", 
syncStateInfo.getMasterAddress());
+            if 
(!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), 
brokerReplicaInfo.getBrokerName(), replica)) {
+                String err = String.format("Rejecting alter syncStateSet 
request because the replicas {%s} don't alive", replica);
                 LOGGER.error(err);
-                
result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, 
err);
+                
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NOT_ALIVE, err);
                 return result;
             }
+        }
 
-            // Generate event
-            int epoch = syncStateInfo.getSyncStateSetEpoch() + 1;
-            response.setNewSyncStateSetEpoch(epoch);
-            result.setBody(new SyncStateSet(newSyncStateSet, epoch).encode());
-            final AlterSyncStateSetEvent event = new 
AlterSyncStateSetEvent(brokerName, newSyncStateSet);
-            result.addEvent(event);
+        if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) {
+            String err = String.format("Rejecting alter syncStateSet request 
because the newSyncStateSet don't contains origin leader {%s}", 
syncStateInfo.getMasterBrokerId());
+            LOGGER.error(err);
+            
result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, 
err);
             return result;
         }
-        
result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, 
"Broker metadata is not existed");
+
+        // Generate event
+        int epoch = syncStateInfo.getSyncStateSetEpoch() + 1;
+        response.setNewSyncStateSetEpoch(epoch);
+        result.setBody(new SyncStateSet(newSyncStateSet, epoch).encode());
+        final AlterSyncStateSetEvent event = new 
AlterSyncStateSetEvent(brokerName, newSyncStateSet);
+        result.addEvent(event);
         return result;
     }
 
     public ControllerResult<ElectMasterResponseHeader> electMaster(final 
ElectMasterRequestHeader request,
-        final ElectPolicy electPolicy) {
+                                                                   final 
ElectPolicy electPolicy) {
         final String brokerName = request.getBrokerName();
-        final String brokerAddress = request.getBrokerAddress();
+        final Long brokerId = request.getBrokerId();
         final ControllerResult<ElectMasterResponseHeader> result = new 
ControllerResult<>(new ElectMasterResponseHeader());
         final ElectMasterResponseHeader response = result.getResponse();
         if (!isContainsBroker(brokerName)) {
@@ -168,45 +170,45 @@ public class ReplicasInfoManager {
 
         final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
         final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
-        final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
-        final String oldMaster = syncStateInfo.getMasterAddress();
-        Set<String> allReplicaBrokers = 
controllerConfig.isEnableElectUncleanMaster() ? 
brokerReplicaInfo.getAllBroker() : null;
-        String newMaster = null;
+        final Set<Long> syncStateSet = syncStateInfo.getSyncStateSet();
+        final Long oldMaster = syncStateInfo.getMasterBrokerId();
+        Set<Long> allReplicaBrokers = 
controllerConfig.isEnableElectUncleanMaster() ? 
brokerReplicaInfo.getAllBroker() : null;
+        Long newMaster = null;
 
         if (syncStateInfo.isFirstTimeForElect()) {
             // If never have a master in this broker set, in other words, it 
is the first time to elect a master
             // elect it as the first master
-            newMaster = brokerAddress;
+            newMaster = brokerId;
         }
 
         // elect by policy
         if (newMaster == null) {
             // we should assign this assignedBrokerAddr when the brokerAddress 
need to be elected by force
-            String assignedBrokerAddr = request.isForceElect() ? brokerAddress 
: null;
-            newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), 
syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerAddr);
+            Long assignedBrokerId = request.isForceElect() ? brokerId : null;
+            newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), 
brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, 
assignedBrokerId);
         }
 
-        if (StringUtils.isNotEmpty(newMaster) && newMaster.equals(oldMaster)) {
+        if (newMaster != null && newMaster.equals(oldMaster)) {
             // old master still valid, change nothing
             String err = String.format("The old master %s is still alive, not 
need to elect new master for broker %s", oldMaster, 
brokerReplicaInfo.getBrokerName());
             LOGGER.warn("{}", err);
             // the master still exist
             response.setMasterEpoch(syncStateInfo.getMasterEpoch());
             
response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
-            response.setMasterAddress(syncStateInfo.getMasterAddress());
-            
response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress()));
+            response.setMasterBrokerId(oldMaster);
+            
response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(oldMaster));
             
result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err);
             return result;
         }
 
         // a new master is elected
-        if (StringUtils.isNotEmpty(newMaster)) {
+        if (newMaster != null) {
             final int masterEpoch = syncStateInfo.getMasterEpoch();
             final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch();
-            response.setMasterAddress(newMaster);
+            response.setMasterBrokerId(newMaster);
+            
response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(newMaster));
             response.setMasterEpoch(masterEpoch + 1);
             response.setSyncStateSetEpoch(syncStateSetEpoch + 1);
-            
response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress()));
             BrokerMemberGroup brokerMemberGroup = 
buildBrokerMemberGroup(brokerName);
             if (null != brokerMemberGroup) {
                 response.setBrokerMemberGroup(brokerMemberGroup);
@@ -219,7 +221,7 @@ public class ReplicasInfoManager {
         // If elect failed and the electMaster is triggered by controller (we 
can figure it out by brokerAddress),
         // we still need to apply an ElectMasterEvent to tell the statemachine
         // that the master was shutdown and no new master was elected.
-        if (request.getBrokerAddress() == null) {
+        if (request.getBrokerId() == null) {
             final ElectMasterEvent event = new ElectMasterEvent(false, 
brokerName);
             result.addEvent(event);
             
result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old 
master has down and failed to elect a new broker master");
@@ -234,9 +236,9 @@ public class ReplicasInfoManager {
         if (isContainsBroker(brokerName)) {
             final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
             final BrokerMemberGroup group = new 
BrokerMemberGroup(brokerReplicaInfo.getClusterName(), brokerName);
-            final HashMap<String, Long> brokerIdTable = 
brokerReplicaInfo.getBrokerIdTable();
+            final HashMap<Long, String> brokerIdTable = 
brokerReplicaInfo.getBrokerIdTable();
             final HashMap<Long, String> memberGroup = new HashMap<>();
-            brokerIdTable.forEach((addr, id) -> memberGroup.put(id, addr));
+            brokerIdTable.forEach((id, addr) -> memberGroup.put(id, addr));
             group.setBrokerAddrs(memberGroup);
             return group;
         }
@@ -244,7 +246,7 @@ public class ReplicasInfoManager {
     }
 
     public ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerBroker(
-        final RegisterBrokerToControllerRequestHeader request, final 
BiPredicate<String, String> brokerAlivePredicate) {
+            final RegisterBrokerToControllerRequestHeader request, final 
BiPredicate<String, String> brokerAlivePredicate) {
         String brokerAddress = request.getBrokerAddress();
         final String brokerName = request.getBrokerName();
         final String clusterName = request.getClusterName();
@@ -295,12 +297,10 @@ public class ReplicasInfoManager {
             // If exist broker metadata, just return metadata
             final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
             final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
-            final String masterAddress = syncStateInfo.getMasterAddress();
-            response.setMasterAddress(masterAddress);
+            final Long masterBrokerId = syncStateInfo.getMasterBrokerId();
+            response.setMasterBrokerId(masterBrokerId);
+            
response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(masterBrokerId));
             response.setMasterEpoch(syncStateInfo.getMasterEpoch());
-            if (StringUtils.isNotEmpty(request.getBrokerAddress())) {
-                
response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress()));
-            }
             result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), 
syncStateInfo.getSyncStateSetEpoch()).encode());
             return result;
         }
@@ -316,21 +316,20 @@ public class ReplicasInfoManager {
                 // If exist broker metadata, just return metadata
                 final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
                 final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
-                final Set<String> syncStateSet = 
syncStateInfo.getSyncStateSet();
-                final String master = syncStateInfo.getMasterAddress();
+                final Set<Long> syncStateSet = syncStateInfo.getSyncStateSet();
+                final Long masterBrokerId = syncStateInfo.getMasterBrokerId();
                 final ArrayList<BrokerReplicasInfo.ReplicaIdentity> 
inSyncReplicas = new ArrayList<>();
                 final ArrayList<BrokerReplicasInfo.ReplicaIdentity> 
notInSyncReplicas = new ArrayList<>();
 
-                brokerReplicaInfo.getBrokerIdTable().forEach((brokerAddress, 
brokerId) -> {
-                    if (syncStateSet.contains(brokerAddress)) {
-                        long id = StringUtils.equals(master, brokerAddress) ? 
MixAll.MASTER_ID : brokerReplicaInfo.getBrokerId(brokerAddress);
-                        inSyncReplicas.add(new 
BrokerReplicasInfo.ReplicaIdentity(brokerAddress, id));
+                brokerReplicaInfo.getBrokerIdTable().forEach((brokerId, 
brokerAddress) -> {
+                    if (syncStateSet.contains(brokerId)) {
+                        inSyncReplicas.add(new 
BrokerReplicasInfo.ReplicaIdentity(brokerName, brokerId, brokerAddress));
                     } else {
-                        notInSyncReplicas.add(new 
BrokerReplicasInfo.ReplicaIdentity(brokerAddress, brokerId));
+                        notInSyncReplicas.add(new 
BrokerReplicasInfo.ReplicaIdentity(brokerName, brokerId, brokerAddress));
                     }
                 });
 
-                final BrokerReplicasInfo.ReplicasInfo inSyncState = new 
BrokerReplicasInfo.ReplicasInfo(master, syncStateInfo.getMasterEpoch(), 
syncStateInfo.getSyncStateSetEpoch(), inSyncReplicas, notInSyncReplicas);
+                final BrokerReplicasInfo.ReplicasInfo inSyncState = new 
BrokerReplicasInfo.ReplicasInfo(masterBrokerId, 
brokerReplicaInfo.getBrokerAddress(masterBrokerId), 
syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), 
inSyncReplicas, notInSyncReplicas);
                 brokerReplicasInfo.addReplicaInfo(brokerName, inSyncState);
             }
         }
@@ -339,27 +338,27 @@ public class ReplicasInfoManager {
     }
 
     public ControllerResult<Void> cleanBrokerData(final 
CleanControllerBrokerDataRequestHeader requestHeader,
-        final BiPredicate<String, String> brokerAlivePredicate) {
+                                                  final BrokerValidPredicate 
validPredicate) {
         final ControllerResult<Void> result = new ControllerResult<>();
 
         final String clusterName = requestHeader.getClusterName();
         final String brokerName = requestHeader.getBrokerName();
-        final String brokerAddrs = requestHeader.getBrokerAddress();
+        final String brokerIdSetToClean = 
requestHeader.getBrokerIdSetToClean();
 
-        Set<String> brokerAddressSet = null;
+        Set<Long> brokerIdSet = null;
         if (!requestHeader.isCleanLivingBroker()) {
             //if SyncStateInfo.masterAddress is not empty, at least one broker 
with the same BrokerName is alive
             SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
-            if (StringUtils.isBlank(brokerAddrs) && null != syncStateInfo && 
StringUtils.isNotEmpty(syncStateInfo.getMasterAddress())) {
+            if (StringUtils.isBlank(brokerIdSetToClean) && null != 
syncStateInfo && syncStateInfo.getMasterBrokerId() != null) {
                 String remark = String.format("Broker %s is still alive, clean 
up failure", requestHeader.getBrokerName());
                 
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
remark);
                 return result;
             }
-            if (StringUtils.isNotBlank(brokerAddrs)) {
-                brokerAddressSet = 
Stream.of(brokerAddrs.split(";")).collect(Collectors.toSet());
-                for (String brokerAddr : brokerAddressSet) {
-                    if (brokerAlivePredicate.test(clusterName, brokerAddr)) {
-                        String remark = String.format("Broker [%s,  %s] is 
still alive, clean up failure", requestHeader.getBrokerName(), brokerAddr);
+            if (StringUtils.isNotBlank(brokerIdSetToClean)) {
+                brokerIdSet = 
Stream.of(brokerIdSetToClean.split(";")).map(idStr -> 
Long.valueOf(idStr)).collect(Collectors.toSet());
+                for (Long brokerId : brokerIdSet) {
+                    if (validPredicate.check(clusterName, brokerName, 
brokerId)) {
+                        String remark = String.format("Broker [%s,  %s] is 
still alive, clean up failure", requestHeader.getBrokerName(), brokerId);
                         
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
remark);
                         return result;
                     }
@@ -367,7 +366,7 @@ public class ReplicasInfoManager {
             }
         }
         if (isContainsBroker(brokerName)) {
-            final CleanBrokerDataEvent event = new 
CleanBrokerDataEvent(brokerName, brokerAddressSet);
+            final CleanBrokerDataEvent event = new 
CleanBrokerDataEvent(brokerName, brokerIdSet);
             result.addEvent(event);
             return result;
         }
@@ -412,8 +411,8 @@ public class ReplicasInfoManager {
         final String brokerName = event.getBrokerName();
         if (isContainsBroker(brokerName)) {
             final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
-            if (!brokerReplicaInfo.isBrokerExist(event.getBrokerAddress())) {
-                brokerReplicaInfo.addBroker(event.getBrokerAddress(), 
event.getNewBrokerId());
+            if (!brokerReplicaInfo.isBrokerExist(event.getNewBrokerId())) {
+                brokerReplicaInfo.addBroker(event.getNewBrokerId(), 
event.getBrokerAddress(), event.getRegisterCheckCode());
             }
         } else {
             // First time to register in this broker set
@@ -421,7 +420,7 @@ public class ReplicasInfoManager {
             final String clusterName = event.getClusterName();
             final BrokerReplicaInfo brokerReplicaInfo = new 
BrokerReplicaInfo(clusterName, brokerName);
             long brokerId = brokerReplicaInfo.newBrokerId();
-            brokerReplicaInfo.addBroker(event.getBrokerAddress(), brokerId);
+            brokerReplicaInfo.addBroker(brokerId, event.getBrokerAddress(), 
event.getRegisterCheckCode());
             this.replicaInfoTable.put(brokerName, brokerReplicaInfo);
             final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, 
brokerName);
             // Initialize an empty syncStateInfo for this broker set
@@ -431,7 +430,7 @@ public class ReplicasInfoManager {
 
     private void handleElectMaster(final ElectMasterEvent event) {
         final String brokerName = event.getBrokerName();
-        final String newMaster = event.getNewMasterAddress();
+        final Long newMaster = event.getNewMasterBrokerId();
         if (isContainsBroker(brokerName)) {
             final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
 
@@ -440,13 +439,13 @@ public class ReplicasInfoManager {
                 syncStateInfo.updateMasterInfo(newMaster);
 
                 // Record new newSyncStateSet list
-                final HashSet<String> newSyncStateSet = new HashSet<>();
+                final HashSet<Long> newSyncStateSet = new HashSet<>();
                 newSyncStateSet.add(newMaster);
                 syncStateInfo.updateSyncStateSetInfo(newSyncStateSet);
             } else {
                 // If new master was not elected, which means old master was 
shutdown and the newSyncStateSet list had no more replicas
                 // So we should delete old master, but retain newSyncStateSet 
list.
-                syncStateInfo.updateMasterInfo("");
+                syncStateInfo.updateMasterInfo(null);
             }
             return;
         }
@@ -456,9 +455,9 @@ public class ReplicasInfoManager {
     private void handleCleanBrokerDataEvent(final CleanBrokerDataEvent event) {
 
         final String brokerName = event.getBrokerName();
-        final Set<String> brokerAddressSet = event.getBrokerAddressSet();
+        final Set<Long> brokerIdSetToClean = event.getBrokerIdSetToClean();
 
-        if (null == brokerAddressSet || brokerAddressSet.isEmpty()) {
+        if (null == brokerIdSetToClean || brokerIdSetToClean.isEmpty()) {
             this.replicaInfoTable.remove(brokerName);
             this.syncStateSetInfoTable.remove(brokerName);
             return;
@@ -468,9 +467,9 @@ public class ReplicasInfoManager {
         }
         final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
         final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
-        for (String brokerAddress : brokerAddressSet) {
-            brokerReplicaInfo.removeBrokerAddress(brokerAddress);
-            syncStateInfo.removeSyncState(brokerAddress);
+        for (Long brokerId : brokerIdSetToClean) {
+            brokerReplicaInfo.removeBrokerId(brokerId);
+            syncStateInfo.removeFromSyncState(brokerId);
         }
         if (brokerReplicaInfo.getBrokerIdTable().isEmpty()) {
             this.replicaInfoTable.remove(brokerName);
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
index 29570b5ea..0951df93a 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.controller.impl.manager;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
 
 /**
  * Manages the syncStateSet of broker replicas.
@@ -27,10 +26,11 @@ import org.apache.commons.lang3.StringUtils;
 public class SyncStateInfo {
     private final String clusterName;
     private final String brokerName;
-    private Set<String/*Address*/> syncStateSet;
+
+    private Set<Long/*brokerId*/> syncStateSet;
     private int syncStateSetEpoch;
 
-    private String masterAddress;
+    private Long masterBrokerId;
     private int masterEpoch;
 
     public SyncStateInfo(String clusterName, String brokerName) {
@@ -42,23 +42,23 @@ public class SyncStateInfo {
     }
 
 
-    public SyncStateInfo(String clusterName, String brokerName, String 
masterAddress) {
+    public SyncStateInfo(String clusterName, String brokerName, Long 
masterBrokerId) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
-        this.masterAddress = masterAddress;
+        this.masterBrokerId = masterBrokerId;
         this.masterEpoch = 1;
         this.syncStateSet = new HashSet<>();
-        this.syncStateSet.add(masterAddress);
+        this.syncStateSet.add(masterBrokerId);
         this.syncStateSetEpoch = 1;
     }
 
 
-    public void updateMasterInfo(String masterAddress) {
-        this.masterAddress = masterAddress;
+    public void updateMasterInfo(Long masterBrokerId) {
+        this.masterBrokerId = masterBrokerId;
         this.masterEpoch++;
     }
 
-    public void updateSyncStateSetInfo(Set<String> newSyncStateSet) {
+    public void updateSyncStateSetInfo(Set<Long> newSyncStateSet) {
         this.syncStateSet = new HashSet<>(newSyncStateSet);
         this.syncStateSetEpoch++;
     }
@@ -68,7 +68,7 @@ public class SyncStateInfo {
     }
 
     public boolean isMasterExist() {
-        return StringUtils.isNotEmpty(this.masterAddress);
+        return masterBrokerId != null;
     }
 
     public String getClusterName() {
@@ -79,7 +79,7 @@ public class SyncStateInfo {
         return brokerName;
     }
 
-    public Set<String> getSyncStateSet() {
+    public Set<Long> getSyncStateSet() {
         return new HashSet<>(syncStateSet);
     }
 
@@ -87,15 +87,15 @@ public class SyncStateInfo {
         return syncStateSetEpoch;
     }
 
-    public String getMasterAddress() {
-        return masterAddress;
+    public Long getMasterBrokerId() {
+        return masterBrokerId;
     }
 
     public int getMasterEpoch() {
         return masterEpoch;
     }
 
-    public void removeSyncState(final String address) {
-        syncStateSet.remove(address);
+    public void removeFromSyncState(final Long brokerId) {
+        syncStateSet.remove(brokerId);
     }
 }
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index fc36a8c8e..772cb7094 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.BrokerAddrInfo;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -1073,6 +1072,70 @@ public class RouteInfoManager {
     }
 }
 
+/**
+ * broker address information
+ */
+class BrokerAddrInfo {
+    private String clusterName;
+    private String brokerAddr;
+
+    private int hash;
+
+    public BrokerAddrInfo(String clusterName, String brokerAddr) {
+        this.clusterName = clusterName;
+        this.brokerAddr = brokerAddr;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public boolean isEmpty() {
+        return clusterName.isEmpty() && brokerAddr.isEmpty();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+
+        if (obj instanceof BrokerAddrInfo) {
+            BrokerAddrInfo addr = (BrokerAddrInfo) obj;
+            return clusterName.equals(addr.clusterName) && 
brokerAddr.equals(addr.brokerAddr);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        int h = hash;
+        if (h == 0 && clusterName.length() + brokerAddr.length() > 0) {
+            for (int i = 0; i < clusterName.length(); i++) {
+                h = 31 * h + clusterName.charAt(i);
+            }
+            h = 31 * h + '_';
+            for (int i = 0; i < brokerAddr.length(); i++) {
+                h = 31 * h + brokerAddr.charAt(i);
+            }
+            hash = h;
+        }
+        return h;
+    }
+
+    @Override
+    public String toString() {
+        return "BrokerAddrInfo [clusterName=" + clusterName + ", brokerAddr=" 
+ brokerAddr + "]";
+    }
+}
+
 class BrokerLiveInfo {
     private long lastUpdateTimestamp;
     private long heartbeatTimeoutMillis;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
index fece50d2e..c7e410b80 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
@@ -42,15 +42,19 @@ public class BrokerReplicasInfo extends 
RemotingSerializable  {
     }
 
     public static class ReplicasInfo extends RemotingSerializable {
+
+        private Long masterBrokerId;
+
         private String masterAddress;
         private int masterEpoch;
         private int syncStateSetEpoch;
         private List<ReplicaIdentity> inSyncReplicas;
         private List<ReplicaIdentity> notInSyncReplicas;
 
-        public ReplicasInfo(String masterAddress, int masterEpoch, int 
syncStateSetEpoch,
+        public ReplicasInfo(Long masterBrokerId, String masterAddress, int 
masterEpoch, int syncStateSetEpoch,
             List<ReplicaIdentity> inSyncReplicas,
             List<ReplicaIdentity> notInSyncReplicas) {
+            this.masterBrokerId = masterBrokerId;
             this.masterAddress = masterAddress;
             this.masterEpoch = masterEpoch;
             this.syncStateSetEpoch = syncStateSetEpoch;
@@ -99,23 +103,42 @@ public class BrokerReplicasInfo extends 
RemotingSerializable  {
             List<ReplicaIdentity> notInSyncReplicas) {
             this.notInSyncReplicas = notInSyncReplicas;
         }
+
+        public void setMasterBrokerId(Long masterBrokerId) {
+            this.masterBrokerId = masterBrokerId;
+        }
+
+        public Long getMasterBrokerId() {
+            return masterBrokerId;
+        }
     }
 
     public static class ReplicaIdentity extends RemotingSerializable {
-        private String address;
+        private String brokerName;
         private Long brokerId;
 
-        public ReplicaIdentity(String address, Long brokerId) {
-            this.address = address;
+        private String brokerAddress;
+
+        public ReplicaIdentity(String brokerName, Long brokerId, String 
brokerAddress) {
+            this.brokerName = brokerName;
             this.brokerId = brokerId;
+            this.brokerAddress = brokerAddress;
+        }
+
+        public String getBrokerName() {
+            return brokerName;
+        }
+
+        public void setBrokerName(String brokerName) {
+            this.brokerName = brokerName;
         }
 
-        public String getAddress() {
-            return address;
+        public String getBrokerAddress() {
+            return brokerAddress;
         }
 
-        public void setAddress(String address) {
-            this.address = address;
+        public void setBrokerAddress(String brokerAddress) {
+            this.brokerAddress = brokerAddress;
         }
 
         public Long getBrokerId() {
@@ -128,10 +151,11 @@ public class BrokerReplicasInfo extends 
RemotingSerializable  {
 
         @Override
         public String toString() {
-            return "{" +
-                "address='" + address + '\'' +
-                ", brokerId=" + brokerId +
-                '}';
+            return "ReplicaIdentity{" +
+                    "brokerName='" + brokerName + '\'' +
+                    ", brokerId=" + brokerId +
+                    ", brokerAddress='" + brokerAddress + '\'' +
+                    '}';
         }
     }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SyncStateSet.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SyncStateSet.java
index ced216d85..f0a71f8a9 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SyncStateSet.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SyncStateSet.java
@@ -22,19 +22,19 @@ import java.util.Set;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class SyncStateSet extends RemotingSerializable {
-    private Set<String> syncStateSet;
+    private Set<Long> syncStateSet;
     private int syncStateSetEpoch;
 
-    public SyncStateSet(Set<String> syncStateSet, int syncStateSetEpoch) {
+    public SyncStateSet(Set<Long> syncStateSet, int syncStateSetEpoch) {
         this.syncStateSet = new HashSet<>(syncStateSet);
         this.syncStateSetEpoch = syncStateSetEpoch;
     }
 
-    public Set<String> getSyncStateSet() {
+    public Set<Long> getSyncStateSet() {
         return new HashSet<>(syncStateSet);
     }
 
-    public void setSyncStateSet(Set<String> syncStateSet) {
+    public void setSyncStateSet(Set<Long> syncStateSet) {
         this.syncStateSet = new HashSet<>(syncStateSet);
     }
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java
index 3e01379f8..9fbf74e1f 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java
@@ -21,15 +21,15 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class AlterSyncStateSetRequestHeader implements CommandCustomHeader {
     private String brokerName;
-    private String masterAddress;
+    private Long masterBrokerId;
     private int masterEpoch;
 
     public AlterSyncStateSetRequestHeader() {
     }
 
-    public AlterSyncStateSetRequestHeader(String brokerName, String 
masterAddress, int masterEpoch) {
+    public AlterSyncStateSetRequestHeader(String brokerName, Long 
masterBrokerId, int masterEpoch) {
         this.brokerName = brokerName;
-        this.masterAddress = masterAddress;
+        this.masterBrokerId = masterBrokerId;
         this.masterEpoch = masterEpoch;
     }
 
@@ -41,12 +41,12 @@ public class AlterSyncStateSetRequestHeader implements 
CommandCustomHeader {
         this.brokerName = brokerName;
     }
 
-    public String getMasterAddress() {
-        return masterAddress;
+    public Long getMasterBrokerId() {
+        return masterBrokerId;
     }
 
-    public void setMasterAddress(String masterAddress) {
-        this.masterAddress = masterAddress;
+    public void setMasterBrokerId(Long masterBrokerId) {
+        this.masterBrokerId = masterBrokerId;
     }
 
     public int getMasterEpoch() {
@@ -60,10 +60,10 @@ public class AlterSyncStateSetRequestHeader implements 
CommandCustomHeader {
     @Override
     public String toString() {
         return "AlterSyncStateSetRequestHeader{" +
-            "brokerName='" + brokerName + '\'' +
-            ", masterAddress='" + masterAddress + '\'' +
-            ", masterEpoch=" + masterEpoch +
-            '}';
+                "brokerName='" + brokerName + '\'' +
+                ", masterBrokerId=" + masterBrokerId +
+                ", masterEpoch=" + masterEpoch +
+                '}';
     }
 
     @Override
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/CleanControllerBrokerDataRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/CleanControllerBrokerDataRequestHeader.java
index 2c49c437c..b2d6640b7 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/CleanControllerBrokerDataRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/CleanControllerBrokerDataRequestHeader.java
@@ -31,18 +31,18 @@ public class CleanControllerBrokerDataRequestHeader 
implements CommandCustomHead
     private String brokerName;
 
     @CFNullable
-    private String brokerAddress;
+    private String brokerIdSetToClean;
 
     private boolean isCleanLivingBroker = false;
 
     public CleanControllerBrokerDataRequestHeader() {
     }
 
-    public CleanControllerBrokerDataRequestHeader(String clusterName, String 
brokerName, String brokerAddress,
+    public CleanControllerBrokerDataRequestHeader(String clusterName, String 
brokerName, String brokerIdSetToClean,
         boolean isCleanLivingBroker) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
-        this.brokerAddress = brokerAddress;
+        this.brokerIdSetToClean = brokerIdSetToClean;
         this.isCleanLivingBroker = isCleanLivingBroker;
     }
 
@@ -71,12 +71,12 @@ public class CleanControllerBrokerDataRequestHeader 
implements CommandCustomHead
         this.brokerName = brokerName;
     }
 
-    public String getBrokerAddress() {
-        return brokerAddress;
+    public String getBrokerIdSetToClean() {
+        return brokerIdSetToClean;
     }
 
-    public void setBrokerAddress(String brokerAddress) {
-        this.brokerAddress = brokerAddress;
+    public void setBrokerIdSetToClean(String brokerIdSetToClean) {
+        this.brokerIdSetToClean = brokerIdSetToClean;
     }
 
     public boolean isCleanLivingBroker() {
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
index bb9cfca3e..5db4f4c93 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
@@ -29,14 +29,14 @@ public class ElectMasterRequestHeader implements 
CommandCustomHeader {
     private String brokerName;
 
     /**
-     * brokerAddress
-     * for brokerTrigger electMaster: this brokerAddress will be elected as a 
master when it is the first time to elect
+     * brokerId
+     * for brokerTrigger electMaster: this brokerId will be elected as a 
master when it is the first time to elect
      * in this broker-set
-     * for adminTrigger electMaster: this brokerAddress is also named 
assignedBrokerAddress, which means we must prefer to elect
+     * for adminTrigger electMaster: this brokerAddress is also named 
assignedBrokerId, which means we must prefer to elect
      * it as a new master when this broker is valid.
      */
     @CFNotNull
-    private String brokerAddress;
+    private Long brokerId;
 
     @CFNotNull
     private Boolean forceElect = false;
@@ -48,30 +48,30 @@ public class ElectMasterRequestHeader implements 
CommandCustomHeader {
         this.brokerName = brokerName;
     }
 
-    public ElectMasterRequestHeader(String clusterName, String brokerName, 
String brokerAddress) {
+    public ElectMasterRequestHeader(String clusterName, String brokerName, 
Long brokerId) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
-        this.brokerAddress = brokerAddress;
+        this.brokerId = brokerId;
     }
 
-    public ElectMasterRequestHeader(String clusterName, String brokerName, 
String brokerAddress, boolean forceElect) {
+    public ElectMasterRequestHeader(String clusterName, String brokerName, 
Long brokerId, boolean forceElect) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
-        this.brokerAddress = brokerAddress;
+        this.brokerId = brokerId;
         this.forceElect = forceElect;
     }
 
     public static ElectMasterRequestHeader ofBrokerTrigger(String clusterName, 
String brokerName,
-        String brokerAddress) {
-        return new ElectMasterRequestHeader(clusterName, brokerName, 
brokerAddress);
+        Long brokerId) {
+        return new ElectMasterRequestHeader(clusterName, brokerName, brokerId);
     }
 
     public static ElectMasterRequestHeader ofControllerTrigger(String 
brokerName) {
         return new ElectMasterRequestHeader(brokerName);
     }
 
-    public static ElectMasterRequestHeader ofAdminTrigger(String clusterName, 
String brokerName, String brokerAddress) {
-        return new ElectMasterRequestHeader(clusterName, brokerName, 
brokerAddress, true);
+    public static ElectMasterRequestHeader ofAdminTrigger(String clusterName, 
String brokerName, Long brokerId) {
+        return new ElectMasterRequestHeader(clusterName, brokerName, brokerId, 
true);
     }
 
     public String getBrokerName() {
@@ -82,12 +82,12 @@ public class ElectMasterRequestHeader implements 
CommandCustomHeader {
         this.brokerName = brokerName;
     }
 
-    public String getBrokerAddress() {
-        return brokerAddress;
+    public Long getBrokerId() {
+        return brokerId;
     }
 
-    public void setBrokerAddress(String brokerAddress) {
-        this.brokerAddress = brokerAddress;
+    public void setBrokerId(Long brokerId) {
+        this.brokerId = brokerId;
     }
 
     public String getClusterName() {
@@ -107,7 +107,7 @@ public class ElectMasterRequestHeader implements 
CommandCustomHeader {
         return "ElectMasterRequestHeader{" +
                 "clusterName='" + clusterName + '\'' +
                 ", brokerName='" + brokerName + '\'' +
-                ", brokerAddress='" + brokerAddress + '\'' +
+                ", brokerId=" + brokerId +
                 ", forceElect=" + forceElect +
                 '}';
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
index 811d64150..1544b37db 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
@@ -21,13 +21,13 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
 
 public class ElectMasterResponseHeader implements CommandCustomHeader {
+
+    private Long masterBrokerId;
     private String masterAddress;
     private int masterEpoch;
     private int syncStateSetEpoch;
     private BrokerMemberGroup brokerMemberGroup;
 
-    private long brokerId = -1;
-
     public ElectMasterResponseHeader() {
     }
 
@@ -63,23 +63,23 @@ public class ElectMasterResponseHeader implements 
CommandCustomHeader {
         this.brokerMemberGroup = brokerMemberGroup;
     }
 
-    public long getBrokerId() {
-        return brokerId;
+    public void setMasterBrokerId(Long masterBrokerId) {
+        this.masterBrokerId = masterBrokerId;
     }
 
-    public void setBrokerId(long brokerId) {
-        this.brokerId = brokerId;
+    public Long getMasterBrokerId() {
+        return masterBrokerId;
     }
 
     @Override
     public String toString() {
         return "ElectMasterResponseHeader{" +
-            "masterAddress='" + masterAddress + '\'' +
-            ", masterEpoch=" + masterEpoch +
-            ", syncStateSetEpoch=" + syncStateSetEpoch +
-            ", brokerMemberGroup=" + brokerMemberGroup +
-            ", brokerId=" + brokerId +
-            '}';
+                "masterBrokerId=" + masterBrokerId +
+                ", masterAddress='" + masterAddress + '\'' +
+                ", masterEpoch=" + masterEpoch +
+                ", syncStateSetEpoch=" + syncStateSetEpoch +
+                ", brokerMemberGroup=" + brokerMemberGroup +
+                '}';
     }
 
     @Override
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java
index fc6d4ce2e..a7b6bbefa 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java
@@ -20,10 +20,10 @@ import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class GetReplicaInfoResponseHeader implements CommandCustomHeader {
+
+    private Long masterBrokerId;
     private String masterAddress;
     private int masterEpoch;
-    // BrokerId for current replicas.
-    private long brokerId = -1L;
 
     public GetReplicaInfoResponseHeader() {
     }
@@ -44,21 +44,21 @@ public class GetReplicaInfoResponseHeader implements 
CommandCustomHeader {
         this.masterEpoch = masterEpoch;
     }
 
-    public long getBrokerId() {
-        return brokerId;
+    public Long getMasterBrokerId() {
+        return masterBrokerId;
     }
 
-    public void setBrokerId(long brokerId) {
-        this.brokerId = brokerId;
+    public void setMasterBrokerId(Long masterBrokerId) {
+        this.masterBrokerId = masterBrokerId;
     }
 
     @Override
     public String toString() {
         return "GetReplicaInfoResponseHeader{" +
-            "masterAddress='" + masterAddress + '\'' +
-            ", masterEpoch=" + masterEpoch +
-            ", brokerId=" + brokerId +
-            '}';
+                "masterBrokerId=" + masterBrokerId +
+                ", masterAddress='" + masterAddress + '\'' +
+                ", masterEpoch=" + masterEpoch +
+                '}';
     }
 
     @Override
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
index eb7332fdf..8469defe2 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
@@ -29,7 +29,7 @@ public class BrokerHeartbeatRequestHeader implements 
CommandCustomHeader {
     private String brokerAddr;
     @CFNotNull
     private String brokerName;
-    @CFNullable
+    @CFNotNull
     private Long brokerId;
     @CFNullable
     private Integer epoch;

Reply via email to