This is an automated email from the ASF dual-hosted git repository.

hzh0425 pushed a commit to branch fix/issue-5663
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit c43d8187824551932f9ee71345f20f1864a94b42
Author: hzh0425 <[email protected]>
AuthorDate: Sun Jan 1 16:17:36 2023 +0800

    Add a new state 'isSynchronizingSyncStateSet' to solve the problem of missing messages
---
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 74 ++++++++++++++++------
 .../store/ha/autoswitch/AutoSwitchHATest.java      | 58 ++++++++++++-----
 2 files changed, 99 insertions(+), 33 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index f2b421ecd..932b4e2dd 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -17,18 +17,6 @@
 
 package org.apache.rocketmq.store.ha.autoswitch;
 
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Consumer;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
@@ -47,15 +35,31 @@ import org.apache.rocketmq.store.ha.HAClient;
 import org.apache.rocketmq.store.ha.HAConnection;
 import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
 
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
 /**
  * SwitchAble ha service, support switch role to master or slave.
  */
 public class AutoSwitchHAService extends DefaultHAService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(new 
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
+    private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable 
= new ConcurrentHashMap<>();
     private final List<Consumer<Set<String>>> syncStateSetChangedListeners = 
new ArrayList<>();
     private final CopyOnWriteArraySet<String> syncStateSet = new 
CopyOnWriteArraySet<>();
-    private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable 
= new ConcurrentHashMap<>();
+    private final Set<String> remoteSyncStateSet = new HashSet<>();
+    //  Indicate whether the syncStateSet is currently in the process of being 
synchronized to controller.
+    private volatile boolean isSynchronizingSyncStateSet = false;
     private volatile long confirmOffset = -1;
 
     private String localAddress;
@@ -88,10 +92,11 @@ public class AutoSwitchHAService extends DefaultHAService {
     @Override
     public void removeConnection(HAConnection conn) {
         if (!defaultMessageStore.isShutdown()) {
-            final Set<String> syncStateSet = getSyncStateSet();
+            final Set<String> syncStateSet = new HashSet<>(this.syncStateSet);
             String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress();
             if (syncStateSet.contains(slave)) {
                 syncStateSet.remove(slave);
+                markSynchronizingSyncStateSet(syncStateSet);
                 notifySyncStateSetChanged(syncStateSet);
             }
         }
@@ -225,7 +230,8 @@ public class AutoSwitchHAService extends DefaultHAService {
      * A slave will be removed from inSyncStateSet if (curTime - 
HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
      */
     public Set<String> maybeShrinkInSyncStateSet() {
-        final Set<String> newSyncStateSet = getSyncStateSet();
+        final Set<String> newSyncStateSet = new HashSet<>(this.syncStateSet);
+        boolean isSyncStateSetChanged = false;
         final long haMaxTimeSlaveNotCatchup = 
this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
         for (Map.Entry<String, Long> next : 
this.connectionCaughtUpTimeTable.entrySet()) {
             final String slaveAddress = next.getKey();
@@ -233,9 +239,13 @@ public class AutoSwitchHAService extends DefaultHAService {
                 final Long lastCaughtUpTimeMs = 
this.connectionCaughtUpTimeTable.get(slaveAddress);
                 if ((System.currentTimeMillis() - lastCaughtUpTimeMs) > 
haMaxTimeSlaveNotCatchup) {
                     newSyncStateSet.remove(slaveAddress);
+                    isSyncStateSetChanged = true;
                 }
             }
         }
+        if (isSyncStateSetChanged) {
+            markSynchronizingSyncStateSet(newSyncStateSet);
+        }
         return newSyncStateSet;
     }
 
@@ -244,7 +254,7 @@ public class AutoSwitchHAService extends DefaultHAService {
      * current confirmOffset, and it is caught up to an offset within the 
current leader epoch.
      */
     public void maybeExpandInSyncStateSet(final String slaveAddress, final 
long slaveMaxOffset) {
-        final Set<String> currentSyncStateSet = getSyncStateSet();
+        final Set<String> currentSyncStateSet = new 
HashSet<>(this.syncStateSet);
         if (currentSyncStateSet.contains(slaveAddress)) {
             return;
         }
@@ -253,12 +263,28 @@ public class AutoSwitchHAService extends DefaultHAService 
{
             final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
             if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
                 currentSyncStateSet.add(slaveAddress);
+                markSynchronizingSyncStateSet(currentSyncStateSet);
                 // Notify the upper layer that syncStateSet changed.
                 notifySyncStateSetChanged(currentSyncStateSet);
             }
         }
     }
 
+    private synchronized void markSynchronizingSyncStateSet(final Set<String> 
newSyncStateSet) {
+        this.isSynchronizingSyncStateSet = true;
+        this.remoteSyncStateSet.clear();
+        this.remoteSyncStateSet.addAll(newSyncStateSet);
+    }
+
+    private synchronized void markSynchronizingSyncStateSetDone() {
+        this.isSynchronizingSyncStateSet = false;
+        this.remoteSyncStateSet.clear();
+    }
+
+    public boolean isSynchronizingSyncStateSet() {
+        return isSynchronizingSyncStateSet;
+    }
+
     public void updateConnectionLastCaughtUpTime(final String slaveAddress, 
final long lastCaughtUpTimeMs) {
         Long prevTime = 
ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, 
slaveAddress, k -> 0L);
         this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime, 
lastCaughtUpTimeMs));
@@ -288,7 +314,11 @@ public class AutoSwitchHAService extends DefaultHAService {
 
     @Override
     public synchronized int inSyncReplicasNums(final long masterPutWhere) {
-        return syncStateSet.size();
+        if (this.isSynchronizingSyncStateSet) {
+            return Math.max(this.syncStateSet.size(), 
this.remoteSyncStateSet.size());
+        } else {
+            return this.syncStateSet.size();
+        }
     }
 
     @Override
@@ -345,13 +375,21 @@ public class AutoSwitchHAService extends DefaultHAService 
{
     }
 
     public synchronized void setSyncStateSet(final Set<String> syncStateSet) {
+        markSynchronizingSyncStateSetDone();
         this.syncStateSet.clear();
         this.syncStateSet.addAll(syncStateSet);
         this.confirmOffset = computeConfirmOffset();
     }
 
     public synchronized Set<String> getSyncStateSet() {
-        return new HashSet<>(this.syncStateSet);
+        if (this.isSynchronizingSyncStateSet) {
+            // Return the union of the local and remote syncStateSets
+            Set<String> syncStateSet = new HashSet<>(this.syncStateSet);
+            syncStateSet.addAll(this.remoteSyncStateSet);
+            return syncStateSet;
+        } else {
+            return new HashSet<>(this.syncStateSet);
+        }
     }
 
     public void truncateEpochFilePrefix(final long offset) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 93f35630d..aa15d3e36 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -17,21 +17,9 @@
 
 package org.apache.rocketmq.store.ha.autoswitch;
 
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -46,10 +34,24 @@ import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.MappedFile;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.apache.rocketmq.common.MixAll;
 import org.junit.After;
-import org.junit.Test;
+import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
@@ -444,6 +446,32 @@ public class AutoSwitchHATest {
         checkMessage(messageStore3, 10, 10);
     }
 
+    @Test
+    public void testCheckSynchronizingSyncStateSetFlag() throws Exception {
+        // Step1: broker1 as leader, broker2 as follower
+        init(defaultMappedFileSize);
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+        AutoSwitchHAService masterHAService = (AutoSwitchHAService) 
this.messageStore1.getHaService();
+
+        // Step2: check flag SynchronizingSyncStateSet
+        Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
+        Assert.assertEquals(masterHAService.getConfirmOffset(), 1570);
+        Set<String> syncStateSet = (masterHAService.getSyncStateSet());
+        Assert.assertEquals(syncStateSet.size(), 2);
+        Assert.assertTrue(syncStateSet.contains("127.0.0.1:8001"));
+
+        // Step3: set new syncStateSet
+        HashSet<String> newSyncStateSet = new HashSet<String>() {{
+            add("127.0.0.1:8000");
+            add("127.0.0.1:8001");
+        }};
+        masterHAService.setSyncStateSet(newSyncStateSet);
+        Assert.assertFalse(masterHAService.isSynchronizingSyncStateSet());
+    }
+
     @After
     public void destroy() throws Exception {
         if (this.messageStore2 != null) {

Reply via email to