This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 017ad11047 [ISSUE #7219] Fix Concurrent modify syncStateSet and Mark synchronizing frequently when shrink. (#7220)
017ad11047 is described below

commit 017ad110475e8024585327b44f47e5e97aabc63b
Author: echooymxq <[email protected]>
AuthorDate: Wed Aug 23 11:11:42 2023 +0800

    [ISSUE #7219] Fix Concurrent modify syncStateSet and Mark synchronizing frequently when shrink. (#7220)
---
 .../broker/controller/ReplicasManager.java         | 29 ++++++++++++----------
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 21 +++++++++-------
 2 files changed, 28 insertions(+), 22 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index abae7cdb01..37c82e434b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -542,7 +542,7 @@ public class ReplicasManager {
             this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
             this.tempBrokerMetadata.clear();
             this.brokerControllerId = this.brokerMetadata.getBrokerId();
-            this.haService.setBrokerControllerId(this.brokerControllerId);
+            this.haService.setLocalBrokerId(this.brokerControllerId);
             return true;
         } catch (Exception e) {
             LOGGER.error("fail to create metadata file", e);
@@ -594,7 +594,7 @@ public class ReplicasManager {
         if (this.brokerMetadata.isLoaded()) {
             this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
             this.brokerControllerId = brokerMetadata.getBrokerId();
-            this.haService.setBrokerControllerId(this.brokerControllerId);
+            this.haService.setLocalBrokerId(this.brokerControllerId);
             return;
         }
         // 2. check if temp metadata exist
@@ -735,23 +735,26 @@ public class ReplicasManager {
         if (this.checkSyncStateSetTaskFuture != null) {
             this.checkSyncStateSetTaskFuture.cancel(false);
         }
-        this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(() -> {
-            checkSyncStateSetAndDoReport();
-        }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS);
+        this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(this::checkSyncStateSetAndDoReport, 3 * 1000,
+            this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS);
     }
 
     private void checkSyncStateSetAndDoReport() {
-        final Set<Long> newSyncStateSet = this.haService.maybeShrinkSyncStateSet();
-        newSyncStateSet.add(this.brokerControllerId);
-        synchronized (this) {
-            if (this.syncStateSet != null) {
-                // Check if syncStateSet changed
-                if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) {
-                    return;
+        try {
+            final Set<Long> newSyncStateSet = this.haService.maybeShrinkSyncStateSet();
+            newSyncStateSet.add(this.brokerControllerId);
+            synchronized (this) {
+                if (this.syncStateSet != null) {
+                    // Check if syncStateSet changed
+                    if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) {
+                        return;
+                    }
                 }
             }
+            doReportSyncStateSetChanged(newSyncStateSet);
+        } catch (Exception e) {
+            LOGGER.error("Check syncStateSet error", e);
         }
-        doReportSyncStateSetChanged(newSyncStateSet);
     }
 
     private void doReportSyncStateSetChanged(Set<Long> newSyncStateSet) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 6dc734e0c9..d5393fdca4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -41,6 +41,7 @@ import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -73,7 +74,7 @@ public class AutoSwitchHAService extends DefaultHAService {
     private EpochFileCache epochCache;
     private AutoSwitchHAClient haClient;
 
-    private Long brokerControllerId = null;
+    private Long localBrokerId = null;
 
     public AutoSwitchHAService() {
     }
@@ -287,9 +288,11 @@ public class AutoSwitchHAService extends DefaultHAService {
 
         // If the slaveBrokerId is in syncStateSet but not in connectionCaughtUpTimeTable,
         // it means that the broker has not connected.
-        for (Long slaveBrokerId : newSyncStateSet) {
-            if (!this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) {
-                newSyncStateSet.remove(slaveBrokerId);
+        Iterator<Long> iterator = newSyncStateSet.iterator();
+        while (iterator.hasNext()) {
+            Long slaveBrokerId = iterator.next();
+            if (!Objects.equals(slaveBrokerId, this.localBrokerId) && !this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) {
+                iterator.remove();
                 isSyncStateSetChanged = true;
             }
         }
@@ -419,7 +422,7 @@ public class AutoSwitchHAService extends DefaultHAService {
         // To avoid the syncStateSet is not consistent with connectionList.
         // Fix issue: https://github.com/apache/rocketmq/issues/6662
         for (Long syncId : currentSyncStateSet) {
-            if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) {
+            if (!idList.contains(syncId) && this.localBrokerId != null && !Objects.equals(syncId, this.localBrokerId)) {
                 LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
                 // Without check and re-compute, return the confirmOffset's value directly.
                 return this.defaultMessageStore.getConfirmOffsetDirectly();
@@ -545,12 +548,12 @@ public class AutoSwitchHAService extends DefaultHAService {
         return this.epochCache.getAllEntries();
     }
 
-    public Long getBrokerControllerId() {
-        return brokerControllerId;
+    public Long getLocalBrokerId() {
+        return localBrokerId;
     }
 
-    public void setBrokerControllerId(Long brokerControllerId) {
-        this.brokerControllerId = brokerControllerId;
+    public void setLocalBrokerId(Long localBrokerId) {
+        this.localBrokerId = localBrokerId;
     }
 
     class AutoSwitchAcceptSocketService extends AcceptSocketService {

Reply via email to