This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new f5c5f5c4f [Summer of code] Shrink and expand InSyncStateSet (#4355)
f5c5f5c4f is described below

commit f5c5f5c4f0ac9fe4bfbea0cf7a94c7037abdf716
Author: hzh0425 <[email protected]>
AuthorDate: Tue May 24 18:12:45 2022 +0800

    [Summer of code] Shrink and expand InSyncStateSet (#4355)
    
    * feature: add lastCatchUpTimeMs and expandInSyncStateSet
    
    * code review
    
    * add InSyncStateSet shrink and expand logic to AutoSwitchHAService
    
    * add option allAckInSyncStateSet
    
    * let ReplicasManager use AutoSwitchHAService's expand and shrink InSyncStateSet APIs.
    
    * fix bug
    
    * use CopyOnWriteArraySet to replace lock
    
    * code review
    
    * code review
    
    * code review
    
    * code review
---
 .../broker/hacontroller/ReplicasManager.java       |  27 ++--
 .../java/org/apache/rocketmq/store/CommitLog.java  |  19 ++-
 .../rocketmq/store/config/MessageStoreConfig.java  |  27 ++++
 .../apache/rocketmq/store/ha/DefaultHAService.java |   7 -
 .../rocketmq/store/ha/GroupTransferService.java    |  45 ++++--
 .../org/apache/rocketmq/store/ha/HAService.java    |  12 --
 .../ha/autoswitch/AutoSwitchHAConnection.java      |  35 ++++-
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 166 ++++++++++++++-------
 .../store/ha/autoswitch/AutoSwitchHATest.java      |  86 ++++++++++-
 .../ha/{ => autoswitch}/EpochFileCacheTest.java    |   3 +-
 10 files changed, 320 insertions(+), 107 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index b59995ae2..41addf37c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -134,6 +134,9 @@ public class ReplicasManager {
         }
 
         schedulingSyncBrokerMetadata();
+
+        // Register syncStateSet changed listener.
+        
this.haService.registerSyncStateSetChangedListener(this::doReportSyncStateSetChanged);
         return true;
     }
 
@@ -346,7 +349,7 @@ public class ReplicasManager {
      */
     private void schedulingCheckSyncStateSet() {
         this.checkSyncStateSetTaskFuture = 
this.scheduledService.scheduleAtFixedRate(() -> {
-            final Set<String> newSyncStateSet = 
this.haService.getLatestSyncStateSet();
+            final Set<String> newSyncStateSet = 
this.haService.maybeShrinkInSyncStateSet();
             newSyncStateSet.add(this.localAddress);
             synchronized (this) {
                 if (this.syncStateSet != null) {
@@ -356,18 +359,22 @@ public class ReplicasManager {
                     }
                 }
             }
-            try {
-                final SyncStateSet result = 
this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch, 
newSyncStateSet, this.syncStateSetEpoch);
-                if (result != null) {
-                    changeSyncStateSet(result.getSyncStateSet(), 
result.getSyncStateSetEpoch());
-                }
-            } catch (final Exception e) {
-                LOGGER.error("Error happen when change sync state set, 
broker:{}, masterAddress:{}, masterEpoch:{}, oldSyncStateSet:{}, 
newSyncStateSet:{}, syncStateSetEpoch:{}",
-                    this.brokerConfig.getBrokerName(), this.masterAddress, 
this.masterEpoch, this.syncStateSet, newSyncStateSet, this.syncStateSetEpoch, 
e);
-            }
+            doReportSyncStateSetChanged(newSyncStateSet);
         }, 3 * 1000, 
this.brokerConfig.getReplicasManagerCheckSyncStateSetPeriod(), 
TimeUnit.MILLISECONDS);
     }
 
+    private void doReportSyncStateSetChanged(Set<String> newSyncStateSet) {
+        try {
+            final SyncStateSet result = 
this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch, 
newSyncStateSet, this.syncStateSetEpoch);
+            if (result != null) {
+                changeSyncStateSet(result.getSyncStateSet(), 
result.getSyncStateSetEpoch());
+            }
+        } catch (final Exception e) {
+            LOGGER.error("Error happen when change sync state set, broker:{}, 
masterAddress:{}, masterEpoch:{}, oldSyncStateSet:{}, newSyncStateSet:{}, 
syncStateSetEpoch:{}",
+                this.brokerConfig.getBrokerName(), this.masterAddress, 
this.masterEpoch, this.syncStateSet, newSyncStateSet, this.syncStateSetEpoch, 
e);
+        }
+    }
+
     private void stopCheckSyncStateSet() {
         if (this.checkSyncStateSetTaskFuture != null) {
             this.checkSyncStateSetTaskFuture.cancel(false);
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index be013e9d4..5a96b5345 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -805,7 +805,7 @@ public class CommitLog implements Swappable {
         boolean needHandleHA = needHandleHA(msg);
         int needAckNums = 1;
 
-        if (needHandleHA) {
+        if (needHandleHA && 
!this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
             int inSyncReplicas = 
Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
                 
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
             needAckNums = calcNeedAckNums(inSyncReplicas);
@@ -952,7 +952,7 @@ public class CommitLog implements Swappable {
         int needAckNums = 1;
         boolean needHandleHA = needHandleHA(messageExtBatch);
 
-        if (needHandleHA) {
+        if (needHandleHA && 
!this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
             int inSyncReplicas = 
Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
                 
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
             needAckNums = calcNeedAckNums(inSyncReplicas);
@@ -1103,7 +1103,8 @@ public class CommitLog implements Swappable {
 
     private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult 
result, PutMessageResult putMessageResult,
         int needAckNums) {
-        if (needAckNums <= 1) {
+        final boolean allAckInSyncStateSet = 
this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet();
+        if (needAckNums <= 1 && !allAckInSyncStateSet) {
             return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
         }
 
@@ -1123,7 +1124,7 @@ public class CommitLog implements Swappable {
 //        }
 
         // Wait enough acks from different slaves
-        GroupCommitRequest request = new GroupCommitRequest(nextOffset, 
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums 
- 1);
+        GroupCommitRequest request = new GroupCommitRequest(nextOffset, 
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums 
- 1, allAckInSyncStateSet);
         haService.putRequest(request);
         haService.getWaitNotifyObject().wakeupAll();
         return request.future();
@@ -1390,6 +1391,7 @@ public class CommitLog implements Swappable {
         private final CompletableFuture<PutMessageStatus> flushOKFuture = new 
CompletableFuture<>();
         private volatile int ackNums = 1;
         private final long deadLine;
+        private boolean allAckInSyncStateSet;
 
         public GroupCommitRequest(long nextOffset, long timeoutMillis) {
             this.nextOffset = nextOffset;
@@ -1401,6 +1403,11 @@ public class CommitLog implements Swappable {
             this.ackNums = ackNums;
         }
 
+        public GroupCommitRequest(long nextOffset, long timeoutMillis, int 
ackNums, boolean allAckInSyncStateSet) {
+            this(nextOffset, timeoutMillis, ackNums);
+            this.allAckInSyncStateSet = allAckInSyncStateSet;
+        }
+
         public long getNextOffset() {
             return nextOffset;
         }
@@ -1413,6 +1420,10 @@ public class CommitLog implements Swappable {
             return deadLine;
         }
 
+        public boolean isAllAckInSyncStateSet() {
+            return allAckInSyncStateSet;
+        }
+
         public void wakeupCustomer(final PutMessageStatus status) {
             this.flushOKFuture.complete(status);
         }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index a182e07cf..310865805 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -246,6 +246,12 @@ public class MessageStoreConfig {
     @ImportantField
     private int minInSyncReplicas = 1;
 
+    /**
+     * Each message must be written successfully to all replicas in 
InSyncStateSet.
+     */
+    @ImportantField
+    private boolean allAckInSyncStateSet = false;
+
     /**
      * Dynamically adjust in-sync replicas to provide higher availability, the 
real time in-sync replicas
      * will smaller than inSyncReplicas config.
@@ -264,6 +270,11 @@ public class MessageStoreConfig {
      */
     private long maxHaTransferByteInSecond = 100 * 1024 * 1024;
 
+    /**
+     * The max gap time that slave doesn't catch up to master.
+     */
+    private long haMaxTimeSlaveNotCatchup = 1000 * 15;
+
     /**
      * Sync flush offset from master when broker startup, used in upgrading 
from old version broker.
      */
@@ -1160,6 +1171,14 @@ public class MessageStoreConfig {
         this.minInSyncReplicas = minInSyncReplicas;
     }
 
+    public boolean isAllAckInSyncStateSet() {
+        return allAckInSyncStateSet;
+    }
+
+    public void setAllAckInSyncStateSet(boolean allAckInSyncStateSet) {
+        this.allAckInSyncStateSet = allAckInSyncStateSet;
+    }
+
     public boolean isEnableAutoInSyncReplicas() {
         return enableAutoInSyncReplicas;
     }
@@ -1184,6 +1203,14 @@ public class MessageStoreConfig {
         this.maxHaTransferByteInSecond = maxHaTransferByteInSecond;
     }
 
+    public long getHaMaxTimeSlaveNotCatchup() {
+        return haMaxTimeSlaveNotCatchup;
+    }
+
+    public void setHaMaxTimeSlaveNotCatchup(long haMaxTimeSlaveNotCatchup) {
+        this.haMaxTimeSlaveNotCatchup = haMaxTimeSlaveNotCatchup;
+    }
+
     public boolean isSyncMasterFlushOffsetWhenStartup() {
         return syncMasterFlushOffsetWhenStartup;
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
index 3c346ff93..ea27288be 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
@@ -156,13 +156,6 @@ public class DefaultHAService implements HAService {
         return false;
     }
 
-    @Override public Set<String> getLatestSyncStateSet() {
-        return null;
-    }
-
-    @Override public void setSyncStateSet(Set<String> syncStateSet) {
-    }
-
     public void destroyConnections() {
         synchronized (this.connectionList) {
             for (HAConnection c : this.connectionList) {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
index dc41279b5..67ada60dd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store.ha;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -26,6 +27,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.CommitLog;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAConnection;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 
 /**
  * GroupTransferService Service
@@ -71,28 +74,52 @@ public class GroupTransferService extends ServiceThread {
                     boolean transferOK = false;
 
                     long deadLine = req.getDeadLine();
+                    final boolean allAckInSyncStateSet = 
req.isAllAckInSyncStateSet();
 
                     for (int i = 0; !transferOK && deadLine - 
System.nanoTime() > 0; i++) {
                         if (i > 0) {
                             this.notifyTransferObject.waitForRunning(1000);
                         }
 
-                        if (req.getAckNums() <= 1) {
+                        if (req.getAckNums() <= 1 && !allAckInSyncStateSet) {
                             transferOK = 
haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
                             continue;
                         }
 
-                        int ackNums = 0;
-                        for (HAConnection conn : 
haService.getConnectionList()) {
-                            // TODO: We must ensure every HAConnection 
represents a different slave
-                            // Solution: Consider assign a unique and fixed 
IP:ADDR for each different slave
-                            if (conn.getSlaveAckOffset() >= 
req.getNextOffset()) {
-                                ackNums++;
-                            }
-                            if (ackNums >= req.getAckNums()) {
+                        if (allAckInSyncStateSet && this.haService instanceof 
AutoSwitchHAService) {
+                            // In this mode, we must wait for all replicas 
that in InSyncStateSet.
+                            final AutoSwitchHAService autoSwitchHAService = 
(AutoSwitchHAService) this.haService;
+                            final Set<String> syncStateSet = 
autoSwitchHAService.getSyncStateSet();
+                            if (syncStateSet.size() <= 1) {
+                                // Only master
                                 transferOK = true;
                                 break;
                             }
+                            // Include master.
+                            int ackNums = 1;
+                            for (HAConnection conn : 
haService.getConnectionList()) {
+                                final AutoSwitchHAConnection 
autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
+                                if 
(syncStateSet.contains(autoSwitchHAConnection.getClientAddress()) && 
autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
+                                    ackNums ++;
+                                }
+                                if (ackNums >= syncStateSet.size()) {
+                                    transferOK = true;
+                                    break;
+                                }
+                            }
+                        } else {
+                            int ackNums = 0;
+                            for (HAConnection conn : 
haService.getConnectionList()) {
+                                // TODO: We must ensure every HAConnection 
represents a different slave
+                                // Solution: Consider assign a unique and 
fixed IP:ADDR for each different slave
+                                if (conn.getSlaveAckOffset() >= 
req.getNextOffset()) {
+                                    ackNums++;
+                                }
+                                if (ackNums >= req.getAckNums()) {
+                                    transferOK = true;
+                                    break;
+                                }
+                            }
                         }
                     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index b5dbd1fac..4006ff774 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.store.ha;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
@@ -62,17 +61,6 @@ public interface HAService {
      */
     boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long 
slaveId);
 
-    /**
-     * Get the latest sync state set.
-     * @return new syncStateSet
-     */
-    Set<String> getLatestSyncStateSet();
-
-    /**
-     * Set sync state set
-     */
-    void setSyncStateSet(Set<String> syncStateSet);
-
     /**
      * Update master address
      *
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index c5993684b..f29ff283e 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -52,6 +52,8 @@ public class AutoSwitchHAConnection implements HAConnection {
     private final EpochFileCache epochCache;
     private final AbstractWriteSocketService writeSocketService;
     private final ReadSocketService readSocketService;
+    private final FlowMonitor flowMonitor;
+
     private volatile HAConnectionState currentState = 
HAConnectionState.HANDSHAKE;
     private volatile long slaveRequestOffset = -1;
     private volatile long slaveAckOffset = -1;
@@ -65,7 +67,18 @@ public class AutoSwitchHAConnection implements HAConnection {
     private volatile long slaveId = -1;
     private volatile String slaveAddress;
 
-    private final FlowMonitor flowMonitor;
+    /**
+     * Last endOffset when master transfer data to slave
+     */
+    private volatile long lastMasterMaxOffset = -1;
+    /**
+     * Last time ms when transfer data to slave.
+     */
+    private volatile long lastTransferTimeMs = 0;
+    /**
+     * Last catchup time ms when slaveAckOffset >= lastMasterMaxOffset.
+     */
+    private volatile long lastCatchUpTimeMs = 0;
 
     public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel 
socketChannel,
         EpochFileCache epochCache) throws IOException {
@@ -122,6 +135,10 @@ public class AutoSwitchHAConnection implements 
HAConnection {
         return slaveAddress;
     }
 
+    public long getLastCatchUpTimeMs() {
+        return lastCatchUpTimeMs;
+    }
+
     @Override public HAConnectionState getCurrentState() {
         return currentState;
     }
@@ -155,6 +172,18 @@ public class AutoSwitchHAConnection implements 
HAConnection {
         }
     }
 
+    private synchronized void updateLastTransferInfo() {
+        this.lastMasterMaxOffset = 
this.haService.getDefaultMessageStore().getMaxPhyOffset();
+        this.lastTransferTimeMs = System.currentTimeMillis();
+    }
+
+    private synchronized void maybeExpandInSyncStateSet(long slaveMaxOffset) {
+        if (slaveMaxOffset >= this.lastMasterMaxOffset) {
+            this.lastCatchUpTimeMs = Math.max(this.lastTransferTimeMs, 
this.lastCatchUpTimeMs);
+            this.haService.maybeExpandInSyncStateSet(this.slaveAddress, 
slaveMaxOffset);
+        }
+    }
+
     class ReadSocketService extends ServiceThread {
         private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
         private final Selector selector;
@@ -279,12 +308,13 @@ public class AutoSwitchHAConnection implements 
HAConnection {
                                     long slaveMaxOffset = 
byteBufferRead.getLong(readPosition + 4);
                                     ReadSocketService.this.processPosition += 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
 
-                                    slaveAckOffset = slaveMaxOffset;
+                                    AutoSwitchHAConnection.this.slaveAckOffset 
= slaveMaxOffset;
                                     if (slaveRequestOffset < 0) {
                                         slaveRequestOffset = slaveMaxOffset;
                                     }
                                     LOGGER.info("slave[" + clientAddress + "] 
request offset " + slaveMaxOffset);
                                     byteBufferRead.position(readSocketPos);
+                                    maybeExpandInSyncStateSet(slaveMaxOffset);
                                     
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
                                 }
                                 break;
@@ -547,6 +577,7 @@ public class AutoSwitchHAConnection implements HAConnection 
{
 
                 this.transferOffset = this.nextTransferFromWhere;
                 this.nextTransferFromWhere += size;
+                updateLastTransferInfo();
 
                 // Build Header
                 buildTransferHeaderBuffer(this.transferOffset, size);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index c5768e1b0..98b480d91 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -19,10 +19,16 @@ package org.apache.rocketmq.store.ha.autoswitch;
 
 import java.io.IOException;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
 import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -41,21 +47,22 @@ import 
org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
  */
 public class AutoSwitchHAService extends DefaultHAService {
     private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final ExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
+    private final List<Consumer<Set<String>>> syncStateSetChangedListeners = 
new ArrayList<>();
+    private final CopyOnWriteArraySet<String> syncStateSet = new 
CopyOnWriteArraySet<>();
+    private String localAddress;
+
     private EpochFileCache epochCache;
     private AutoSwitchHAClient haClient;
-    private volatile Set<String> syncStateSet;
-    private String localAddress;
 
     public AutoSwitchHAService() {
     }
 
-    @Override
-    public void init(final DefaultMessageStore defaultMessageStore) throws 
IOException {
+    @Override public void init(final DefaultMessageStore defaultMessageStore) 
throws IOException {
         this.epochCache = new 
EpochFileCache(defaultMessageStore.getMessageStoreConfig().getStorePathEpochFile());
         this.epochCache.initCacheFromFile();
         this.defaultMessageStore = defaultMessageStore;
-        this.acceptSocketService =
-            new 
AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+        this.acceptSocketService = new 
AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
         this.groupTransferService = new GroupTransferService(this, 
defaultMessageStore);
         if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() 
== BrokerRole.SLAVE) {
             this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, 
this.epochCache);
@@ -63,8 +70,15 @@ public class AutoSwitchHAService extends DefaultHAService {
         this.haConnectionStateNotificationService = new 
HAConnectionStateNotificationService(this, defaultMessageStore);
     }
 
-    @Override
-    public boolean changeToMaster(int masterEpoch) {
+    @Override public void shutdown() {
+        super.shutdown();
+        if (this.haClient != null) {
+            this.haClient.shutdown();
+        }
+        this.executorService.shutdown();
+    }
+
+    @Override public boolean changeToMaster(int masterEpoch) {
         final int lastEpoch = this.epochCache.lastEpoch();
         if (masterEpoch <= lastEpoch) {
             return false;
@@ -89,12 +103,15 @@ public class AutoSwitchHAService extends DefaultHAService {
         this.epochCache.appendEntry(newEpochEntry);
 
         this.defaultMessageStore.recoverTopicQueueTable();
+
+        final HashSet<String> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add(this.localAddress);
+        setSyncStateSet(newSyncStateSet);
         LOGGER.info("Change ha to master success, newMasterEpoch:{}, 
startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
         return true;
     }
 
-    @Override
-    public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, 
Long slaveId) {
+    @Override public boolean changeToSlave(String newMasterAddr, int 
newMasterEpoch, Long slaveId) {
         final int lastEpoch = this.epochCache.lastEpoch();
         if (newMasterEpoch <= lastEpoch) {
             return false;
@@ -118,28 +135,93 @@ public class AutoSwitchHAService extends DefaultHAService 
{
         }
     }
 
-    @Override
-    public void setSyncStateSet(final Set<String> syncStateSet) {
-        this.syncStateSet = new HashSet<>(syncStateSet);
+    @Override public HAClient getHAClient() {
+        return this.haClient;
     }
 
-    @Override
-    public Set<String> getLatestSyncStateSet() {
-        final HashSet<String> newSyncStateSet = new 
HashSet<>(this.connectionList.size());
-        final long masterOffset = this.defaultMessageStore.getMaxPhyOffset();
-        for (HAConnection connection : this.connectionList) {
-            if (isInSyncSlave(masterOffset, connection)) {
-                final String slaveAddress = ((AutoSwitchHAConnection) 
connection).getSlaveAddress();
-                if (StringUtils.isNoneEmpty(slaveAddress)) {
-                    newSyncStateSet.add(slaveAddress);
+    @Override public void updateHaMasterAddress(String newAddr) {
+        if (this.haClient != null) {
+            this.haClient.updateHaMasterAddress(newAddr);
+        }
+    }
+
+    @Override public void updateMasterAddress(String newAddr) {
+    }
+
+    public void registerSyncStateSetChangedListener(final 
Consumer<Set<String>> listener) {
+        this.syncStateSetChangedListeners.add(listener);
+    }
+
+    public void notifySyncStateSetChanged(final Set<String> newSyncStateSet) {
+        this.executorService.submit(() -> {
+            for (Consumer<Set<String>> listener : 
syncStateSetChangedListeners) {
+                listener.accept(newSyncStateSet);
+            }
+        });
+    }
+
+    /**
+     * Check and maybe shrink the inSyncStateSet.
+     * A slave will be removed from inSyncStateSet if (curTime - 
HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
+     */
+    public Set<String> maybeShrinkInSyncStateSet() {
+        final Set<String> currentSyncStateSet = getSyncStateSet();
+        final HashSet<String> newSyncStateSet = new 
HashSet<>(currentSyncStateSet);
+        final long haMaxTimeSlaveNotCatchup = 
this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
+        for (HAConnection haConnection : this.connectionList) {
+            final AutoSwitchHAConnection connection = (AutoSwitchHAConnection) 
haConnection;
+            final String slaveAddress = connection.getSlaveAddress();
+            if (currentSyncStateSet.contains(slaveAddress)) {
+                if ((System.currentTimeMillis() - 
connection.getLastCatchUpTimeMs()) > haMaxTimeSlaveNotCatchup) {
+                    newSyncStateSet.remove(slaveAddress);
                 }
             }
         }
         return newSyncStateSet;
     }
 
-    @Override public HAClient getHAClient() {
-        return this.haClient;
+    /**
+     * Check and maybe add the slave to inSyncStateSet.
+     * A slave will be added to inSyncStateSet if its slaveMaxOffset >= 
current confirmOffset, and it is caught up to
+     * an offset within the current leader epoch.
+     */
+    public void maybeExpandInSyncStateSet(final String slaveAddress, final 
long slaveMaxOffset) {
+        final Set<String> currentSyncStateSet = getSyncStateSet();
+        if (currentSyncStateSet.contains(slaveAddress)) {
+            return;
+        }
+        final long confirmOffset = getConfirmOffset();
+        if (slaveMaxOffset >= confirmOffset) {
+            final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
+            if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
+                currentSyncStateSet.add(slaveAddress);
+                // Notify the upper layer that syncStateSet changed.
+                notifySyncStateSetChanged(currentSyncStateSet);
+            }
+        }
+    }
+
+    /**
+     * Get confirm offset (min slaveAckOffset of all syncStateSet members)
+     */
+    public long getConfirmOffset() {
+        final Set<String> currentSyncStateSet = getSyncStateSet();
+        long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+        for (HAConnection connection : this.connectionList) {
+            if (currentSyncStateSet.contains(connection.getClientAddress())) {
+                confirmOffset = Math.min(confirmOffset, 
connection.getSlaveAckOffset());
+            }
+        }
+        return confirmOffset;
+    }
+
+    public synchronized void setSyncStateSet(final Set<String> syncStateSet) {
+        this.syncStateSet.clear();
+        this.syncStateSet.addAll(syncStateSet);
+    }
+
+    public synchronized Set<String> getSyncStateSet() {
+        return new HashSet<>(this.syncStateSet);
     }
 
     public void truncateEpochFilePrefix(final long offset) {
@@ -150,6 +232,10 @@ public class AutoSwitchHAService extends DefaultHAService {
         this.epochCache.truncateSuffixByOffset(offset);
     }
 
+    public void setLocalAddress(String localAddress) {
+        this.localAddress = localAddress;
+    }
+
     /**
      * Try to truncate incomplete msg transferred from master.
      */
@@ -201,36 +287,6 @@ public class AutoSwitchHAService extends DefaultHAService {
         return reputFromOffset;
     }
 
-    /**
-     * Get confirm offset (min slaveAckOffset of all syncStateSet)
-     */
-    public long getConfirmOffset() {
-        if (this.syncStateSet != null) {
-            final HashSet<String> syncStateSetCopy = new 
HashSet<>(this.syncStateSet);
-            long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
-            for (HAConnection connection : this.connectionList) {
-                if (syncStateSetCopy.contains(connection.getClientAddress())) {
-                    confirmOffset = Math.min(confirmOffset, 
connection.getSlaveAckOffset());
-                }
-            }
-            return confirmOffset;
-        }
-        return -1;
-    }
-
-    public void setLocalAddress(String localAddress) {
-        this.localAddress = localAddress;
-    }
-
-    @Override public void updateHaMasterAddress(String newAddr) {
-        if (this.haClient != null) {
-            this.haClient.updateHaMasterAddress(newAddr);
-        }
-    }
-
-    @Override public void updateMasterAddress(String newAddr) {
-    }
-
     class AutoSwitchAcceptSocketService extends AcceptSocketService {
 
         public AutoSwitchAcceptSocketService(int port) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 8f74fbbb0..777a62036 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -21,8 +21,10 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -31,6 +33,8 @@ import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MappedFileQueue;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -116,6 +120,39 @@ public class AutoSwitchHATest {
         messageStore3.start();
     }
 
+    /**
+     * Builds, loads and starts a two-broker setup: store1 as SYNC_MASTER (HA address
+     * 127.0.0.1:10912) and store2 as SLAVE (HA listen port 10943). Both stores use the
+     * given mapped file size and allAckInSyncStateSet option.
+     */
+    public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws Exception {
+        QUEUE_TOTAL = 1;
+        MessageBody = StoreMessage.getBytes();
+        StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+
+        // Broker 1: the master.
+        final String broker1Root = storePathRootDir + File.separator + "broker1";
+        storeConfig1 = new MessageStoreConfig();
+        storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
+        storeConfig1.setStorePathRootDir(broker1Root);
+        storeConfig1.setStorePathCommitLog(broker1Root + File.separator + "commitlog");
+        storeConfig1.setStorePathEpochFile(broker1Root + File.separator + "EpochFileCache");
+        storeConfig1.setAllAckInSyncStateSet(allAckInSyncStateSet);
+        buildMessageStoreConfig(storeConfig1, mappedFileSize);
+        this.store1HaAddress = "127.0.0.1:10912";
+
+        // Broker 2: the slave, with its own HA listen port.
+        final String broker2Root = storePathRootDir + File.separator + "broker2";
+        storeConfig2 = new MessageStoreConfig();
+        storeConfig2.setBrokerRole(BrokerRole.SLAVE);
+        storeConfig2.setStorePathRootDir(broker2Root);
+        storeConfig2.setStorePathCommitLog(broker2Root + File.separator + "commitlog");
+        storeConfig2.setStorePathEpochFile(broker2Root + File.separator + "EpochFileCache");
+        storeConfig2.setHaListenPort(10943);
+        storeConfig2.setAllAckInSyncStateSet(allAckInSyncStateSet);
+        buildMessageStoreConfig(storeConfig2, mappedFileSize);
+        this.store2HaAddress = "127.0.0.1:10943";
+
+        messageStore1 = buildMessageStore(storeConfig1, 0L);
+        messageStore2 = buildMessageStore(storeConfig2, 1L);
+
+        assertTrue(messageStore1.load());
+        assertTrue(messageStore2.load());
+        messageStore1.start();
+        messageStore2.start();
+    }
+
     private void changeMasterAndPutMessage(DefaultMessageStore master, 
MessageStoreConfig masterConfig,
         DefaultMessageStore slave, long slaveId, MessageStoreConfig 
slaveConfig, int epoch, String masterHaAddress,
         int totalPutMessageNums) throws Exception {
@@ -147,6 +184,37 @@ public class AutoSwitchHATest {
         }
     }
 
+    /**
+     * With allAckInSyncStateSet enabled, once the slave is in the syncStateSet and then
+     * goes away, a put on the master is expected to end with FLUSH_SLAVE_TIMEOUT.
+     */
+    @Test
+    public void testOptionAllAckInSyncStateSet() throws Exception {
+        init(defaultMappedFileSize, true);
+        final AutoSwitchHAService haService1 = (AutoSwitchHAService) this.messageStore1.getHaService();
+        final AutoSwitchHAService haService2 = (AutoSwitchHAService) this.messageStore2.getHaService();
+        final AtomicReference<Set<String>> syncStateSet = new AtomicReference<>();
+        haService1.setLocalAddress("127.0.0.1:8000");
+        haService2.setLocalAddress("127.0.0.1:8001");
+        // Capture the most recent syncStateSet reported by the master.
+        haService1.registerSyncStateSetChangedListener(newSyncStateSet -> syncStateSet.set(newSyncStateSet));
+
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+        Thread.sleep(1000);
+        checkMessage(this.messageStore2, 10, 0);
+
+        // Give the master time to expand the syncStateSet with the slave.
+        Thread.sleep(1000);
+        final Set<String> result = syncStateSet.get();
+        // Fail with a clear message instead of an NPE if the listener never fired.
+        assertTrue("syncStateSet listener was never notified", result != null);
+        assertTrue(result.contains("127.0.0.1:8000"));
+        assertTrue(result.contains("127.0.0.1:8001"));
+
+        // Now, shutdown store2 so the slave can no longer ack.
+        this.messageStore2.shutdown();
+        this.messageStore2.destroy();
+
+        // Keep the (now dead) slave in the syncStateSet; with allAckInSyncStateSet enabled the
+        // put is expected to wait for its ack and time out.
+        haService1.setSyncStateSet(result);
+
+        final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage());
+        // JUnit's contract is assertEquals(expected, actual).
+        assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, putMessageResult.getPutMessageStatus());
+    }
+
     @Test
     public void testChangeRoleManyTimes() throws Exception {
         // Step1, change store1 to master, store2 to follower
@@ -292,12 +360,18 @@ public class AutoSwitchHATest {
     @After
     public void destroy() throws Exception {
         Thread.sleep(5000L);
-        messageStore2.shutdown();
-        messageStore2.destroy();
-        messageStore1.shutdown();
-        messageStore1.destroy();
-        messageStore3.shutdown();
-        messageStore3.destroy();
+        // Not every test builds all three stores (init(int, boolean) only creates store1 and
+        // store2), so skip the ones that are null. Order: store2, store1, store3.
+        for (DefaultMessageStore store : new DefaultMessageStore[] {this.messageStore2, this.messageStore1, this.messageStore3}) {
+            if (store != null) {
+                store.shutdown();
+                store.destroy();
+            }
+        }
         File file = new File(storePathRootParentDir);
         UtilAll.deleteFile(file);
     }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCacheTest.java
similarity index 98%
rename from 
store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java
rename to 
store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCacheTest.java
index aadf683a2..5fa4891ca 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCacheTest.java
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.store.ha;
+package org.apache.rocketmq.store.ha.autoswitch;
 
 import java.io.File;
 import java.nio.file.Paths;
 import org.apache.rocketmq.common.EpochEntry;
-import org.apache.rocketmq.store.ha.autoswitch.EpochFileCache;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

Reply via email to