This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 7835d9523f5a71ce7dd01ed15079e0b985c6ebed
Author: TheR1sing3un <[email protected]>
AuthorDate: Sat Feb 4 18:21:21 2023 +0800

     Refactor code in module store/ha to persist the broker-id
    
    1. Refactor code in module store/ha to persist the broker-id
---
 .../broker/controller/ReplicasManager.java         | 79 +++++++++++-----------
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 18 ++---
 .../rocketmq/store/ha/GroupTransferService.java    |  8 +--
 .../store/ha/autoswitch/AutoSwitchHAClient.java    | 35 +++-------
 .../ha/autoswitch/AutoSwitchHAConnection.java      | 38 ++++-------
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 77 +++++++++------------
 .../store/ha/autoswitch/AutoSwitchHATest.java      | 56 +++++++--------
 7 files changed, 138 insertions(+), 173 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 5184a2350..8a03015f7 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -81,7 +81,11 @@ public class ReplicasManager {
     private ScheduledFuture<?> checkSyncStateSetTaskFuture;
     private ScheduledFuture<?> slaveSyncFuture;
 
-    private Set<String> syncStateSet;
+    private Long brokerId;
+
+    private Long masterBrokerId;
+
+    private Set<Long> syncStateSet;
     private int syncStateSetEpoch = 0;
     private String masterAddress = "";
     private int masterEpoch = 0;
@@ -99,7 +103,6 @@ public class ReplicasManager {
         this.availableControllerAddresses = new ConcurrentHashMap<>();
         this.syncStateSet = new HashSet<>();
         this.localAddress = brokerController.getBrokerAddr();
-        this.haService.setLocalAddress(this.localAddress);
     }
 
     public long getConfirmOffset() {
@@ -201,12 +204,13 @@ public class ReplicasManager {
                 this.masterEpoch = newMasterEpoch;
 
                 // Change sync state set
-                final HashSet<String> newSyncStateSet = new HashSet<>();
-                newSyncStateSet.add(this.localAddress);
+                final HashSet<Long> newSyncStateSet = new HashSet<>();
+                newSyncStateSet.add(this.brokerId);
                 changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
 
                 // Change record
                 this.masterAddress = this.localAddress;
+                this.masterBrokerId = this.brokerId;
 
                 // Handle the slave synchronise
                 handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
@@ -228,33 +232,37 @@ public class ReplicasManager {
                         LOGGER.error("Error happen when register broker to 
name-srv, Failed to change broker to master", e);
                         return;
                     }
-                    LOGGER.info("Change broker {} to master success, 
masterEpoch {}, syncStateSetEpoch:{}", this.localAddress, newMasterEpoch, 
syncStateSetEpoch);
+                    LOGGER.info("Change broker [id:{}][address:{}] to master 
success, masterEpoch {}, syncStateSetEpoch:{}", this.brokerConfig, 
this.localAddress, newMasterEpoch, syncStateSetEpoch);
                 });
             }
         }
     }
 
-    public void changeToSlave(final String newMasterAddress, final int 
newMasterEpoch, long brokerId) {
+    public void changeToSlave(final String newMasterAddress, final int 
newMasterEpoch, long newMasterBrokerId) {
         synchronized (this) {
             if (newMasterEpoch > this.masterEpoch) {
-                LOGGER.info("Begin to change to slave, brokerName={}, 
replicas={}, brokerId={}", this.brokerConfig.getBrokerName(), 
this.localAddress, brokerId);
+                LOGGER.info("Begin to change to slave, brokerName={}, 
brokerId={}, newMasterBrokerId={}, newMasterAddress, newMasterEpoch",
+                        this.brokerConfig.getBrokerName(), this.brokerId, 
newMasterBrokerId, newMasterAddress, newMasterEpoch);
 
                 // Change record
                 this.masterAddress = newMasterAddress;
                 this.masterEpoch = newMasterEpoch;
+                this.masterBrokerId = newMasterBrokerId;
 
+                // Stop checking syncStateSet because only master is able to 
check
                 stopCheckSyncStateSet();
 
-                // Change config
+                // Change config(compatibility problem)
                 
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SLAVE);
                 this.brokerController.changeSpecialServiceStatus(false);
+                // The brokerId in brokerConfig just means its role(master[0] 
or slave[>=1])
                 this.brokerConfig.setBrokerId(brokerId);
 
                 // Handle the slave synchronise
                 handleSlaveSynchronize(BrokerRole.SLAVE);
 
                 // Notify ha service, change to slave
-                this.haService.changeToSlave(newMasterAddress, newMasterEpoch, 
this.brokerConfig.getBrokerId());
+                this.haService.changeToSlave(newMasterAddress, newMasterEpoch, 
brokerId);
 
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
@@ -265,13 +273,13 @@ public class ReplicasManager {
                         return;
                     }
 
-                    LOGGER.info("Change broker {} to slave, 
newMasterAddress:{}, newMasterEpoch:{}", this.localAddress, newMasterAddress, 
newMasterEpoch);
+                    LOGGER.info("Change broker [id:{}][address:{}] to slave, 
newMasterBrokerId:{}, newMasterAddress:{}, newMasterEpoch:{}", this.brokerId, 
this.localAddress, newMasterBrokerId, newMasterAddress, newMasterEpoch);
                 });
             }
         }
     }
 
-    private void changeSyncStateSet(final Set<String> newSyncStateSet, final 
int newSyncStateSetEpoch) {
+    private void changeSyncStateSet(final Set<Long> newSyncStateSet, final int 
newSyncStateSetEpoch) {
         synchronized (this) {
             if (newSyncStateSetEpoch > this.syncStateSetEpoch) {
                 LOGGER.info("Sync state set changed from {} to {}", 
this.syncStateSet, newSyncStateSet);
@@ -313,17 +321,18 @@ public class ReplicasManager {
         // Broker try to elect itself as a master in broker set.
         try {
             ElectMasterResponseHeader tryElectResponse = 
this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerClusterName(),
-                this.brokerConfig.getBrokerName(), this.localAddress);
+                this.brokerConfig.getBrokerName(), this.brokerId);
             final String masterAddress = tryElectResponse.getMasterAddress();
-            if (StringUtils.isEmpty(masterAddress)) {
+            final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
+            if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
                 LOGGER.warn("Now no master in broker set");
                 return false;
             }
 
-            if (StringUtils.equals(masterAddress, this.localAddress)) {
+            if (masterBrokerId.equals(this.brokerId)) {
                 changeToMaster(tryElectResponse.getMasterEpoch(), 
tryElectResponse.getSyncStateSetEpoch());
             } else {
-                changeToSlave(masterAddress, 
tryElectResponse.getMasterEpoch(), tryElectResponse.getBrokerId());
+                changeToSlave(masterAddress, 
tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
             }
             brokerController.setIsolated(false);
             return true;
@@ -375,20 +384,17 @@ public class ReplicasManager {
                 final SyncStateSet syncStateSet = result.getObject2();
                 final String newMasterAddress = info.getMasterAddress();
                 final int newMasterEpoch = info.getMasterEpoch();
-                final long brokerId = info.getBrokerId();
+                final Long masterBrokerId = info.getMasterBrokerId();
                 synchronized (this) {
                     // Check if master changed
                     if (newMasterEpoch > this.masterEpoch) {
-                        if (StringUtils.isNoneEmpty(newMasterAddress)) {
-                            if (StringUtils.equals(newMasterAddress, 
this.localAddress)) {
+                        if (StringUtils.isNoneEmpty(newMasterAddress) && 
masterBrokerId != null) {
+                            if (masterBrokerId.equals(this.brokerId)) {
+                                // If this broker is now the master
                                 changeToMaster(newMasterEpoch, 
syncStateSet.getSyncStateSetEpoch());
                             } else {
-                                if (brokerId > 0) {
-                                    changeToSlave(newMasterAddress, 
newMasterEpoch, brokerId);
-                                } else if (brokerId < 0) {
-                                    // If the brokerId is no existed, we 
should try register again.
-                                    registerBrokerToController();
-                                }
+                                // If this broker is now the slave, and master 
has been changed
+                                changeToSlave(newMasterAddress, 
newMasterEpoch, masterBrokerId);
                             }
                         } else {
                             // In this case, the master in controller is null, 
try elect in controller, this will trigger the electMasterEvent in controller.
@@ -467,8 +473,8 @@ public class ReplicasManager {
             this.checkSyncStateSetTaskFuture.cancel(false);
         }
         this.checkSyncStateSetTaskFuture = 
this.scheduledService.scheduleAtFixedRate(() -> {
-            final Set<String> newSyncStateSet = 
this.haService.maybeShrinkInSyncStateSet();
-            newSyncStateSet.add(this.localAddress);
+            final Set<Long> newSyncStateSet = 
this.haService.maybeShrinkInSyncStateSet();
+            newSyncStateSet.add(this.brokerId);
             synchronized (this) {
                 if (this.syncStateSet != null) {
                     // Check if syncStateSet changed
@@ -481,9 +487,9 @@ public class ReplicasManager {
         }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), 
TimeUnit.MILLISECONDS);
     }
 
-    private void doReportSyncStateSetChanged(Set<String> newSyncStateSet) {
+    private void doReportSyncStateSetChanged(Set<Long> newSyncStateSet) {
         try {
-            final SyncStateSet result = 
this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch, 
newSyncStateSet, this.syncStateSetEpoch);
+            final SyncStateSet result = 
this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerName(), this.brokerId, this.masterEpoch, 
newSyncStateSet, this.syncStateSetEpoch);
             if (result != null) {
                 changeSyncStateSet(result.getSyncStateSet(), 
result.getSyncStateSetEpoch());
             }
@@ -513,16 +519,13 @@ public class ReplicasManager {
         }
 
         for (String address : controllerAddresses) {
-            scanExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    if (brokerOuterAPI.checkAddressReachable(address)) {
-                        availableControllerAddresses.putIfAbsent(address, 
true);
-                    } else {
-                        Boolean value = 
availableControllerAddresses.remove(address);
-                        if (value != null) {
-                            LOGGER.warn("scanAvailableControllerAddresses 
remove unconnected address {}", address);
-                        }
+            scanExecutor.submit(() -> {
+                if (brokerOuterAPI.checkAddressReachable(address)) {
+                    availableControllerAddresses.putIfAbsent(address, true);
+                } else {
+                    Boolean value = 
availableControllerAddresses.remove(address);
+                    if (value != null) {
+                        LOGGER.warn("scanAvailableControllerAddresses remove 
unconnected address {}", address);
                     }
                 }
             });
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 97aeae5a9..f123549cf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -140,12 +140,12 @@ public class BrokerOuterAPI {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final RemotingClient remotingClient;
     private final TopAddressing topAddressing = new 
DefaultTopAddressing(MixAll.getWSAddr());
+    private final BrokerFixedThreadPoolExecutor brokerOuterExecutor = new 
BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
+            new ArrayBlockingQueue<>(32), new 
ThreadFactoryImpl("brokerOutApi_thread_", true));
+    private final ClientMetadata clientMetadata;
+    private final RpcClient rpcClient;
     private String nameSrvAddr = null;
-    private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new 
BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
-        new ArrayBlockingQueue<>(32), new 
ThreadFactoryImpl("brokerOutApi_thread_", true));
 
-    private ClientMetadata clientMetadata;
-    private RpcClient rpcClient;
 
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
         this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new 
ClientMetadata());
@@ -1145,10 +1145,10 @@ public class BrokerOuterAPI {
     public SyncStateSet alterSyncStateSet(
         final String controllerAddress,
         final String brokerName,
-        final String masterAddress, final int masterEpoch,
-        final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws 
Exception {
+        final Long masterBrokerId, final int masterEpoch,
+        final Set<Long> newSyncStateSet, final int syncStateSetEpoch) throws 
Exception {
 
-        final AlterSyncStateSetRequestHeader requestHeader = new 
AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch);
+        final AlterSyncStateSetRequestHeader requestHeader = new 
AlterSyncStateSetRequestHeader(brokerName, masterBrokerId, masterEpoch);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET,
 requestHeader);
         request.setBody(new SyncStateSet(newSyncStateSet, 
syncStateSetEpoch).encode());
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1169,9 +1169,9 @@ public class BrokerOuterAPI {
      * Broker try to elect itself as a master in broker set
      */
     public ElectMasterResponseHeader brokerElect(String controllerAddress, 
String clusterName, String brokerName,
-                                                 String brokerAddress) throws 
Exception {
+                                                 Long brokerId) throws 
Exception {
 
-        final ElectMasterRequestHeader requestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, 
brokerAddress);
+        final ElectMasterRequestHeader requestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, 
requestHeader);
         RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
         assert response != null;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
index 5318dee8f..9347e2cbf 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
@@ -41,10 +41,10 @@ public class GroupTransferService extends ServiceThread {
 
     private final WaitNotifyObject notifyTransferObject = new 
WaitNotifyObject();
     private final PutMessageSpinLock lock = new PutMessageSpinLock();
+    private final DefaultMessageStore defaultMessageStore;
+    private final HAService haService;
     private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new 
LinkedList<>();
     private volatile List<CommitLog.GroupCommitRequest> requestsRead = new 
LinkedList<>();
-    private HAService haService;
-    private DefaultMessageStore defaultMessageStore;
 
     public GroupTransferService(final HAService haService, final 
DefaultMessageStore defaultMessageStore) {
         this.haService = haService;
@@ -97,7 +97,7 @@ public class GroupTransferService extends ServiceThread {
                     if (allAckInSyncStateSet && this.haService instanceof 
AutoSwitchHAService) {
                         // In this mode, we must wait for all replicas that in 
InSyncStateSet.
                         final AutoSwitchHAService autoSwitchHAService = 
(AutoSwitchHAService) this.haService;
-                        final Set<String> syncStateSet = 
autoSwitchHAService.getSyncStateSet();
+                        final Set<Long> syncStateSet = 
autoSwitchHAService.getSyncStateSet();
                         if (syncStateSet.size() <= 1) {
                             // Only master
                             transferOK = true;
@@ -108,7 +108,7 @@ public class GroupTransferService extends ServiceThread {
                         int ackNums = 1;
                         for (HAConnection conn : 
haService.getConnectionList()) {
                             final AutoSwitchHAConnection 
autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
-                            if 
(syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && 
autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
+                            if 
(syncStateSet.contains(autoSwitchHAConnection.getSlaveId()) && 
autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
                                 ackNums++;
                             }
                             if (ackNums >= syncStateSet.size()) {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 2c3ab85f7..1000c5e95 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
@@ -45,7 +44,7 @@ import org.apache.rocketmq.store.ha.io.HAWriter;
 public class AutoSwitchHAClient extends ServiceThread implements HAClient {
 
     /**
-     * Handshake header buffer size. Schema: state ordinal + Two flags + 
slaveAddressLength. Format:
+     * Handshake header buffer size. Schema: state ordinal + Two flags + 
slaveBrokerId. Format:
      *
      * <pre>
      *                   ┌──────────────────┬───────────────┐
@@ -57,8 +56,8 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
      *                       ╲                          /
      *                        ╲                        /
      * 
┌───────────────────────┬───────────────────────┬───────────────────────┐
-     * │      current state    │          Flags        │  slaveAddressLength   
│
-     * │         (4bytes)      │         (4bytes)      │         (4bytes)      
│
+     * │      current state    │          Flags        │      slaveBrokerId    
│
+     * │         (4bytes)      │         (4bytes)      │         (8bytes)      
│
      * 
├───────────────────────┴───────────────────────┴───────────────────────┤
      * │                                                                       
│
      * │                          HANDSHAKE  Header                            
│
@@ -66,7 +65,7 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
      * <p>
      * Flag: isSyncFromLastFile(short), isAsyncLearner(short)... we can add 
more flags in the future if needed
      */
-    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 4;
+    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8;
 
     /**
      * Header + slaveAddress, Format:
@@ -106,7 +105,6 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
     private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
     private final AtomicReference<String> masterHaAddress = new 
AtomicReference<>();
     private final AtomicReference<String> masterAddress = new 
AtomicReference<>();
-    private final AtomicReference<Long> slaveId = new AtomicReference<>();
     private final ByteBuffer handshakeHeaderBuffer = 
ByteBuffer.allocate(HANDSHAKE_SIZE);
     private final ByteBuffer transferHeaderBuffer = 
ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
     private final AutoSwitchHAService haService;
@@ -114,7 +112,8 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
     private final DefaultMessageStore messageStore;
     private final EpochFileCache epochCache;
 
-    private String localAddress;
+    private final Long brokerId;
+
     private SocketChannel socketChannel;
     private Selector selector;
     private AbstractHAReader haReader;
@@ -138,10 +137,11 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
     private volatile int currentReceivedEpoch;
 
     public AutoSwitchHAClient(AutoSwitchHAService haService, 
DefaultMessageStore defaultMessageStore,
-        EpochFileCache epochCache) throws IOException {
+        EpochFileCache epochCache, Long brokerId) throws IOException {
         this.haService = haService;
         this.messageStore = defaultMessageStore;
         this.epochCache = epochCache;
+        this.brokerId = brokerId;
         init();
     }
 
@@ -183,17 +183,6 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
         return AutoSwitchHAClient.class.getSimpleName();
     }
 
-    public void setLocalAddress(String localAddress) {
-        this.localAddress = localAddress;
-    }
-
-    public void updateSlaveId(Long newId) {
-        Long currentId = this.slaveId.get();
-        if (this.slaveId.compareAndSet(currentId, newId)) {
-            LOGGER.info("Update slave Id, OLD: {}, New: {}", currentId, newId);
-        }
-    }
-
     @Override
     public void updateMasterAddress(String newAddress) {
         String currentAddr = this.masterAddress.get();
@@ -300,7 +289,7 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
 
     private boolean sendHandshakeHeader() throws IOException {
         this.handshakeHeaderBuffer.position(0);
-        this.handshakeHeaderBuffer.limit(HANDSHAKE_SIZE);
+        this.handshakeHeaderBuffer.limit(HANDSHAKE_HEADER_SIZE);
         // Original state
         
this.handshakeHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
         // IsSyncFromLastFile
@@ -309,10 +298,8 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
         // IsAsyncLearner role
         short isAsyncLearner = 
this.haService.getDefaultMessageStore().getMessageStoreConfig().isAsyncLearner()
 ? (short) 1 : (short) 0;
         this.handshakeHeaderBuffer.putShort(isAsyncLearner);
-        // Address length
-        this.handshakeHeaderBuffer.putInt(this.localAddress == null ? 0 : 
this.localAddress.length());
-        // Slave address
-        this.handshakeHeaderBuffer.put(this.localAddress == null ? new byte[0] 
: this.localAddress.getBytes(StandardCharsets.UTF_8));
+        // Slave brokerId
+        this.handshakeHeaderBuffer.putLong(this.brokerId);
 
         this.handshakeHeaderBuffer.flip();
         return this.haWriter.write(this.socketChannel, 
this.handshakeHeaderBuffer);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 7401574e5..383c5a4a7 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -92,7 +91,6 @@ public class AutoSwitchHAConnection implements HAConnection {
     private volatile boolean isSyncFromLastFile = false;
     private volatile boolean isAsyncLearner = false;
     private volatile long slaveId = -1;
-    private volatile String slaveAddress;
 
     /**
      * Last endOffset when master transfer data to slave
@@ -161,10 +159,6 @@ public class AutoSwitchHAConnection implements 
HAConnection {
         return slaveId;
     }
 
-    public String getSlaveAddress() {
-        return slaveAddress;
-    }
-
     @Override
     public HAConnectionState getCurrentState() {
         return currentState;
@@ -220,8 +214,8 @@ public class AutoSwitchHAConnection implements HAConnection 
{
     private synchronized void maybeExpandInSyncStateSet(long slaveMaxOffset) {
         if (!this.isAsyncLearner && slaveMaxOffset >= 
this.lastMasterMaxOffset) {
             long caughtUpTimeMs = 
this.haService.getDefaultMessageStore().getMaxPhyOffset() == slaveMaxOffset ? 
System.currentTimeMillis() : this.lastTransferTimeMs;
-            this.haService.updateConnectionLastCaughtUpTime(this.slaveAddress, 
caughtUpTimeMs);
-            this.haService.maybeExpandInSyncStateSet(this.slaveAddress, 
slaveMaxOffset);
+            this.haService.updateConnectionLastCaughtUpTime(this.slaveId, 
caughtUpTimeMs);
+            this.haService.maybeExpandInSyncStateSet(this.slaveId, 
slaveMaxOffset);
         }
     }
 
@@ -318,33 +312,25 @@ public class AutoSwitchHAConnection implements 
HAConnection {
 
                         switch (slaveState) {
                             case HANDSHAKE:
-                                // AddressLength
-                                int addressLength = 
byteBufferRead.getInt(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 
4);
-                                if (diff < 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) {
-                                    processSuccess = false;
-                                    break;
-                                }
+                                // SlaveBrokerId
+                                Long slaveBrokerId = 
byteBufferRead.getLong(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE 
- 8);
+                                AutoSwitchHAConnection.this.slaveId = 
slaveBrokerId;
                                 // Flag(isSyncFromLastFile)
-                                short syncFromLastFileFlag = 
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE 
- 8);
+                                short syncFromLastFileFlag = 
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE 
- 12);
                                 if (syncFromLastFileFlag == 1) {
                                     
AutoSwitchHAConnection.this.isSyncFromLastFile = true;
                                 }
                                 // Flag(isAsyncLearner role)
-                                short isAsyncLearner = 
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE 
- 6);
+                                short isAsyncLearner = 
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE 
- 10);
                                 if (isAsyncLearner == 1) {
                                     AutoSwitchHAConnection.this.isAsyncLearner 
= true;
                                 }
-                                // Address
-                                final byte[] addressData = new 
byte[addressLength];
-                                byteBufferRead.position(readPosition + 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE);
-                                byteBufferRead.get(addressData);
-                                AutoSwitchHAConnection.this.slaveAddress = new 
String(addressData, StandardCharsets.UTF_8);
 
                                 isSlaveSendHandshake = true;
                                 byteBufferRead.position(readSocketPos);
-                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength;
-                                LOGGER.info("Receive slave handshake, 
slaveAddress:{}, isSyncFromLastFile:{}, isAsyncLearner:{}",
-                                    AutoSwitchHAConnection.this.slaveAddress, 
AutoSwitchHAConnection.this.isSyncFromLastFile, 
AutoSwitchHAConnection.this.isAsyncLearner);
+                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE;
+                                LOGGER.info("Receive slave handshake, 
slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}",
+                                    AutoSwitchHAConnection.this.slaveId, 
AutoSwitchHAConnection.this.isSyncFromLastFile, 
AutoSwitchHAConnection.this.isAsyncLearner);
                                 break;
                             case TRANSFER:
                                 long slaveMaxOffset = 
byteBufferRead.getLong(readPosition + 4);
@@ -356,7 +342,7 @@ public class AutoSwitchHAConnection implements HAConnection 
{
                                 }
                                 byteBufferRead.position(readSocketPos);
                                 maybeExpandInSyncStateSet(slaveMaxOffset);
-                                
AutoSwitchHAConnection.this.haService.updateConfirmOffsetWhenSlaveAck(AutoSwitchHAConnection.this.slaveAddress);
+                                
AutoSwitchHAConnection.this.haService.updateConfirmOffsetWhenSlaveAck(AutoSwitchHAConnection.this.slaveId);
                                 
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
                                 break;
                             default:
@@ -629,7 +615,7 @@ public class AutoSwitchHAConnection implements HAConnection 
{
                 this.lastWriteOver = this.transferData(size);
             } else {
                 // If size == 0, we should update the lastCatchupTimeMs
-                
AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveAddress,
 System.currentTimeMillis());
+                
AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId,
 System.currentTimeMillis());
                 haService.getWaitNotifyObject().allWaitForRunning(100);
             }
         }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 7382587dc..b858cfcae 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -56,17 +56,15 @@ import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
 public class AutoSwitchHAService extends DefaultHAService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(new 
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
-    private final List<Consumer<Set<String>>> syncStateSetChangedListeners = 
new ArrayList<>();
-    private final CopyOnWriteArraySet<String> syncStateSet = new 
CopyOnWriteArraySet<>();
-    private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable 
= new ConcurrentHashMap<>();
+    private final List<Consumer<Set<Long/*brokerId*/>>> 
syncStateSetChangedListeners = new ArrayList<>();
+    private final CopyOnWriteArraySet<Long/*brokerId*/> syncStateSet = new 
CopyOnWriteArraySet<>();
+    private final ConcurrentHashMap<Long/*brokerId*/, 
Long/*lastCaughtUpTimestamp*/> connectionCaughtUpTimeTable = new 
ConcurrentHashMap<>();
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     private volatile long confirmOffset = -1;
 
-    private String localAddress;
-
     private EpochFileCache epochCache;
     private AutoSwitchHAClient haClient;
 
-    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
     public AutoSwitchHAService() {
     }
@@ -93,8 +91,8 @@ public class AutoSwitchHAService extends DefaultHAService {
     @Override
     public void removeConnection(HAConnection conn) {
         if (!defaultMessageStore.isShutdown()) {
-            final Set<String> syncStateSet = getSyncStateSet();
-            String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress();
+            final Set<Long> syncStateSet = getSyncStateSet();
+            Long slave = ((AutoSwitchHAConnection) conn).getSlaveId();
             if (syncStateSet.contains(slave)) {
                 syncStateSet.remove(slave);
                 notifySyncStateSetChanged(syncStateSet);
@@ -163,12 +161,10 @@ public class AutoSwitchHAService extends DefaultHAService {
         try {
             destroyConnections();
             if (this.haClient == null) {
-                this.haClient = new AutoSwitchHAClient(this, 
defaultMessageStore, this.epochCache);
+                this.haClient = new AutoSwitchHAClient(this, 
defaultMessageStore, this.epochCache, slaveId);
             } else {
                 this.haClient.reOpen();
             }
-            this.haClient.setLocalAddress(this.localAddress);
-            this.haClient.updateSlaveId(slaveId);
             this.haClient.updateMasterAddress(newMasterAddr);
             this.haClient.updateHaMasterAddress(null);
             this.haClient.start();
@@ -213,15 +209,13 @@ public class AutoSwitchHAService extends DefaultHAService {
     public void updateMasterAddress(String newAddr) {
     }
 
-    public void registerSyncStateSetChangedListener(final 
Consumer<Set<String>> listener) {
+    public void registerSyncStateSetChangedListener(final Consumer<Set<Long>> 
listener) {
         this.syncStateSetChangedListeners.add(listener);
     }
 
-    public void notifySyncStateSetChanged(final Set<String> newSyncStateSet) {
+    public void notifySyncStateSetChanged(final Set<Long> newSyncStateSet) {
         this.executorService.submit(() -> {
-            for (Consumer<Set<String>> listener : 
syncStateSetChangedListeners) {
-                listener.accept(newSyncStateSet);
-            }
+            syncStateSetChangedListeners.forEach(listener -> 
listener.accept(newSyncStateSet));
         });
     }
 
@@ -229,15 +223,15 @@ public class AutoSwitchHAService extends DefaultHAService {
      * Check and maybe shrink the inSyncStateSet.
      * A slave will be removed from inSyncStateSet if (curTime - HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
      */
-    public Set<String> maybeShrinkInSyncStateSet() {
-        final Set<String> newSyncStateSet = getSyncStateSet();
+    public Set<Long> maybeShrinkInSyncStateSet() {
+        final Set<Long> newSyncStateSet = getSyncStateSet();
         final long haMaxTimeSlaveNotCatchup = 
this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
-        for (Map.Entry<String, Long> next : 
this.connectionCaughtUpTimeTable.entrySet()) {
-            final String slaveAddress = next.getKey();
-            if (newSyncStateSet.contains(slaveAddress)) {
-                final Long lastCaughtUpTimeMs = 
this.connectionCaughtUpTimeTable.get(slaveAddress);
+        for (Map.Entry<Long, Long> next : 
this.connectionCaughtUpTimeTable.entrySet()) {
+            final Long slaveBrokerId = next.getKey();
+            if (newSyncStateSet.contains(slaveBrokerId)) {
+                final Long lastCaughtUpTimeMs = next.getValue();
                 if ((System.currentTimeMillis() - lastCaughtUpTimeMs) > 
haMaxTimeSlaveNotCatchup) {
-                    newSyncStateSet.remove(slaveAddress);
+                    newSyncStateSet.remove(slaveBrokerId);
                 }
             }
         }
@@ -248,25 +242,25 @@ public class AutoSwitchHAService extends DefaultHAService {
      * Check and maybe add the slave to inSyncStateSet. A slave will be added to inSyncStateSet if its slaveMaxOffset >=
      * current confirmOffset, and it is caught up to an offset within the current leader epoch.
      */
-    public void maybeExpandInSyncStateSet(final String slaveAddress, final 
long slaveMaxOffset) {
-        final Set<String> currentSyncStateSet = getSyncStateSet();
-        if (currentSyncStateSet.contains(slaveAddress)) {
+    public void maybeExpandInSyncStateSet(final Long slaveBrokerId, final long 
slaveMaxOffset) {
+        final Set<Long> currentSyncStateSet = getSyncStateSet();
+        if (currentSyncStateSet.contains(slaveBrokerId)) {
             return;
         }
         final long confirmOffset = getConfirmOffset();
         if (slaveMaxOffset >= confirmOffset) {
             final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
             if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
-                currentSyncStateSet.add(slaveAddress);
+                currentSyncStateSet.add(slaveBrokerId);
                 // Notify the upper layer that syncStateSet changed.
                 notifySyncStateSetChanged(currentSyncStateSet);
             }
         }
     }
 
-    public void updateConnectionLastCaughtUpTime(final String slaveAddress, 
final long lastCaughtUpTimeMs) {
-        Long prevTime = 
ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, 
slaveAddress, k -> 0L);
-        this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime, 
lastCaughtUpTimeMs));
+    public void updateConnectionLastCaughtUpTime(final Long slaveBrokerId, 
final long lastCaughtUpTimeMs) {
+        Long prevTime = 
ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, 
slaveBrokerId, k -> 0L);
+        this.connectionCaughtUpTimeTable.put(slaveBrokerId, Math.max(prevTime, 
lastCaughtUpTimeMs));
     }
 
     /**
@@ -285,8 +279,8 @@ public class AutoSwitchHAService extends DefaultHAService {
         return confirmOffset;
     }
 
-    public void updateConfirmOffsetWhenSlaveAck(final String slaveAddress) {
-        if (this.syncStateSet.contains(slaveAddress)) {
+    public void updateConfirmOffsetWhenSlaveAck(final Long slaveBrokerId) {
+        if (this.syncStateSet.contains(slaveBrokerId)) {
             this.confirmOffset = computeConfirmOffset();
         }
     }
@@ -330,7 +324,7 @@ public class AutoSwitchHAService extends DefaultHAService {
                 
cInfo.setTransferredByteInSecond(conn.getTransferredByteInSecond());
                 cInfo.setTransferFromWhere(conn.getTransferFromWhere());
 
-                
cInfo.setInSync(syncStateSet.contains(((AutoSwitchHAConnection) 
conn).getSlaveAddress()));
+                
cInfo.setInSync(syncStateSet.contains(((AutoSwitchHAConnection) 
conn).getSlaveId()));
 
                 info.getHaConnectionInfo().add(cInfo);
             }
@@ -344,18 +338,18 @@ public class AutoSwitchHAService extends DefaultHAService {
     }
 
     private long computeConfirmOffset() {
-        final Set<String> currentSyncStateSet = getSyncStateSet();
+        final Set<Long> currentSyncStateSet = getSyncStateSet();
         long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
         for (HAConnection connection : this.connectionList) {
-            final String slaveAddress = ((AutoSwitchHAConnection) 
connection).getSlaveAddress();
-            if (currentSyncStateSet.contains(slaveAddress)) {
+            final Long slaveId = ((AutoSwitchHAConnection) 
connection).getSlaveId();
+            if (currentSyncStateSet.contains(slaveId)) {
                 confirmOffset = Math.min(confirmOffset, 
connection.getSlaveAckOffset());
             }
         }
         return confirmOffset;
     }
 
-    public void setSyncStateSet(final Set<String> syncStateSet) {
+    public void setSyncStateSet(final Set<Long> syncStateSet) {
         final Lock writeLock = readWriteLock.writeLock();
         try {
             writeLock.lock();
@@ -367,12 +361,11 @@ public class AutoSwitchHAService extends DefaultHAService {
         }
     }
 
-    public Set<String> getSyncStateSet() {
+    public Set<Long> getSyncStateSet() {
         final Lock readLock = readWriteLock.readLock();
         try {
             readLock.lock();
-            HashSet<String> set = new HashSet<>(this.syncStateSet.size());
-            set.addAll(this.syncStateSet);
+            HashSet<Long> set = new HashSet<>(this.syncStateSet);
             return set;
         } finally {
             readLock.unlock();
@@ -387,10 +380,6 @@ public class AutoSwitchHAService extends DefaultHAService {
         this.epochCache.truncateSuffixByOffset(offset);
     }
 
-    public void setLocalAddress(String localAddress) {
-        this.localAddress = localAddress;
-    }
-
     /**
      * Try to truncate incomplete msg transferred from master.
      */
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index bc392022d..d2e6d5c21 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -100,7 +100,7 @@ public class AutoSwitchHATest {
 
         storeConfig2 = new MessageStoreConfig();
         storeConfig2.setBrokerRole(BrokerRole.SLAVE);
-        storeConfig1.setHaSendHeartbeatInterval(1000);
+        storeConfig2.setHaSendHeartbeatInterval(1000);
         storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + 
brokerName + "#2");
         storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + 
brokerName + "#2" + File.separator + "commitlog");
         storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + 
brokerName + "#2" + File.separator + "EpochFileCache");
@@ -110,12 +110,12 @@ public class AutoSwitchHATest {
         buildMessageStoreConfig(storeConfig2, mappedFileSize);
         this.store2HaAddress = "127.0.0.1:10943";
 
-        messageStore1 = buildMessageStore(storeConfig1, 0L);
-        messageStore2 = buildMessageStore(storeConfig2, 1L);
+        messageStore1 = buildMessageStore(storeConfig1, 1L);
+        messageStore2 = buildMessageStore(storeConfig2, 2L);
 
         storeConfig3 = new MessageStoreConfig();
         storeConfig3.setBrokerRole(BrokerRole.SLAVE);
-        storeConfig1.setHaSendHeartbeatInterval(1000);
+        storeConfig3.setHaSendHeartbeatInterval(1000);
         storeConfig3.setStorePathRootDir(storePathRootDir + File.separator + 
brokerName + "#3");
         storeConfig3.setStorePathCommitLog(storePathRootDir + File.separator + 
brokerName + "#3" + File.separator + "commitlog");
         storeConfig3.setStorePathEpochFile(storePathRootDir + File.separator + 
brokerName + "#3" + File.separator + "EpochFileCache");
@@ -132,9 +132,9 @@ public class AutoSwitchHATest {
         messageStore2.start();
         messageStore3.start();
 
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
-        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
-        ((AutoSwitchHAService) 
this.messageStore3.getHaService()).setLocalAddress("127.0.0.1:8002");
+//        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).("127.0.0.1:8000");
+//        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
+//        ((AutoSwitchHAService) 
this.messageStore3.getHaService()).setLocalAddress("127.0.0.1:8002");
     }
 
     public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws 
Exception {
@@ -162,16 +162,16 @@ public class AutoSwitchHATest {
         buildMessageStoreConfig(storeConfig2, mappedFileSize);
         this.store2HaAddress = "127.0.0.1:10943";
 
-        messageStore1 = buildMessageStore(storeConfig1, 0L);
-        messageStore2 = buildMessageStore(storeConfig2, 1L);
+        messageStore1 = buildMessageStore(storeConfig1, 1L);
+        messageStore2 = buildMessageStore(storeConfig2, 2L);
 
         assertTrue(messageStore1.load());
         assertTrue(messageStore2.load());
         messageStore1.start();
         messageStore2.start();
 
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
-        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
+//        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+//        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
     }
 
     private boolean changeMasterAndPutMessage(DefaultMessageStore master, 
MessageStoreConfig masterConfig,
@@ -206,7 +206,7 @@ public class AutoSwitchHATest {
     public void testConfirmOffset() throws Exception {
         init(defaultMappedFileSize, true);
         // Step1, set syncStateSet, if both broker1 and broker2 are in 
syncStateSet, the confirmOffset will be computed as the min 
slaveAckOffset(broker2's ack)
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Arrays.asList("127.0.0.1:8000", "127.0.0.1:8001")));
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Arrays.asList(1L, 2L)));
         boolean masterAndPutMessage = 
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         assertTrue(masterAndPutMessage);
         checkMessage(this.messageStore2, 10, 0);
@@ -231,7 +231,7 @@ public class AutoSwitchHATest {
         assertTrue(messageStore2.load());
         messageStore2.start();
         messageStore2.getHaService().changeToMaster(2);
-        ((AutoSwitchHAService) 
messageStore2.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8001")));
+        ((AutoSwitchHAService) 
messageStore2.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(2L)));
 
         // Put message on master
         for (int i = 0; i < 10; i++) {
@@ -252,7 +252,7 @@ public class AutoSwitchHATest {
     @Test
     public void testAsyncLearnerBrokerRole() throws Exception {
         init(defaultMappedFileSize);
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(1L)));
 
         storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
         storeConfig2.setBrokerRole(BrokerRole.SLAVE);
@@ -265,15 +265,15 @@ public class AutoSwitchHATest {
             messageStore1.putMessage(buildMessage());
         }
         checkMessage(messageStore2, 10, 0);
-        final Set<String> syncStateSet = ((AutoSwitchHAService) 
this.messageStore1.getHaService()).getSyncStateSet();
-        assertFalse(syncStateSet.contains("127.0.0.1:8001"));
+        final Set<Long> syncStateSet = ((AutoSwitchHAService) 
this.messageStore1.getHaService()).getSyncStateSet();
+        assertFalse(syncStateSet.contains(2L));
     }
 
     @Test
     public void testOptionAllAckInSyncStateSet() throws Exception {
         init(defaultMappedFileSize, true);
-        AtomicReference<Set<String>> syncStateSet = new AtomicReference<>();
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+        AtomicReference<Set<Long>> syncStateSet = new AtomicReference<>();
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(1L)));
         ((AutoSwitchHAService) 
this.messageStore1.getHaService()).registerSyncStateSetChangedListener(newSyncStateSet
 -> {
             syncStateSet.set(newSyncStateSet);
         });
@@ -281,9 +281,9 @@ public class AutoSwitchHATest {
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
         // Check syncStateSet
-        final Set<String> result = syncStateSet.get();
-        assertTrue(result.contains("127.0.0.1:8000"));
-        assertTrue(result.contains("127.0.0.1:8001"));
+        final Set<Long> result = syncStateSet.get();
+        assertTrue(result.contains(1L));
+        assertTrue(result.contains(2L));
 
         // Now, shutdown store2
         this.messageStore2.shutdown();
@@ -304,12 +304,12 @@ public class AutoSwitchHATest {
 
         // Step1, change store1 to master, store2 to follower
         init(defaultMappedFileSize);
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(1L)));
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
 
         // Step2, change store1 to follower, store2 to master, epoch = 2
-        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8001")));
+        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(2L)));
         changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, 
this.messageStore1, 1, this.storeConfig1, 2, store2HaAddress, 10);
         checkMessage(this.messageStore1, 20, 0);
 
@@ -322,7 +322,7 @@ public class AutoSwitchHATest {
     public void testAddBroker() throws Exception {
         // Step1: broker1 as leader, broker2 as follower
         init(defaultMappedFileSize);
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(1L)));
 
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
@@ -341,7 +341,7 @@ public class AutoSwitchHATest {
         // Step1: broker1 as leader, broker2 as follower, append 2 epoch, each 
epoch will be stored on one file(Because fileSize = 1700, which only can hold 
10 msgs);
         // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
 
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(1L)));
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
 
@@ -378,7 +378,7 @@ public class AutoSwitchHATest {
         // Step1: broker1 as leader, broker2 as follower, append 2 epoch, each 
epoch will be stored on one file(Because fileSize = 1700, which only can hold 
10 msgs);
         // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
 
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(1L)));
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10);
@@ -406,7 +406,7 @@ public class AutoSwitchHATest {
         checkMessage(messageStore3, 10, 10);
 
         // Step5: change broker2 as leader, broker3 as follower
-        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8001")));
+        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(2L)));
         changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, 
this.messageStore3, 3, this.storeConfig3, 3, this.store2HaAddress, 10);
         checkMessage(messageStore3, 20, 10);
 
@@ -424,7 +424,7 @@ public class AutoSwitchHATest {
 
         // Step1: broker1 as leader, broker2 as follower, append 2 epoch, each 
epoch will be stored on one file(Because fileSize = 1700, which only can hold 
10 msgs);
         // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
-        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList(1L)));
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10);

Reply via email to