This is an automated email from the ASF dual-hosted git repository. hzh0425 pushed a commit to branch fix/issue-5663 in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit c43d8187824551932f9ee71345f20f1864a94b42 Author: hzh0425 <[email protected]> AuthorDate: Sun Jan 1 16:17:36 2023 +0800 Add a new state 'isSynchronizingSyncStateSet' to Solve the problem of missing messages --- .../store/ha/autoswitch/AutoSwitchHAService.java | 74 ++++++++++++++++------ .../store/ha/autoswitch/AutoSwitchHATest.java | 58 ++++++++++++----- 2 files changed, 99 insertions(+), 33 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index f2b421ecd..932b4e2dd 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -17,18 +17,6 @@ package org.apache.rocketmq.store.ha.autoswitch; -import java.io.IOException; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.Consumer; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; @@ -47,15 +35,31 @@ import org.apache.rocketmq.store.ha.HAClient; import org.apache.rocketmq.store.ha.HAConnection; import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService; +import java.io.IOException; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + /** * SwitchAble ha service, support switch role to master or slave. */ public class AutoSwitchHAService extends DefaultHAService { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_")); + private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>(); private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>(); private final CopyOnWriteArraySet<String> syncStateSet = new CopyOnWriteArraySet<>(); - private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>(); + private final Set<String> remoteSyncStateSet = new HashSet<>(); + // Indicate whether the syncStateSet is currently in the process of being synchronized to controller. + private volatile boolean isSynchronizingSyncStateSet = false; private volatile long confirmOffset = -1; private String localAddress; @@ -88,10 +92,11 @@ public class AutoSwitchHAService extends DefaultHAService { @Override public void removeConnection(HAConnection conn) { if (!defaultMessageStore.isShutdown()) { - final Set<String> syncStateSet = getSyncStateSet(); + final Set<String> syncStateSet = new HashSet<>(this.syncStateSet); String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress(); if (syncStateSet.contains(slave)) { syncStateSet.remove(slave); + markSynchronizingSyncStateSet(syncStateSet); notifySyncStateSetChanged(syncStateSet); } } @@ -225,7 +230,8 @@ public class AutoSwitchHAService extends DefaultHAService { * A slave will be removed from inSyncStateSet if (curTime - HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup) */ public Set<String> maybeShrinkInSyncStateSet() { - final Set<String> newSyncStateSet = getSyncStateSet(); + final Set<String> newSyncStateSet = new HashSet<>(this.syncStateSet); + boolean isSyncStateSetChanged = false; final long haMaxTimeSlaveNotCatchup = this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup(); for (Map.Entry<String, Long> next : this.connectionCaughtUpTimeTable.entrySet()) { final String slaveAddress = next.getKey(); @@ -233,9 +239,13 @@ public class AutoSwitchHAService extends DefaultHAService { final Long lastCaughtUpTimeMs = this.connectionCaughtUpTimeTable.get(slaveAddress); if ((System.currentTimeMillis() - lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchup) { newSyncStateSet.remove(slaveAddress); + isSyncStateSetChanged = true; } } } + if (isSyncStateSetChanged) { + markSynchronizingSyncStateSet(newSyncStateSet); + } return newSyncStateSet; } @@ -244,7 +254,7 @@ public class AutoSwitchHAService extends DefaultHAService { * current confirmOffset, and it is caught up to an offset within the current leader epoch. */ public void maybeExpandInSyncStateSet(final String slaveAddress, final long slaveMaxOffset) { - final Set<String> currentSyncStateSet = getSyncStateSet(); + final Set<String> currentSyncStateSet = new HashSet<>(this.syncStateSet); if (currentSyncStateSet.contains(slaveAddress)) { return; } @@ -253,12 +263,28 @@ public class AutoSwitchHAService extends DefaultHAService { final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry(); if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) { currentSyncStateSet.add(slaveAddress); + markSynchronizingSyncStateSet(currentSyncStateSet); // Notify the upper layer that syncStateSet changed. notifySyncStateSetChanged(currentSyncStateSet); } } } + private synchronized void markSynchronizingSyncStateSet(final Set<String> newSyncStateSet) { + this.isSynchronizingSyncStateSet = true; + this.remoteSyncStateSet.clear(); + this.remoteSyncStateSet.addAll(newSyncStateSet); + } + + private synchronized void markSynchronizingSyncStateSetDone() { + this.isSynchronizingSyncStateSet = false; + this.remoteSyncStateSet.clear(); + } + + public boolean isSynchronizingSyncStateSet() { + return isSynchronizingSyncStateSet; + } + public void updateConnectionLastCaughtUpTime(final String slaveAddress, final long lastCaughtUpTimeMs) { Long prevTime = ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, slaveAddress, k -> 0L); this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime, lastCaughtUpTimeMs)); @@ -288,7 +314,11 @@ public class AutoSwitchHAService extends DefaultHAService { @Override public synchronized int inSyncReplicasNums(final long masterPutWhere) { - return syncStateSet.size(); + if (this.isSynchronizingSyncStateSet) { + return Math.max(this.syncStateSet.size(), this.remoteSyncStateSet.size()); + } else { + return this.syncStateSet.size(); + } } @Override @@ -345,13 +375,21 @@ public class AutoSwitchHAService extends DefaultHAService { } public synchronized void setSyncStateSet(final Set<String> syncStateSet) { + markSynchronizingSyncStateSetDone(); this.syncStateSet.clear(); this.syncStateSet.addAll(syncStateSet); this.confirmOffset = computeConfirmOffset(); } public synchronized Set<String> getSyncStateSet() { - return new HashSet<>(this.syncStateSet); + if (this.isSynchronizingSyncStateSet) { + // Return the union of the local and remote syncStateSets + Set<String> syncStateSet = new HashSet<>(this.syncStateSet); + syncStateSet.addAll(this.remoteSyncStateSet); + return syncStateSet; + } else { + return new HashSet<>(this.syncStateSet); + } } public void truncateEpochFilePrefix(final long offset) { diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java index 93f35630d..aa15d3e36 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java @@ -17,21 +17,9 @@ package org.apache.rocketmq.store.ha.autoswitch; -import java.io.File; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExtBrokerInner; @@ -46,10 +34,24 @@ import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.MappedFile; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import org.apache.rocketmq.common.MixAll; import org.junit.After; -import org.junit.Test; +import org.junit.Assert; import org.junit.Assume; +import org.junit.Test; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; @@ -444,6 +446,32 @@ public class AutoSwitchHATest { checkMessage(messageStore3, 10, 10); } + @Test + public void testCheckSynchronizingSyncStateSetFlag() throws Exception { + // Step1: broker1 as leader, broker2 as follower + init(defaultMappedFileSize); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); + + changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10); + checkMessage(this.messageStore2, 10, 0); + AutoSwitchHAService masterHAService = (AutoSwitchHAService) this.messageStore1.getHaService(); + + // Step2: check flag SynchronizingSyncStateSet + Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet()); + Assert.assertEquals(masterHAService.getConfirmOffset(), 1570); + Set<String> syncStateSet = (masterHAService.getSyncStateSet()); + Assert.assertEquals(syncStateSet.size(), 2); + Assert.assertTrue(syncStateSet.contains("127.0.0.1:8001")); + + // Step3: set new syncStateSet + HashSet<String> newSyncStateSet = new HashSet<String>() {{ + add("127.0.0.1:8000"); + add("127.0.0.1:8001"); + }}; + masterHAService.setSyncStateSet(newSyncStateSet); + Assert.assertFalse(masterHAService.isSynchronizingSyncStateSet()); + } + @After public void destroy() throws Exception { if (this.messageStore2 != null) {
