This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c05a0b222 [ISSUE #6377] Polish the code when broker change to master 
(#6378)
c05a0b222 is described below

commit c05a0b222c5d5593daffc984b6835022bd193652
Author: rongtong <[email protected]>
AuthorDate: Mon Mar 20 09:52:53 2023 +0800

    [ISSUE #6377] Polish the code when broker change to master (#6378)
    
    * Polish the code when broker change to master
    
    * Pass the check style
    
    * Polish the log
---
 .../broker/controller/ReplicasManager.java         | 89 +++++++++++++---------
 1 file changed, 51 insertions(+), 38 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index c83c823c0..5bdd1dbe9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -110,7 +110,7 @@ public class ReplicasManager {
         this.scheduledService = Executors.newScheduledThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ScheduledService_", 
brokerController.getBrokerIdentity()));
         this.executorService = Executors.newFixedThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ExecutorService_", 
brokerController.getBrokerIdentity()));
         this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
-                new ArrayBlockingQueue<>(32), new 
ThreadFactoryImpl("ReplicasManager_scan_thread_", 
brokerController.getBrokerIdentity()));
+            new ArrayBlockingQueue<>(32), new 
ThreadFactoryImpl("ReplicasManager_scan_thread_", 
brokerController.getBrokerIdentity()));
         this.haService = (AutoSwitchHAService) 
brokerController.getMessageStore().getHaService();
         this.brokerConfig = brokerController.getBrokerConfig();
         this.availableControllerAddresses = new ConcurrentHashMap<>();
@@ -166,7 +166,8 @@ public class ReplicasManager {
     }
 
     private boolean startBasicService() {
-        if (this.state == State.SHUTDOWN) return false;
+        if (this.state == State.SHUTDOWN)
+            return false;
         if (this.state == State.INITIAL) {
             if (schedulingSyncControllerMetadata()) {
                 this.state = State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE;
@@ -226,8 +227,9 @@ public class ReplicasManager {
         this.scanExecutor.shutdownNow();
     }
 
-    public synchronized void changeBrokerRole(final Long newMasterBrokerId, 
final String newMasterAddress, final Integer newMasterEpoch,
-                                              final Integer syncStateSetEpoch, 
final Set<Long> syncStateSet) {
+    public synchronized void changeBrokerRole(final Long newMasterBrokerId, 
final String newMasterAddress,
+        final Integer newMasterEpoch,
+        final Integer syncStateSetEpoch, final Set<Long> syncStateSet) {
         if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
             if (newMasterBrokerId.equals(this.brokerControllerId)) {
                 changeToMaster(newMasterEpoch, syncStateSetEpoch, 
syncStateSet);
@@ -244,9 +246,13 @@ public class ReplicasManager {
 
                 this.masterEpoch = newMasterEpoch;
                 if (this.masterBrokerId != null && 
this.masterBrokerId.equals(this.brokerControllerId) && 
this.brokerController.getBrokerConfig().getBrokerId() == MixAll.MASTER_ID) {
+                    // Change SyncStateSet
+                    final HashSet<Long> newSyncStateSet = new 
HashSet<>(syncStateSet);
+                    changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
                     // if master doesn't change
                     
this.haService.changeToMasterWhenLastRoleIsMaster(newMasterEpoch);
                     
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
+                    
this.executorService.submit(this::checkSyncStateSetAndDoReport);
                     registerBrokerWhenRoleChange();
                     return;
                 }
@@ -272,6 +278,7 @@ public class ReplicasManager {
                 schedulingCheckSyncStateSet();
 
                 
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
+                
this.executorService.submit(this::checkSyncStateSetAndDoReport);
                 registerBrokerWhenRoleChange();
             }
         }
@@ -281,7 +288,7 @@ public class ReplicasManager {
         synchronized (this) {
             if (newMasterEpoch > this.masterEpoch) {
                 LOGGER.info("Begin to change to slave, brokerName={}, 
brokerId={}, newMasterBrokerId={}, newMasterAddress={}, newMasterEpoch={}",
-                        this.brokerConfig.getBrokerName(), 
this.brokerControllerId, newMasterBrokerId, newMasterAddress, newMasterEpoch);
+                    this.brokerConfig.getBrokerName(), 
this.brokerControllerId, newMasterBrokerId, newMasterAddress, newMasterEpoch);
 
                 this.masterEpoch = newMasterEpoch;
                 if (newMasterBrokerId.equals(this.masterBrokerId)) {
@@ -318,19 +325,19 @@ public class ReplicasManager {
     }
 
     public void registerBrokerWhenRoleChange() {
-        String currentRole = 
this.brokerController.getMessageStoreConfig().getBrokerRole().equals(BrokerRole.SLAVE)
 ? "slave" : "master";
 
         this.executorService.submit(() -> {
             // Register broker to name-srv
             try {
                 this.brokerController.registerBrokerAll(true, false, 
this.brokerController.getBrokerConfig().isForceRegister());
             } catch (final Throwable e) {
-                LOGGER.error("Error happen when register broker to name-srv, 
Failed to change broker to {}", currentRole, e);
+                LOGGER.error("Error happen when register broker to name-srv, 
Failed to change broker to {}", 
this.brokerController.getMessageStoreConfig().getBrokerRole(), e);
                 return;
             }
             LOGGER.info("Change broker [id:{}][address:{}] to {}, 
newMasterBrokerId:{}, newMasterAddress:{}, newMasterEpoch:{}, 
syncStateSetEpoch:{}",
-                this.brokerControllerId, this.brokerAddress, currentRole, 
this.masterBrokerId, this.masterAddress, this.masterEpoch, 
this.syncStateSetEpoch);
+                this.brokerControllerId, this.brokerAddress, 
this.brokerController.getMessageStoreConfig().getBrokerRole(), 
this.masterBrokerId, this.masterAddress, this.masterEpoch, 
this.syncStateSetEpoch);
         });
+
     }
 
     private void changeSyncStateSet(final Set<Long> newSyncStateSet, final int 
newSyncStateSetEpoch) {
@@ -375,7 +382,7 @@ public class ReplicasManager {
         // Broker try to elect itself as a master in broker set.
         try {
             Pair<ElectMasterResponseHeader, Set<Long>> tryElectResponsePair = 
this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerClusterName(),
-                    this.brokerConfig.getBrokerName(), 
this.brokerControllerId);
+                this.brokerConfig.getBrokerName(), this.brokerControllerId);
             ElectMasterResponseHeader tryElectResponse = 
tryElectResponsePair.getObject1();
             Set<Long> syncStateSet = tryElectResponsePair.getObject2();
             final String masterAddress = tryElectResponse.getMasterAddress();
@@ -402,17 +409,17 @@ public class ReplicasManager {
         for (String controllerAddress : controllerAddresses) {
             if (StringUtils.isNotEmpty(controllerAddress)) {
                 this.brokerOuterAPI.sendHeartbeatToController(
-                        controllerAddress,
-                        this.brokerConfig.getBrokerClusterName(),
-                        this.brokerAddress,
-                        this.brokerConfig.getBrokerName(),
-                        this.brokerControllerId,
-                        this.brokerConfig.getSendHeartbeatTimeoutMillis(),
-                        this.brokerConfig.isInBrokerContainer(), 
this.getLastEpoch(),
-                        
this.brokerController.getMessageStore().getMaxPhyOffset(),
-                        this.getConfirmOffset(),
-                        this.brokerConfig.getControllerHeartBeatTimeoutMills(),
-                        this.brokerConfig.getBrokerElectionPriority()
+                    controllerAddress,
+                    this.brokerConfig.getBrokerClusterName(),
+                    this.brokerAddress,
+                    this.brokerConfig.getBrokerName(),
+                    this.brokerControllerId,
+                    this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                    this.brokerConfig.isInBrokerContainer(), 
this.getLastEpoch(),
+                    this.brokerController.getMessageStore().getMaxPhyOffset(),
+                    this.getConfirmOffset(),
+                    this.brokerConfig.getControllerHeartBeatTimeoutMills(),
+                    this.brokerConfig.getBrokerElectionPriority()
                 );
             }
         }
@@ -520,7 +527,7 @@ public class ReplicasManager {
     private boolean applyBrokerId() {
         try {
             ApplyBrokerIdResponseHeader response = 
this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), 
brokerConfig.getBrokerName(),
-                    tempBrokerMetadata.getBrokerId(), 
tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+                tempBrokerMetadata.getBrokerId(), 
tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
             return true;
 
         } catch (Exception e) {
@@ -549,14 +556,16 @@ public class ReplicasManager {
     }
 
     /**
-     * Send registerBrokerToController request to inform controller that now 
broker has been registered successfully and controller should update broker 
ipAddress if changed
+     * Send registerBrokerToController request to inform controller that now 
broker has been registered successfully and
+     * controller should update broker ipAddress if changed
      *
      * @return whether request success
      */
     private boolean registerBrokerToController() {
         try {
             Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> 
responsePair = 
this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(),
 brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, 
controllerLeaderAddress);
-            if (responsePair == null) return false;
+            if (responsePair == null)
+                return false;
             RegisterBrokerToControllerResponseHeader response = 
responsePair.getObject1();
             Set<Long> syncStateSet = responsePair.getObject2();
             final Long masterBrokerId = response.getMasterBrokerId();
@@ -606,24 +615,24 @@ public class ReplicasManager {
         if (this.registerState == 
RegisterState.CREATE_TEMP_METADATA_FILE_DONE) {
             if (this.tempBrokerMetadata.getClusterName() == null || 
!this.tempBrokerMetadata.getClusterName().equals(this.brokerConfig.getBrokerClusterName()))
 {
                 LOGGER.error("The clusterName: {} in broker temp metadata is 
different from the clusterName: {} in broker config",
-                        this.tempBrokerMetadata.getClusterName(), 
this.brokerConfig.getBrokerClusterName());
+                    this.tempBrokerMetadata.getClusterName(), 
this.brokerConfig.getBrokerClusterName());
                 return false;
             }
             if (this.tempBrokerMetadata.getBrokerName() == null || 
!this.tempBrokerMetadata.getBrokerName().equals(this.brokerConfig.getBrokerName()))
 {
                 LOGGER.error("The brokerName: {} in broker temp metadata is 
different from the brokerName: {} in broker config",
-                        this.tempBrokerMetadata.getBrokerName(), 
this.brokerConfig.getBrokerName());
+                    this.tempBrokerMetadata.getBrokerName(), 
this.brokerConfig.getBrokerName());
                 return false;
             }
         }
         if (this.registerState == RegisterState.CREATE_METADATA_FILE_DONE) {
             if (this.brokerMetadata.getClusterName() == null || 
!this.brokerMetadata.getClusterName().equals(this.brokerConfig.getBrokerClusterName()))
 {
                 LOGGER.error("The clusterName: {} in broker metadata is 
different from the clusterName: {} in broker config",
-                        this.brokerMetadata.getClusterName(), 
this.brokerConfig.getBrokerClusterName());
+                    this.brokerMetadata.getClusterName(), 
this.brokerConfig.getBrokerClusterName());
                 return false;
             }
             if (this.brokerMetadata.getBrokerName() == null || 
!this.brokerMetadata.getBrokerName().equals(this.brokerConfig.getBrokerName())) 
{
                 LOGGER.error("The brokerName: {} in broker metadata is 
different from the brokerName: {} in broker config",
-                        this.brokerMetadata.getBrokerName(), 
this.brokerConfig.getBrokerName());
+                    this.brokerMetadata.getBrokerName(), 
this.brokerConfig.getBrokerName());
                 return false;
             }
         }
@@ -730,18 +739,22 @@ public class ReplicasManager {
             this.checkSyncStateSetTaskFuture.cancel(false);
         }
         this.checkSyncStateSetTaskFuture = 
this.scheduledService.scheduleAtFixedRate(() -> {
-            final Set<Long> newSyncStateSet = 
this.haService.maybeShrinkSyncStateSet();
-            newSyncStateSet.add(this.brokerControllerId);
-            synchronized (this) {
-                if (this.syncStateSet != null) {
-                    // Check if syncStateSet changed
-                    if (this.syncStateSet.size() == newSyncStateSet.size() && 
this.syncStateSet.containsAll(newSyncStateSet)) {
-                        return;
-                    }
+            checkSyncStateSetAndDoReport();
+        }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), 
TimeUnit.MILLISECONDS);
+    }
+
+    private void checkSyncStateSetAndDoReport() {
+        final Set<Long> newSyncStateSet = 
this.haService.maybeShrinkSyncStateSet();
+        newSyncStateSet.add(this.brokerControllerId);
+        synchronized (this) {
+            if (this.syncStateSet != null) {
+                // Check if syncStateSet changed
+                if (this.syncStateSet.size() == newSyncStateSet.size() && 
this.syncStateSet.containsAll(newSyncStateSet)) {
+                    return;
                 }
             }
-            doReportSyncStateSetChanged(newSyncStateSet);
-        }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), 
TimeUnit.MILLISECONDS);
+        }
+        doReportSyncStateSetChanged(newSyncStateSet);
     }
 
     private void doReportSyncStateSetChanged(Set<Long> newSyncStateSet) {
@@ -752,7 +765,7 @@ public class ReplicasManager {
             }
         } catch (final Exception e) {
             LOGGER.error("Error happen when change SyncStateSet, broker:{}, 
masterAddress:{}, masterEpoch:{}, oldSyncStateSet:{}, newSyncStateSet:{}, 
syncStateSetEpoch:{}",
-                    this.brokerConfig.getBrokerName(), this.masterAddress, 
this.masterEpoch, this.syncStateSet, newSyncStateSet, this.syncStateSetEpoch, 
e);
+                this.brokerConfig.getBrokerName(), this.masterAddress, 
this.masterEpoch, this.syncStateSet, newSyncStateSet, this.syncStateSetEpoch, 
e);
         }
     }
 

Reply via email to