This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 36b3b4db7d89d3efb7e4dc481ed9d23809eebd9e Author: TheR1sing3un <[email protected]> AuthorDate: Sun Feb 5 23:15:41 2023 +0800 feat(broker): perfect logic test in broker 1. perfect logic test in broker --- .../broker/controller/ReplicasManager.java | 11 +++---- .../broker/processor/AdminBrokerProcessor.java | 2 +- .../broker/controller/ReplicasManagerTest.java | 34 +++++++++++++++------- .../store/ha/autoswitch/BrokerMetadata.java | 6 ++-- .../store/ha/autoswitch/TempBrokerMetadata.java | 8 ++--- 5 files changed, 37 insertions(+), 24 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 7bbe43e1e..4d2ed8e80 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -212,13 +212,13 @@ public class ReplicasManager { this.scheduledService.shutdown(); } - public synchronized void changeBrokerRole(final String newMasterAddress, final int newMasterEpoch, - final int syncStateSetEpoch, final long brokerId) { - if (StringUtils.isNoneEmpty(newMasterAddress) && newMasterEpoch > this.masterEpoch) { - if (StringUtils.equals(newMasterAddress, this.localAddress)) { + public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress, final Integer newMasterEpoch, + final Integer syncStateSetEpoch) { + if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) { + if (newMasterBrokerId.equals(this.brokerId)) { changeToMaster(newMasterEpoch, syncStateSetEpoch); } else { - changeToSlave(newMasterAddress, newMasterEpoch, brokerId); + changeToSlave(newMasterAddress, newMasterEpoch, newMasterBrokerId); } } } @@ -503,6 +503,7 @@ public class ReplicasManager { try { this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId()); this.tempBrokerMetadata.clear(); + this.brokerId = this.brokerMetadata.getBrokerId(); return true; } catch (Exception e) { LOGGER.error("fail to create metadata file", e); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 1723923d3..01fb084dd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2635,7 +2635,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final ReplicasManager replicasManager = this.brokerController.getReplicasManager(); if (replicasManager != null) { - replicasManager.changeBrokerRole(requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), requestHeader.getBrokerId()); + replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch()); } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java index 0dc782e4e..2be54d0be 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java @@ -26,8 +26,12 @@ import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; @@ -69,7 +73,11 @@ public class ReplicasManagerTest { @Mock private BrokerOuterAPI brokerOuterAPI; - private RegisterBrokerToControllerResponseHeader registerBrokerToControllerResponseHeader; + private GetNextBrokerIdResponseHeader getNextBrokerIdResponseHeader; + + private ApplyBrokerIdResponseHeader applyBrokerIdResponseHeader; + + private RegisterSuccessResponseHeader registerSuccessResponseHeader; private ElectMasterResponseHeader brokerTryElectResponseHeader; @@ -83,9 +91,9 @@ public class ReplicasManagerTest { private static final String NEW_MASTER_ADDRESS = "192.168.1.2"; - private static final long MASTER_BROKER_ID = 0; + private static final long BROKER_ID_1 = 1; - private static final long SLAVE_BROKER_ID = 2; + private static final long BROKER_ID_2 = 2; private static final int OLD_MASTER_EPOCH = 2; private static final int NEW_MASTER_EPOCH = 3; @@ -100,7 +108,7 @@ public class ReplicasManagerTest { private static final long SCHEDULE_SERVICE_EXEC_PERIOD = 5; - private static final String SYNC_STATE = "1"; + private static final Long SYNC_STATE = 1L; @Before public void before() throws Exception { @@ -109,13 +117,15 @@ public class ReplicasManagerTest { brokerConfig = new BrokerConfig(); slaveSynchronize = new SlaveSynchronize(brokerController); getMetaDataResponseHeader = new GetMetaDataResponseHeader(GROUP, LEADER_ID, OLD_MASTER_ADDRESS, IS_LEADER, PEERS); - registerBrokerToControllerResponseHeader = new RegisterBrokerToControllerResponseHeader(); - registerBrokerToControllerResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS); + getNextBrokerIdResponseHeader = new GetNextBrokerIdResponseHeader(); + getNextBrokerIdResponseHeader.setNextBrokerId(BROKER_ID_1); + applyBrokerIdResponseHeader = new ApplyBrokerIdResponseHeader(); + registerSuccessResponseHeader = new RegisterSuccessResponseHeader(); brokerTryElectResponseHeader = new ElectMasterResponseHeader(); brokerTryElectResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS); getReplicaInfoResponseHeader = new GetReplicaInfoResponseHeader(); getReplicaInfoResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS); - getReplicaInfoResponseHeader.setBrokerId(MASTER_BROKER_ID); + getReplicaInfoResponseHeader.setMasterBrokerId(BROKER_ID_1); getReplicaInfoResponseHeader.setMasterEpoch(NEW_MASTER_EPOCH); syncStateSet = new SyncStateSet(Sets.newLinkedHashSet(SYNC_STATE), NEW_MASTER_EPOCH); result = new Pair<>(getReplicaInfoResponseHeader, syncStateSet); @@ -129,7 +139,9 @@ public class ReplicasManagerTest { when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS); when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader); when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true); - when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader); + when(brokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(getNextBrokerIdResponseHeader); + when(brokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(applyBrokerIdResponseHeader); + when(brokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(registerSuccessResponseHeader); when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result); when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(brokerTryElectResponseHeader); replicasManager = new ReplicasManager(brokerController); @@ -148,11 +160,11 @@ public class ReplicasManagerTest { @Test public void changeBrokerRoleTest() { // not equal to localAddress - Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, SLAVE_BROKER_ID)) + Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH)) .doesNotThrowAnyException(); // equal to localAddress - Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, SLAVE_BROKER_ID)) + Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH)) .doesNotThrowAnyException(); } @@ -163,7 +175,7 @@ public class ReplicasManagerTest { @Test public void changeToSlaveTest() { - Assertions.assertThatCode(() -> replicasManager.changeToSlave(NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, MASTER_BROKER_ID)) + Assertions.assertThatCode(() -> replicasManager.changeToSlave(NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, BROKER_ID_2)) .doesNotThrowAnyException(); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java index 24f400204..747eb4944 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java @@ -41,8 +41,8 @@ public class BrokerMetadata extends MetadataFile { @Override public String encodeToStr() { StringBuilder sb = new StringBuilder(); - sb.append(clusterName).append(";"); - sb.append(brokerName).append(";"); + sb.append(clusterName).append("#"); + sb.append(brokerName).append("#"); sb.append(brokerId); return sb.toString(); } @@ -50,7 +50,7 @@ public class BrokerMetadata extends MetadataFile { @Override public void decodeFromStr(String dataStr) { if (dataStr == null) return; - String[] dataArr = dataStr.split(";"); + String[] dataArr = dataStr.split("#"); this.clusterName = dataArr[0]; this.brokerName = dataArr[1]; this.brokerId = Long.valueOf(dataArr[2]); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java index 31b4aa5e8..f3b480522 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java @@ -52,9 +52,9 @@ public class TempBrokerMetadata extends MetadataFile { @Override public String encodeToStr() { StringBuilder sb = new StringBuilder(); - sb.append(clusterName).append(";"); - sb.append(brokerName).append(";"); - sb.append(brokerId).append(";"); + sb.append(clusterName).append("#"); + sb.append(brokerName).append("#"); + sb.append(brokerId).append("#"); sb.append(registerCheckCode); return sb.toString(); } @@ -62,7 +62,7 @@ public class TempBrokerMetadata extends MetadataFile { @Override public void decodeFromStr(String dataStr) { if (dataStr == null) return; - String[] dataArr = dataStr.split(";"); + String[] dataArr = dataStr.split("#"); this.clusterName = dataArr[0]; this.brokerName = dataArr[1]; this.brokerId = Long.valueOf(dataArr[2]);
