This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 36b3b4db7d89d3efb7e4dc481ed9d23809eebd9e
Author: TheR1sing3un <[email protected]>
AuthorDate: Sun Feb 5 23:15:41 2023 +0800

    feat(broker): perfect logic test in broker
    
    1. perfect logic test in broker
---
 .../broker/controller/ReplicasManager.java         | 11 +++----
 .../broker/processor/AdminBrokerProcessor.java     |  2 +-
 .../broker/controller/ReplicasManagerTest.java     | 34 +++++++++++++++-------
 .../store/ha/autoswitch/BrokerMetadata.java        |  6 ++--
 .../store/ha/autoswitch/TempBrokerMetadata.java    |  8 ++---
 5 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 7bbe43e1e..4d2ed8e80 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -212,13 +212,13 @@ public class ReplicasManager {
         this.scheduledService.shutdown();
     }
 
-    public synchronized void changeBrokerRole(final String newMasterAddress, final int newMasterEpoch,
-        final int syncStateSetEpoch, final long brokerId) {
-        if (StringUtils.isNoneEmpty(newMasterAddress) && newMasterEpoch > this.masterEpoch) {
-            if (StringUtils.equals(newMasterAddress, this.localAddress)) {
+    public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress, final Integer newMasterEpoch,
+        final Integer syncStateSetEpoch) {
+        if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
+            if (newMasterBrokerId.equals(this.brokerId)) {
                 changeToMaster(newMasterEpoch, syncStateSetEpoch);
             } else {
-                changeToSlave(newMasterAddress, newMasterEpoch, brokerId);
+                changeToSlave(newMasterAddress, newMasterEpoch, newMasterBrokerId);
             }
         }
     }
@@ -503,6 +503,7 @@ public class ReplicasManager {
         try {
             
this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), 
brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
             this.tempBrokerMetadata.clear();
+            this.brokerId = this.brokerMetadata.getBrokerId();
             return true;
         } catch (Exception e) {
             LOGGER.error("fail to create metadata file", e);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 1723923d3..01fb084dd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2635,7 +2635,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
         final ReplicasManager replicasManager = this.brokerController.getReplicasManager();
         if (replicasManager != null) {
-            replicasManager.changeBrokerRole(requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), requestHeader.getBrokerId());
+            replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch());
         }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 0dc782e4e..2be54d0be 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -26,8 +26,12 @@ import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
@@ -69,7 +73,11 @@ public class ReplicasManagerTest {
     @Mock
     private BrokerOuterAPI brokerOuterAPI;
 
-    private RegisterBrokerToControllerResponseHeader registerBrokerToControllerResponseHeader;
+    private GetNextBrokerIdResponseHeader getNextBrokerIdResponseHeader;
+
+    private ApplyBrokerIdResponseHeader applyBrokerIdResponseHeader;
+
+    private RegisterSuccessResponseHeader registerSuccessResponseHeader;
 
     private ElectMasterResponseHeader brokerTryElectResponseHeader;
 
@@ -83,9 +91,9 @@ public class ReplicasManagerTest {
 
     private static final String NEW_MASTER_ADDRESS = "192.168.1.2";
 
-    private static final long MASTER_BROKER_ID = 0;
+    private static final long BROKER_ID_1 = 1;
 
-    private static final long SLAVE_BROKER_ID = 2;
+    private static final long BROKER_ID_2 = 2;
 
     private static final int OLD_MASTER_EPOCH = 2;
     private static final int NEW_MASTER_EPOCH = 3;
@@ -100,7 +108,7 @@ public class ReplicasManagerTest {
 
     private static final long SCHEDULE_SERVICE_EXEC_PERIOD = 5;
 
-    private static final String SYNC_STATE = "1";
+    private static final Long SYNC_STATE = 1L;
 
     @Before
     public void before() throws Exception {
@@ -109,13 +117,15 @@ public class ReplicasManagerTest {
         brokerConfig = new BrokerConfig();
         slaveSynchronize = new SlaveSynchronize(brokerController);
         getMetaDataResponseHeader = new GetMetaDataResponseHeader(GROUP, LEADER_ID, OLD_MASTER_ADDRESS, IS_LEADER, PEERS);
-        registerBrokerToControllerResponseHeader = new RegisterBrokerToControllerResponseHeader();
-        registerBrokerToControllerResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
+        getNextBrokerIdResponseHeader = new GetNextBrokerIdResponseHeader();
+        getNextBrokerIdResponseHeader.setNextBrokerId(BROKER_ID_1);
+        applyBrokerIdResponseHeader = new ApplyBrokerIdResponseHeader();
+        registerSuccessResponseHeader = new RegisterSuccessResponseHeader();
         brokerTryElectResponseHeader = new ElectMasterResponseHeader();
         brokerTryElectResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
         getReplicaInfoResponseHeader = new GetReplicaInfoResponseHeader();
         getReplicaInfoResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
-        getReplicaInfoResponseHeader.setBrokerId(MASTER_BROKER_ID);
+        getReplicaInfoResponseHeader.setMasterBrokerId(BROKER_ID_1);
         getReplicaInfoResponseHeader.setMasterEpoch(NEW_MASTER_EPOCH);
         syncStateSet = new SyncStateSet(Sets.newLinkedHashSet(SYNC_STATE), NEW_MASTER_EPOCH);
         result = new Pair<>(getReplicaInfoResponseHeader, syncStateSet);
@@ -129,7 +139,9 @@ public class ReplicasManagerTest {
         when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
         
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
         when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
-        when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), 
any(), anyLong(), anyInt(), anyLong(), 
anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
+        when(brokerOuterAPI.getNextBrokerId(any(), any(), 
any())).thenReturn(getNextBrokerIdResponseHeader);
+        when(brokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), 
any())).thenReturn(applyBrokerIdResponseHeader);
+        when(brokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), 
any())).thenReturn(registerSuccessResponseHeader);
         when(brokerOuterAPI.getReplicaInfo(any(), any(), 
any())).thenReturn(result);
         when(brokerOuterAPI.brokerElect(any(), any(), any(), 
any())).thenReturn(brokerTryElectResponseHeader);
         replicasManager = new ReplicasManager(brokerController);
@@ -148,11 +160,11 @@ public class ReplicasManagerTest {
     @Test
     public void changeBrokerRoleTest() {
         // not equal to localAddress
-        Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, SLAVE_BROKER_ID))
+        Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH))
             .doesNotThrowAnyException();
 
         // equal to localAddress
-        Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, SLAVE_BROKER_ID))
+        Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH))
             .doesNotThrowAnyException();
     }
 
@@ -163,7 +175,7 @@ public class ReplicasManagerTest {
 
     @Test
     public void changeToSlaveTest() {
-        Assertions.assertThatCode(() -> replicasManager.changeToSlave(NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, MASTER_BROKER_ID))
+        Assertions.assertThatCode(() -> replicasManager.changeToSlave(NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, BROKER_ID_2))
             .doesNotThrowAnyException();
     }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
index 24f400204..747eb4944 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
@@ -41,8 +41,8 @@ public class BrokerMetadata extends MetadataFile {
     @Override
     public String encodeToStr() {
         StringBuilder sb = new StringBuilder();
-        sb.append(clusterName).append(";");
-        sb.append(brokerName).append(";");
+        sb.append(clusterName).append("#");
+        sb.append(brokerName).append("#");
         sb.append(brokerId);
         return sb.toString();
     }
@@ -50,7 +50,7 @@ public class BrokerMetadata extends MetadataFile {
     @Override
     public void decodeFromStr(String dataStr) {
         if (dataStr == null) return;
-        String[] dataArr = dataStr.split(";");
+        String[] dataArr = dataStr.split("#");
         this.clusterName = dataArr[0];
         this.brokerName = dataArr[1];
         this.brokerId = Long.valueOf(dataArr[2]);
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java
index 31b4aa5e8..f3b480522 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java
@@ -52,9 +52,9 @@ public class TempBrokerMetadata extends MetadataFile {
     @Override
     public String encodeToStr() {
         StringBuilder sb = new StringBuilder();
-        sb.append(clusterName).append(";");
-        sb.append(brokerName).append(";");
-        sb.append(brokerId).append(";");
+        sb.append(clusterName).append("#");
+        sb.append(brokerName).append("#");
+        sb.append(brokerId).append("#");
         sb.append(registerCheckCode);
         return sb.toString();
     }
@@ -62,7 +62,7 @@ public class TempBrokerMetadata extends MetadataFile {
     @Override
     public void decodeFromStr(String dataStr) {
         if (dataStr == null) return;
-        String[] dataArr = dataStr.split(";");
+        String[] dataArr = dataStr.split("#");
         this.clusterName = dataArr[0];
         this.brokerName = dataArr[1];
         this.brokerId = Long.valueOf(dataArr[2]);

Reply via email to