This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 7835d9523f5a71ce7dd01ed15079e0b985c6ebed Author: TheR1sing3un <[email protected]> AuthorDate: Sat Feb 4 18:21:21 2023 +0800 refactor code in module: store/ha for persistence broker-id 1. refactor code in module: store/ha for persistence broker-id --- .../broker/controller/ReplicasManager.java | 79 +++++++++++----------- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 18 ++--- .../rocketmq/store/ha/GroupTransferService.java | 8 +-- .../store/ha/autoswitch/AutoSwitchHAClient.java | 35 +++------- .../ha/autoswitch/AutoSwitchHAConnection.java | 38 ++++------- .../store/ha/autoswitch/AutoSwitchHAService.java | 77 +++++++++------------ .../store/ha/autoswitch/AutoSwitchHATest.java | 56 +++++++-------- 7 files changed, 138 insertions(+), 173 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 5184a2350..8a03015f7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -81,7 +81,11 @@ public class ReplicasManager { private ScheduledFuture<?> checkSyncStateSetTaskFuture; private ScheduledFuture<?> slaveSyncFuture; - private Set<String> syncStateSet; + private Long brokerId; + + private Long masterBrokerId; + + private Set<Long> syncStateSet; private int syncStateSetEpoch = 0; private String masterAddress = ""; private int masterEpoch = 0; @@ -99,7 +103,6 @@ public class ReplicasManager { this.availableControllerAddresses = new ConcurrentHashMap<>(); this.syncStateSet = new HashSet<>(); this.localAddress = brokerController.getBrokerAddr(); - this.haService.setLocalAddress(this.localAddress); } public long getConfirmOffset() { @@ -201,12 +204,13 @@ public class ReplicasManager { this.masterEpoch = newMasterEpoch; // Change sync state set - final HashSet<String> newSyncStateSet = new HashSet<>(); - newSyncStateSet.add(this.localAddress); + final HashSet<Long> newSyncStateSet = new HashSet<>(); + newSyncStateSet.add(this.brokerId); changeSyncStateSet(newSyncStateSet, syncStateSetEpoch); // Change record this.masterAddress = this.localAddress; + this.masterBrokerId = this.brokerId; // Handle the slave synchronise handleSlaveSynchronize(BrokerRole.SYNC_MASTER); @@ -228,33 +232,37 @@ public class ReplicasManager { LOGGER.error("Error happen when register broker to name-srv, Failed to change broker to master", e); return; } - LOGGER.info("Change broker {} to master success, masterEpoch {}, syncStateSetEpoch:{}", this.localAddress, newMasterEpoch, syncStateSetEpoch); + LOGGER.info("Change broker [id:{}][address:{}] to master success, masterEpoch {}, syncStateSetEpoch:{}", this.brokerConfig, this.localAddress, newMasterEpoch, syncStateSetEpoch); }); } } } - public void changeToSlave(final String newMasterAddress, final int newMasterEpoch, long brokerId) { + public void changeToSlave(final String newMasterAddress, final int newMasterEpoch, long newMasterBrokerId) { synchronized (this) { if (newMasterEpoch > this.masterEpoch) { - LOGGER.info("Begin to change to slave, brokerName={}, replicas={}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, brokerId); + LOGGER.info("Begin to change to slave, brokerName={}, brokerId={}, newMasterBrokerId={}, newMasterAddress, newMasterEpoch", + this.brokerConfig.getBrokerName(), this.brokerId, newMasterBrokerId, newMasterAddress, newMasterEpoch); // Change record this.masterAddress = newMasterAddress; this.masterEpoch = newMasterEpoch; + this.masterBrokerId = newMasterBrokerId; + // Stop checking syncStateSet because only master is able to check stopCheckSyncStateSet(); - // Change config + // Change config(compatibility problem) this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SLAVE); this.brokerController.changeSpecialServiceStatus(false); + // The brokerId in brokerConfig just means its role(master[0] or slave[>=1]) this.brokerConfig.setBrokerId(brokerId); // Handle the slave synchronise handleSlaveSynchronize(BrokerRole.SLAVE); // Notify ha service, change to slave - this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId()); + this.haService.changeToSlave(newMasterAddress, newMasterEpoch, brokerId); this.executorService.submit(() -> { // Register broker to name-srv @@ -265,13 +273,13 @@ public class ReplicasManager { return; } - LOGGER.info("Change broker {} to slave, newMasterAddress:{}, newMasterEpoch:{}", this.localAddress, newMasterAddress, newMasterEpoch); + LOGGER.info("Change broker [id:{}][address:{}] to slave, newMasterBrokerId:{}, newMasterAddress:{}, newMasterEpoch:{}", this.brokerId, this.localAddress, newMasterBrokerId, newMasterAddress, newMasterEpoch); }); } } } - private void changeSyncStateSet(final Set<String> newSyncStateSet, final int newSyncStateSetEpoch) { + private void changeSyncStateSet(final Set<Long> newSyncStateSet, final int newSyncStateSetEpoch) { synchronized (this) { if (newSyncStateSetEpoch > this.syncStateSetEpoch) { LOGGER.info("Sync state set changed from {} to {}", this.syncStateSet, newSyncStateSet); @@ -313,17 +321,18 @@ public class ReplicasManager { // Broker try to elect itself as a master in broker set. try { ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), - this.brokerConfig.getBrokerName(), this.localAddress); + this.brokerConfig.getBrokerName(), this.brokerId); final String masterAddress = tryElectResponse.getMasterAddress(); - if (StringUtils.isEmpty(masterAddress)) { + final Long masterBrokerId = tryElectResponse.getMasterBrokerId(); + if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) { LOGGER.warn("Now no master in broker set"); return false; } - if (StringUtils.equals(masterAddress, this.localAddress)) { + if (masterBrokerId.equals(this.brokerId)) { changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch()); } else { - changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getBrokerId()); + changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId()); } brokerController.setIsolated(false); return true; @@ -375,20 +384,17 @@ public class ReplicasManager { final SyncStateSet syncStateSet = result.getObject2(); final String newMasterAddress = info.getMasterAddress(); final int newMasterEpoch = info.getMasterEpoch(); - final long brokerId = info.getBrokerId(); + final Long masterBrokerId = info.getMasterBrokerId(); synchronized (this) { // Check if master changed if (newMasterEpoch > this.masterEpoch) { - if (StringUtils.isNoneEmpty(newMasterAddress)) { - if (StringUtils.equals(newMasterAddress, this.localAddress)) { + if (StringUtils.isNoneEmpty(newMasterAddress) && masterBrokerId != null) { + if (masterBrokerId.equals(this.brokerId)) { + // If this broker is now the master changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch()); } else { - if (brokerId > 0) { - changeToSlave(newMasterAddress, newMasterEpoch, brokerId); - } else if (brokerId < 0) { - // If the brokerId is no existed, we should try register again. - registerBrokerToController(); - } + // If this broker is now the slave, and master has been changed + changeToSlave(newMasterAddress, newMasterEpoch, masterBrokerId); } } else { // In this case, the master in controller is null, try elect in controller, this will trigger the electMasterEvent in controller. @@ -467,8 +473,8 @@ public class ReplicasManager { this.checkSyncStateSetTaskFuture.cancel(false); } this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(() -> { - final Set<String> newSyncStateSet = this.haService.maybeShrinkInSyncStateSet(); - newSyncStateSet.add(this.localAddress); + final Set<Long> newSyncStateSet = this.haService.maybeShrinkInSyncStateSet(); + newSyncStateSet.add(this.brokerId); synchronized (this) { if (this.syncStateSet != null) { // Check if syncStateSet changed @@ -481,9 +487,9 @@ public class ReplicasManager { }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS); } - private void doReportSyncStateSetChanged(Set<String> newSyncStateSet) { + private void doReportSyncStateSetChanged(Set<Long> newSyncStateSet) { try { - final SyncStateSet result = this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress, this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch, newSyncStateSet, this.syncStateSetEpoch); + final SyncStateSet result = this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress, this.brokerConfig.getBrokerName(), this.brokerId, this.masterEpoch, newSyncStateSet, this.syncStateSetEpoch); if (result != null) { changeSyncStateSet(result.getSyncStateSet(), result.getSyncStateSetEpoch()); } @@ -513,16 +519,13 @@ public class ReplicasManager { } for (String address : controllerAddresses) { - scanExecutor.submit(new Runnable() { - @Override - public void run() { - if (brokerOuterAPI.checkAddressReachable(address)) { - availableControllerAddresses.putIfAbsent(address, true); - } else { - Boolean value = availableControllerAddresses.remove(address); - if (value != null) { - LOGGER.warn("scanAvailableControllerAddresses remove unconnected address {}", address); - } + scanExecutor.submit(() -> { + if (brokerOuterAPI.checkAddressReachable(address)) { + availableControllerAddresses.putIfAbsent(address, true); + } else { + Boolean value = availableControllerAddresses.remove(address); + if (value != null) { + LOGGER.warn("scanAvailableControllerAddresses remove unconnected address {}", address); } } }); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 97aeae5a9..f123549cf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -140,12 +140,12 @@ public class BrokerOuterAPI { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final RemotingClient remotingClient; private final TopAddressing topAddressing = new DefaultTopAddressing(MixAll.getWSAddr()); + private final BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true)); + private final ClientMetadata clientMetadata; + private final RpcClient rpcClient; private String nameSrvAddr = null; - private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, - new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true)); - private ClientMetadata clientMetadata; - private RpcClient rpcClient; public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) { this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new ClientMetadata()); @@ -1145,10 +1145,10 @@ public class BrokerOuterAPI { public SyncStateSet alterSyncStateSet( final String controllerAddress, final String brokerName, - final String masterAddress, final int masterEpoch, - final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws Exception { + final Long masterBrokerId, final int masterEpoch, + final Set<Long> newSyncStateSet, final int syncStateSetEpoch) throws Exception { - final AlterSyncStateSetRequestHeader requestHeader = new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch); + final AlterSyncStateSetRequestHeader requestHeader = new AlterSyncStateSetRequestHeader(brokerName, masterBrokerId, masterEpoch); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, requestHeader); request.setBody(new SyncStateSet(newSyncStateSet, syncStateSetEpoch).encode()); final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); @@ -1169,9 +1169,9 @@ public class BrokerOuterAPI { * Broker try to elect itself as a master in broker set */ public ElectMasterResponseHeader brokerElect(String controllerAddress, String clusterName, String brokerName, - String brokerAddress) throws Exception { + Long brokerId) throws Exception { - final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress); + final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); assert response != null; diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java index 5318dee8f..9347e2cbf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java @@ -41,10 +41,10 @@ public class GroupTransferService extends ServiceThread { private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject(); private final PutMessageSpinLock lock = new PutMessageSpinLock(); + private final DefaultMessageStore defaultMessageStore; + private final HAService haService; private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new LinkedList<>(); private volatile List<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>(); - private HAService haService; - private DefaultMessageStore defaultMessageStore; public GroupTransferService(final HAService haService, final DefaultMessageStore defaultMessageStore) { this.haService = haService; @@ -97,7 +97,7 @@ public class GroupTransferService extends ServiceThread { if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) { // In this mode, we must wait for all replicas that in InSyncStateSet. final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService; - final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet(); + final Set<Long> syncStateSet = autoSwitchHAService.getSyncStateSet(); if (syncStateSet.size() <= 1) { // Only master transferOK = true; @@ -108,7 +108,7 @@ public class GroupTransferService extends ServiceThread { int ackNums = 1; for (HAConnection conn : haService.getConnectionList()) { final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn; - if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) { + if (syncStateSet.contains(autoSwitchHAConnection.getSlaveId()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) { ackNums++; } if (ackNums >= syncStateSet.size()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java index 2c3ab85f7..1000c5e95 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -45,7 +44,7 @@ import org.apache.rocketmq.store.ha.io.HAWriter; public class AutoSwitchHAClient extends ServiceThread implements HAClient { /** - * Handshake header buffer size. Schema: state ordinal + Two flags + slaveAddressLength. Format: + * Handshake header buffer size. Schema: state ordinal + Two flags + slaveBrokerId. Format: * * <pre> * ┌──────────────────┬───────────────┐ @@ -57,8 +56,8 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { * ╲ / * ╲ / * ┌───────────────────────┬───────────────────────┬───────────────────────┐ - * │ current state │ Flags │ slaveAddressLength │ - * │ (4bytes) │ (4bytes) │ (4bytes) │ + * │ current state │ Flags │ slaveBrokerId │ + * │ (4bytes) │ (4bytes) │ (8bytes) │ * ├───────────────────────┴───────────────────────┴───────────────────────┤ * │ │ * │ HANDSHAKE Header │ @@ -66,7 +65,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { * <p> * Flag: isSyncFromLastFile(short), isAsyncLearner(short)... we can add more flags in the future if needed */ - public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 4; + public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8; /** * Header + slaveAddress, Format: @@ -106,7 +105,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4; private final AtomicReference<String> masterHaAddress = new AtomicReference<>(); private final AtomicReference<String> masterAddress = new AtomicReference<>(); - private final AtomicReference<Long> slaveId = new AtomicReference<>(); private final ByteBuffer handshakeHeaderBuffer = ByteBuffer.allocate(HANDSHAKE_SIZE); private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE); private final AutoSwitchHAService haService; @@ -114,7 +112,8 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { private final DefaultMessageStore messageStore; private final EpochFileCache epochCache; - private String localAddress; + private final Long brokerId; + private SocketChannel socketChannel; private Selector selector; private AbstractHAReader haReader; @@ -138,10 +137,11 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { private volatile int currentReceivedEpoch; public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore, - EpochFileCache epochCache) throws IOException { + EpochFileCache epochCache, Long brokerId) throws IOException { this.haService = haService; this.messageStore = defaultMessageStore; this.epochCache = epochCache; + this.brokerId = brokerId; init(); } @@ -183,17 +183,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { return AutoSwitchHAClient.class.getSimpleName(); } - public void setLocalAddress(String localAddress) { - this.localAddress = localAddress; - } - - public void updateSlaveId(Long newId) { - Long currentId = this.slaveId.get(); - if (this.slaveId.compareAndSet(currentId, newId)) { - LOGGER.info("Update slave Id, OLD: {}, New: {}", currentId, newId); - } - } - @Override public void updateMasterAddress(String newAddress) { String currentAddr = this.masterAddress.get(); @@ -300,7 +289,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { private boolean sendHandshakeHeader() throws IOException { this.handshakeHeaderBuffer.position(0); - this.handshakeHeaderBuffer.limit(HANDSHAKE_SIZE); + this.handshakeHeaderBuffer.limit(HANDSHAKE_HEADER_SIZE); // Original state this.handshakeHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal()); // IsSyncFromLastFile @@ -309,10 +298,8 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { // IsAsyncLearner role short isAsyncLearner = this.haService.getDefaultMessageStore().getMessageStoreConfig().isAsyncLearner() ? (short) 1 : (short) 0; this.handshakeHeaderBuffer.putShort(isAsyncLearner); - // Address length - this.handshakeHeaderBuffer.putInt(this.localAddress == null ? 0 : this.localAddress.length()); - // Slave address - this.handshakeHeaderBuffer.put(this.localAddress == null ? new byte[0] : this.localAddress.getBytes(StandardCharsets.UTF_8)); + // Slave brokerId + this.handshakeHeaderBuffer.putLong(this.brokerId); this.handshakeHeaderBuffer.flip(); return this.haWriter.write(this.socketChannel, this.handshakeHeaderBuffer); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java index 7401574e5..383c5a4a7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.nio.charset.StandardCharsets; import java.util.List; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; @@ -92,7 +91,6 @@ public class AutoSwitchHAConnection implements HAConnection { private volatile boolean isSyncFromLastFile = false; private volatile boolean isAsyncLearner = false; private volatile long slaveId = -1; - private volatile String slaveAddress; /** * Last endOffset when master transfer data to slave @@ -161,10 +159,6 @@ public class AutoSwitchHAConnection implements HAConnection { return slaveId; } - public String getSlaveAddress() { - return slaveAddress; - } - @Override public HAConnectionState getCurrentState() { return currentState; @@ -220,8 +214,8 @@ public class AutoSwitchHAConnection implements HAConnection { private synchronized void maybeExpandInSyncStateSet(long slaveMaxOffset) { if (!this.isAsyncLearner && slaveMaxOffset >= this.lastMasterMaxOffset) { long caughtUpTimeMs = this.haService.getDefaultMessageStore().getMaxPhyOffset() == slaveMaxOffset ? System.currentTimeMillis() : this.lastTransferTimeMs; - this.haService.updateConnectionLastCaughtUpTime(this.slaveAddress, caughtUpTimeMs); - this.haService.maybeExpandInSyncStateSet(this.slaveAddress, slaveMaxOffset); + this.haService.updateConnectionLastCaughtUpTime(this.slaveId, caughtUpTimeMs); + this.haService.maybeExpandInSyncStateSet(this.slaveId, slaveMaxOffset); } } @@ -318,33 +312,25 @@ public class AutoSwitchHAConnection implements HAConnection { switch (slaveState) { case HANDSHAKE: - // AddressLength - int addressLength = byteBufferRead.getInt(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 4); - if (diff < AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) { - processSuccess = false; - break; - } + // SlaveBrokerId + Long slaveBrokerId = byteBufferRead.getLong(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 8); + AutoSwitchHAConnection.this.slaveId = slaveBrokerId; // Flag(isSyncFromLastFile) - short syncFromLastFileFlag = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 8); + short syncFromLastFileFlag = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 12); if (syncFromLastFileFlag == 1) { AutoSwitchHAConnection.this.isSyncFromLastFile = true; } // Flag(isAsyncLearner role) - short isAsyncLearner = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 6); + short isAsyncLearner = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 10); if (isAsyncLearner == 1) { AutoSwitchHAConnection.this.isAsyncLearner = true; } - // Address - final byte[] addressData = new byte[addressLength]; - byteBufferRead.position(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE); - byteBufferRead.get(addressData); - AutoSwitchHAConnection.this.slaveAddress = new String(addressData, StandardCharsets.UTF_8); isSlaveSendHandshake = true; byteBufferRead.position(readSocketPos); - ReadSocketService.this.processPosition += AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength; - LOGGER.info("Receive slave handshake, slaveAddress:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", - AutoSwitchHAConnection.this.slaveAddress, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); + ReadSocketService.this.processPosition += AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE; + LOGGER.info("Receive slave handshake, slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", + AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); break; case TRANSFER: long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4); @@ -356,7 +342,7 @@ public class AutoSwitchHAConnection implements HAConnection { } byteBufferRead.position(readSocketPos); maybeExpandInSyncStateSet(slaveMaxOffset); - AutoSwitchHAConnection.this.haService.updateConfirmOffsetWhenSlaveAck(AutoSwitchHAConnection.this.slaveAddress); + AutoSwitchHAConnection.this.haService.updateConfirmOffsetWhenSlaveAck(AutoSwitchHAConnection.this.slaveId); AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset); break; default: @@ -629,7 +615,7 @@ public class AutoSwitchHAConnection implements HAConnection { this.lastWriteOver = this.transferData(size); } else { // If size == 0, we should update the lastCatchupTimeMs - AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveAddress, System.currentTimeMillis()); + AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId, System.currentTimeMillis()); haService.getWaitNotifyObject().allWaitForRunning(100); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index 7382587dc..b858cfcae 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -56,17 +56,15 @@ import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService; public class AutoSwitchHAService extends DefaultHAService { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_")); - private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>(); - private final CopyOnWriteArraySet<String> syncStateSet = new CopyOnWriteArraySet<>(); - private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>(); + private final List<Consumer<Set<Long/*brokerId*/>>> syncStateSetChangedListeners = new ArrayList<>(); + private final CopyOnWriteArraySet<Long/*brokerId*/> syncStateSet = new CopyOnWriteArraySet<>(); + private final ConcurrentHashMap<Long/*brokerId*/, Long/*lastCaughtUpTimestamp*/> connectionCaughtUpTimeTable = new ConcurrentHashMap<>(); + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private volatile long confirmOffset = -1; - private String localAddress; - private EpochFileCache epochCache; private AutoSwitchHAClient haClient; - private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public AutoSwitchHAService() { } @@ -93,8 +91,8 @@ public class AutoSwitchHAService extends DefaultHAService { @Override public void removeConnection(HAConnection conn) { if (!defaultMessageStore.isShutdown()) { - final Set<String> syncStateSet = getSyncStateSet(); - String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress(); + final Set<Long> syncStateSet = getSyncStateSet(); + Long slave = ((AutoSwitchHAConnection) conn).getSlaveId(); if (syncStateSet.contains(slave)) { syncStateSet.remove(slave); notifySyncStateSetChanged(syncStateSet); @@ -163,12 +161,10 @@ public class AutoSwitchHAService extends DefaultHAService { try { destroyConnections(); if (this.haClient == null) { - this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, this.epochCache); + this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, this.epochCache, slaveId); } else { this.haClient.reOpen(); } - this.haClient.setLocalAddress(this.localAddress); - this.haClient.updateSlaveId(slaveId); this.haClient.updateMasterAddress(newMasterAddr); this.haClient.updateHaMasterAddress(null); this.haClient.start(); @@ -213,15 +209,13 @@ public class AutoSwitchHAService extends DefaultHAService { public void updateMasterAddress(String newAddr) { } - public void registerSyncStateSetChangedListener(final Consumer<Set<String>> listener) { + public void registerSyncStateSetChangedListener(final Consumer<Set<Long>> listener) { this.syncStateSetChangedListeners.add(listener); } - public void notifySyncStateSetChanged(final Set<String> newSyncStateSet) { + public void notifySyncStateSetChanged(final Set<Long> newSyncStateSet) { this.executorService.submit(() -> { - for (Consumer<Set<String>> listener : syncStateSetChangedListeners) { - listener.accept(newSyncStateSet); - } + syncStateSetChangedListeners.forEach(listener -> listener.accept(newSyncStateSet)); }); } @@ -229,15 +223,15 @@ public class AutoSwitchHAService extends DefaultHAService { * Check and maybe shrink the inSyncStateSet. * A slave will be removed from inSyncStateSet if (curTime - HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup) */ - public Set<String> maybeShrinkInSyncStateSet() { - final Set<String> newSyncStateSet = getSyncStateSet(); + public Set<Long> maybeShrinkInSyncStateSet() { + final Set<Long> newSyncStateSet = getSyncStateSet(); final long haMaxTimeSlaveNotCatchup = this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup(); - for (Map.Entry<String, Long> next : this.connectionCaughtUpTimeTable.entrySet()) { - final String slaveAddress = next.getKey(); - if (newSyncStateSet.contains(slaveAddress)) { - final Long lastCaughtUpTimeMs = this.connectionCaughtUpTimeTable.get(slaveAddress); + for (Map.Entry<Long, Long> next : this.connectionCaughtUpTimeTable.entrySet()) { + final Long slaveBrokerId = next.getKey(); + if (newSyncStateSet.contains(slaveBrokerId)) { + final Long lastCaughtUpTimeMs = next.getValue(); if ((System.currentTimeMillis() - lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchup) { - newSyncStateSet.remove(slaveAddress); + newSyncStateSet.remove(slaveBrokerId); } } } @@ -248,25 +242,25 @@ public class AutoSwitchHAService extends DefaultHAService { * Check and maybe add the slave to inSyncStateSet. A slave will be added to inSyncStateSet if its slaveMaxOffset >= * current confirmOffset, and it is caught up to an offset within the current leader epoch. */ - public void maybeExpandInSyncStateSet(final String slaveAddress, final long slaveMaxOffset) { - final Set<String> currentSyncStateSet = getSyncStateSet(); - if (currentSyncStateSet.contains(slaveAddress)) { + public void maybeExpandInSyncStateSet(final Long slaveBrokerId, final long slaveMaxOffset) { + final Set<Long> currentSyncStateSet = getSyncStateSet(); + if (currentSyncStateSet.contains(slaveBrokerId)) { return; } final long confirmOffset = getConfirmOffset(); if (slaveMaxOffset >= confirmOffset) { final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry(); if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) { - currentSyncStateSet.add(slaveAddress); + currentSyncStateSet.add(slaveBrokerId); // Notify the upper layer that syncStateSet changed. notifySyncStateSetChanged(currentSyncStateSet); } } } - public void updateConnectionLastCaughtUpTime(final String slaveAddress, final long lastCaughtUpTimeMs) { - Long prevTime = ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, slaveAddress, k -> 0L); - this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime, lastCaughtUpTimeMs)); + public void updateConnectionLastCaughtUpTime(final Long slaveBrokerId, final long lastCaughtUpTimeMs) { + Long prevTime = ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, slaveBrokerId, k -> 0L); + this.connectionCaughtUpTimeTable.put(slaveBrokerId, Math.max(prevTime, lastCaughtUpTimeMs)); } /** @@ -285,8 +279,8 @@ public class AutoSwitchHAService extends DefaultHAService { return confirmOffset; } - public void updateConfirmOffsetWhenSlaveAck(final String slaveAddress) { - if (this.syncStateSet.contains(slaveAddress)) { + public void updateConfirmOffsetWhenSlaveAck(final Long slaveBrokerId) { + if (this.syncStateSet.contains(slaveBrokerId)) { this.confirmOffset = computeConfirmOffset(); } } @@ -330,7 +324,7 @@ public class AutoSwitchHAService extends DefaultHAService { cInfo.setTransferredByteInSecond(conn.getTransferredByteInSecond()); cInfo.setTransferFromWhere(conn.getTransferFromWhere()); - cInfo.setInSync(syncStateSet.contains(((AutoSwitchHAConnection) conn).getSlaveAddress())); + cInfo.setInSync(syncStateSet.contains(((AutoSwitchHAConnection) conn).getSlaveId())); info.getHaConnectionInfo().add(cInfo); } @@ -344,18 +338,18 @@ public class AutoSwitchHAService extends DefaultHAService { } private long computeConfirmOffset() { - final Set<String> currentSyncStateSet = getSyncStateSet(); + final Set<Long> currentSyncStateSet = getSyncStateSet(); long confirmOffset = this.defaultMessageStore.getMaxPhyOffset(); for (HAConnection connection : this.connectionList) { - final String slaveAddress = ((AutoSwitchHAConnection) connection).getSlaveAddress(); - if (currentSyncStateSet.contains(slaveAddress)) { + final Long slaveId = ((AutoSwitchHAConnection) connection).getSlaveId(); + if (currentSyncStateSet.contains(slaveId)) { confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset()); } } return confirmOffset; } - public void setSyncStateSet(final Set<String> syncStateSet) { + public void setSyncStateSet(final Set<Long> syncStateSet) { final Lock writeLock = readWriteLock.writeLock(); try { writeLock.lock(); @@ -367,12 +361,11 @@ public class AutoSwitchHAService extends DefaultHAService { } } - public Set<String> getSyncStateSet() { + public Set<Long> getSyncStateSet() { final Lock readLock = readWriteLock.readLock(); try { readLock.lock(); - HashSet<String> set = new HashSet<>(this.syncStateSet.size()); - set.addAll(this.syncStateSet); + HashSet<Long> set = new HashSet<>(this.syncStateSet); return set; } finally { readLock.unlock(); @@ -387,10 +380,6 @@ public class AutoSwitchHAService extends DefaultHAService { this.epochCache.truncateSuffixByOffset(offset); } - public void setLocalAddress(String localAddress) { - this.localAddress = localAddress; - } - /** * Try to truncate incomplete msg transferred from master. */ diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java index bc392022d..d2e6d5c21 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java @@ -100,7 +100,7 @@ public class AutoSwitchHATest { storeConfig2 = new MessageStoreConfig(); storeConfig2.setBrokerRole(BrokerRole.SLAVE); - storeConfig1.setHaSendHeartbeatInterval(1000); + storeConfig2.setHaSendHeartbeatInterval(1000); storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#2"); storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "commitlog"); storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "EpochFileCache"); @@ -110,12 +110,12 @@ public class AutoSwitchHATest { buildMessageStoreConfig(storeConfig2, mappedFileSize); this.store2HaAddress = "127.0.0.1:10943"; - messageStore1 = buildMessageStore(storeConfig1, 0L); - messageStore2 = buildMessageStore(storeConfig2, 1L); + messageStore1 = buildMessageStore(storeConfig1, 1L); + messageStore2 = buildMessageStore(storeConfig2, 2L); storeConfig3 = new MessageStoreConfig(); storeConfig3.setBrokerRole(BrokerRole.SLAVE); - storeConfig1.setHaSendHeartbeatInterval(1000); + storeConfig3.setHaSendHeartbeatInterval(1000); storeConfig3.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#3"); storeConfig3.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#3" + File.separator + "commitlog"); storeConfig3.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#3" + File.separator + "EpochFileCache"); @@ -132,9 +132,9 @@ public class AutoSwitchHATest { messageStore2.start(); messageStore3.start(); - ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000"); - ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); - ((AutoSwitchHAService) this.messageStore3.getHaService()).setLocalAddress("127.0.0.1:8002"); +// ((AutoSwitchHAService) this.messageStore1.getHaService()).("127.0.0.1:8000"); +// ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); +// ((AutoSwitchHAService) this.messageStore3.getHaService()).setLocalAddress("127.0.0.1:8002"); } public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws Exception { @@ -162,16 +162,16 @@ public class AutoSwitchHATest { buildMessageStoreConfig(storeConfig2, mappedFileSize); this.store2HaAddress = "127.0.0.1:10943"; - messageStore1 = buildMessageStore(storeConfig1, 0L); - messageStore2 = buildMessageStore(storeConfig2, 1L); + messageStore1 = buildMessageStore(storeConfig1, 1L); + messageStore2 = buildMessageStore(storeConfig2, 2L); assertTrue(messageStore1.load()); assertTrue(messageStore2.load()); messageStore1.start(); messageStore2.start(); - ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000"); - ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); +// ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000"); +// ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); } private boolean changeMasterAndPutMessage(DefaultMessageStore master, MessageStoreConfig masterConfig, @@ -206,7 +206,7 @@ public class AutoSwitchHATest { public void testConfirmOffset() throws Exception { init(defaultMappedFileSize, true); // Step1, set syncStateSet, if both broker1 and broker2 are in syncStateSet, the confirmOffset will be computed as the min slaveAckOffset(broker2's ack) - ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Arrays.asList("127.0.0.1:8000", "127.0.0.1:8001"))); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Arrays.asList(1L, 2L))); boolean masterAndPutMessage = changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10); assertTrue(masterAndPutMessage); checkMessage(this.messageStore2, 10, 0); @@ -231,7 +231,7 @@ public class AutoSwitchHATest { assertTrue(messageStore2.load()); messageStore2.start(); messageStore2.getHaService().changeToMaster(2); - ((AutoSwitchHAService) messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8001"))); + ((AutoSwitchHAService) messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(2L))); // Put message on master for (int i = 0; i < 10; i++) { @@ -252,7 +252,7 @@ public class AutoSwitchHATest { @Test public void testAsyncLearnerBrokerRole() throws Exception { init(defaultMappedFileSize); - ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(1L))); storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER); storeConfig2.setBrokerRole(BrokerRole.SLAVE); @@ -265,15 +265,15 @@ public class AutoSwitchHATest { messageStore1.putMessage(buildMessage()); } checkMessage(messageStore2, 10, 0); - final Set<String> syncStateSet = ((AutoSwitchHAService) this.messageStore1.getHaService()).getSyncStateSet(); - assertFalse(syncStateSet.contains("127.0.0.1:8001")); + final Set<Long> syncStateSet = ((AutoSwitchHAService) this.messageStore1.getHaService()).getSyncStateSet(); + assertFalse(syncStateSet.contains(2L)); } @Test public void testOptionAllAckInSyncStateSet() throws Exception { init(defaultMappedFileSize, true); - AtomicReference<Set<String>> syncStateSet = new AtomicReference<>(); - ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); + AtomicReference<Set<Long>> syncStateSet = new AtomicReference<>(); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(1L))); ((AutoSwitchHAService) this.messageStore1.getHaService()).registerSyncStateSetChangedListener(newSyncStateSet -> { syncStateSet.set(newSyncStateSet); }); @@ -281,9 +281,9 @@ public class AutoSwitchHATest { changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10); checkMessage(this.messageStore2, 10, 0); // Check syncStateSet - final Set<String> result = syncStateSet.get(); - assertTrue(result.contains("127.0.0.1:8000")); - assertTrue(result.contains("127.0.0.1:8001")); + final Set<Long> result = syncStateSet.get(); + assertTrue(result.contains(1L)); + assertTrue(result.contains(2L)); // Now, shutdown store2 this.messageStore2.shutdown(); @@ -304,12 +304,12 @@ public class AutoSwitchHATest { // Step1, change store1 to master, store2 to follower init(defaultMappedFileSize); - ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(1L))); changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10); checkMessage(this.messageStore2, 10, 0); // Step2, change store1 to follower, store2 to master, epoch = 2 - ((AutoSwitchHAService) this.messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8001"))); + ((AutoSwitchHAService) this.messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(2L))); changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore1, 1, this.storeConfig1, 2, store2HaAddress, 10); checkMessage(this.messageStore1, 20, 0); @@ -322,7 +322,7 @@ public class AutoSwitchHATest { public void testAddBroker() throws Exception { // Step1: broker1 as leader, broker2 as follower init(defaultMappedFileSize); - ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(1L))); changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10); checkMessage(this.messageStore2, 10, 0); @@ -341,7 +341,7 @@ public class AutoSwitchHATest { // Step1: broker1 as leader, broker2 as follower, append 2 epoch, each epoch will be stored on one file(Because fileSize = 1700, which only can hold 10 msgs); // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270> - ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(1L))); changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10); checkMessage(this.messageStore2, 10, 0); @@ -378,7 +378,7 @@ public class AutoSwitchHATest { // Step1: broker1 as leader, broker2 as follower, append 2 epoch, each epoch will be stored on one file(Because fileSize = 1700, which only can hold 10 msgs); // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270> - ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(1L))); changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10); checkMessage(this.messageStore2, 10, 0); changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10); @@ -406,7 +406,7 @@ public class AutoSwitchHATest { checkMessage(messageStore3, 10, 10); // Step5: change broker2 as leader, broker3 as follower - ((AutoSwitchHAService) this.messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8001"))); + ((AutoSwitchHAService) this.messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(2L))); changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore3, 3, this.storeConfig3, 3, this.store2HaAddress, 10); checkMessage(messageStore3, 20, 10); @@ -424,7 +424,7 @@ public class AutoSwitchHATest { // Step1: broker1 as leader, broker2 as follower, append 2 epoch, each epoch will be stored on one file(Because fileSize = 1700, which only can hold 10 msgs); // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270> - ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(1L))); changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10); checkMessage(this.messageStore2, 10, 0); changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10);
