This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a5f8279d621 Ensuring IoTconsensus data consistency during region 
migration (#12438)
a5f8279d621 is described below

commit a5f8279d621a966eb9a92ea972a8e8f5db210370
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Sun Apr 28 19:22:16 2024 +0800

    Ensuring IoTconsensus data consistency during region migration (#12438)
    
    * data consistency
    
    * data consistency
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  2 ++
 .../consensus/iot/IoTConsensusServerImpl.java      | 25 +++++++++++++++-------
 .../consensus/iot/logdispatcher/LogDispatcher.java |  3 +--
 3 files changed, 20 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 01d86f851db..5a4832e43b2 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -341,6 +341,8 @@ public class IoTConsensus implements IConsensus {
             "[IoTConsensus] failed to cleanup side effects after failed to add 
remote peer", mpe);
       }
       throw new ConsensusException(e);
+    } finally {
+      impl.checkAndUnlockSafeDeletedSearchIndex();
     }
   }
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index e886ad51b4c..34f2de4aec5 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -128,6 +128,7 @@ public class IoTConsensusServerImpl {
   private final ScheduledExecutorService backgroundTaskService;
   private final IoTConsensusRateLimiter ioTConsensusRateLimiter =
       IoTConsensusRateLimiter.getInstance();
+  private volatile long lastPinnedSearchIndexForMigration = -1;
 
   public IoTConsensusServerImpl(
       String storageDir,
@@ -514,7 +515,7 @@ public class IoTConsensusServerImpl {
       if (peer.equals(thisNode)) {
         // use searchIndex for thisNode as the initialSyncIndex because 
targetPeer will load the
         // snapshot produced by thisNode
-        buildSyncLogChannel(targetPeer, searchIndex.get());
+        buildSyncLogChannel(targetPeer, lastPinnedSearchIndexForMigration);
       } else {
         // use RPC to tell other peers to build sync log channel to target peer
         try (SyncIoTConsensusServiceClient client =
@@ -552,14 +553,13 @@ public class IoTConsensusServerImpl {
     // The configuration will be modified during iterating because we will add 
the targetPeer to
     // configuration
     ImmutableList<Peer> currentMembers = 
ImmutableList.copyOf(this.configuration);
+    removeSyncLogChannel(targetPeer);
     for (Peer peer : currentMembers) {
       if (peer.equals(targetPeer)) {
         // if the targetPeer is the same as current peer, skip it because 
removing itself is illegal
         continue;
       }
-      if (peer.equals(thisNode)) {
-        removeSyncLogChannel(targetPeer);
-      } else {
+      if (!peer.equals(thisNode)) {
         // use RPC to tell other peers to build sync log channel to target peer
         try (SyncIoTConsensusServiceClient client =
             syncClientManager.borrowClient(peer.getEndpoint())) {
@@ -770,7 +770,9 @@ public class IoTConsensusServerImpl {
   }
 
   public long getMinFlushedSyncIndex() {
-    return logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get);
+    return lastPinnedSearchIndexForMigration == -1
+        ? logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get)
+        : lastPinnedSearchIndexForMigration;
   }
 
   public String getStorageDir() {
@@ -887,9 +889,16 @@ public class IoTConsensusServerImpl {
    * lost.
    */
   public void checkAndLockSafeDeletedSearchIndex() {
-    if (configuration.size() == 1) {
-      consensusReqReader.setSafelyDeletedSearchIndex(searchIndex.get());
-    }
+    lastPinnedSearchIndexForMigration = searchIndex.get();
+    consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
+  }
+
+  /**
+   * We should unlock safelyDeletedSearchIndex after addPeer to avoid 
potential data accumulation.
+   */
+  public void checkAndUnlockSafeDeletedSearchIndex() {
+    lastPinnedSearchIndexForMigration = -1;
+    checkAndUpdateSafeDeletedSearchIndex();
   }
 
   /**
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index adf8218b46d..d7424f136d7 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -397,8 +397,7 @@ public class LogDispatcher {
       // safely.
       //
       // Use minFlushedSyncIndex here to reserve the WAL which are not flushed 
and support kill -9.
-      long currentSafelyDeletedSearchIndex = impl.getMinFlushedSyncIndex();
-      reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
+      reader.setSafelyDeletedSearchIndex(impl.getMinFlushedSyncIndex());
       // notify
       if (impl.unblockWrite()) {
         impl.signal();

Reply via email to