This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b4fb7b6d96 [IOTDB-6257] Safely Delete IoT WAL with LastFlushedIndex To Support Kill -9 (#11614)
3b4fb7b6d96 is described below

commit 3b4fb7b6d96b55b4638ba0e3f7f518df8b5be726
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Tue Nov 28 17:35:09 2023 +0800

    [IOTDB-6257] Safely Delete IoT WAL with LastFlushedIndex To Support Kill -9 (#11614)
    
    * persist
    
    * separate sync index and flushed index
    
    * rename
    
    * resolve comments
    
    * resolve comments
    
    * resolve comments
    
    * fix retry log
---
 .../consensus/iot/IoTConsensusServerImpl.java      | 23 +++++++++--------
 .../consensus/iot/IoTConsensusServerMetrics.java   |  2 +-
 .../consensus/iot/client/DispatchLogHandler.java   | 15 +++++++++--
 .../iot/logdispatcher/IndexController.java         |  2 --
 .../consensus/iot/logdispatcher/LogDispatcher.java | 14 ++++++++--
 .../service/IoTConsensusRPCServiceProcessor.java   |  2 +-
 .../apache/iotdb/consensus/iot/ReplicateTest.java  | 30 +++++++++-------------
 7 files changed, 51 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1f8b50ee2bb..e8f74cd0dfa 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -181,10 +181,7 @@ public class IoTConsensusServerImpl {
       ioTConsensusServerMetrics.recordGetStateMachineLockTime(
           getStateMachineLockTime - consensusWriteStartTime);
       if (needBlockWrite()) {
-        logger.info(
-            "[Throttle Down] index:{}, safeIndex:{}",
-            getSearchIndex(),
-            getCurrentSafelyDeletedSearchIndex());
+        logger.info("[Throttle Down] index:{}, safeIndex:{}", 
getSearchIndex(), getMinSyncIndex());
         try {
           boolean timeout =
               !stateMachineCondition.await(
@@ -213,7 +210,7 @@ public class IoTConsensusServerImpl {
         logger.info(
             "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
             thisNode.getGroupId(),
-            getCurrentSafelyDeletedSearchIndex(),
+            getMinSyncIndex(),
             indexedConsensusRequest.getSearchIndex());
       }
       IConsensusRequest planNode = 
stateMachine.deserializeRequest(indexedConsensusRequest);
@@ -560,7 +557,7 @@ public class IoTConsensusServerImpl {
    * @throws ConsensusGroupModifyPeerException
    */
   public void buildSyncLogChannel(Peer targetPeer) throws 
ConsensusGroupModifyPeerException {
-    buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
+    buildSyncLogChannel(targetPeer, getMinSyncIndex());
   }
 
   public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
@@ -674,10 +671,14 @@ public class IoTConsensusServerImpl {
    * In the case of multiple copies, the minimum synchronization index is 
selected. In the case of
    * single copies, the current index is selected
    */
-  public long getCurrentSafelyDeletedSearchIndex() {
+  public long getMinSyncIndex() {
     return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get);
   }
 
+  public long getMinFlushedSyncIndex() {
+    return logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get);
+  }
+
   public String getStorageDir() {
     return storageDir;
   }
@@ -695,8 +696,8 @@ public class IoTConsensusServerImpl {
   }
 
   public long getSyncLag() {
-    long safeIndex = getCurrentSafelyDeletedSearchIndex();
-    return getSearchIndex() - safeIndex;
+    long minSyncIndex = getMinSyncIndex();
+    return getSearchIndex() - minSyncIndex;
   }
 
   public IoTConsensusConfig getConfig() {
@@ -801,13 +802,13 @@ public class IoTConsensusServerImpl {
     if (configuration.size() == 1) {
       consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
     } else {
-      
consensusReqReader.setSafelyDeletedSearchIndex(getCurrentSafelyDeletedSearchIndex());
+      consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
     }
   }
 
   public void checkAndUpdateSearchIndex() {
     long currentSearchIndex = searchIndex.get();
-    long safelyDeletedSearchIndex = getCurrentSafelyDeletedSearchIndex();
+    long safelyDeletedSearchIndex = getMinFlushedSyncIndex();
     if (currentSearchIndex < safelyDeletedSearchIndex) {
       logger.warn(
           "The searchIndex for this region({}) is smaller than the 
safelyDeletedSearchIndex when "
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index 736236f47d5..117f9712635 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -92,7 +92,7 @@ public class IoTConsensusServerMetrics implements IMetricSet {
         Metric.IOT_CONSENSUS.toString(),
         MetricLevel.IMPORTANT,
         impl,
-        IoTConsensusServerImpl::getCurrentSafelyDeletedSearchIndex,
+        IoTConsensusServerImpl::getMinSyncIndex,
         Tag.NAME.toString(),
         IMPL,
         Tag.REGION.toString(),
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 93d63ea6389..2814c6fb833 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.iot.client;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
 import 
org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
@@ -30,7 +31,9 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRes> {
 
@@ -55,16 +58,24 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
   @Override
   public void onComplete(TSyncLogEntriesRes response) {
     if (response.getStatuses().stream().anyMatch(status -> 
needRetry(status.getCode()))) {
+      List<String> retryStatusMessages =
+          response.getStatuses().stream()
+              .filter(status -> needRetry(status.getCode()))
+              .map(TSStatus::getMessage)
+              .collect(Collectors.toList());
+
+      String messages = String.join(", ", retryStatusMessages);
       logger.warn(
           "Can not send {} to peer {} for {} times because {}",
           batch,
           thread.getPeer(),
           ++retryCount,
-          response.getStatuses().get(0).getMessage());
+          messages);
       sleepCorrespondingTimeAndRetryAsynchronous();
     } else {
       thread.getSyncStatus().removeBatch(batch);
-      // update safely deleted search index after current sync index is 
updated by removeBatch
+      // update safely deleted search index after last flushed sync index may 
be updated by
+      // removeBatch
       thread.updateSafelyDeletedSearchIndex();
     }
     logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - 
createTime);
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index b233acb8ea9..61ace504758 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.ratis.utils.Utils;
 
@@ -97,7 +96,6 @@ public class IndexController {
     }
   }
 
-  @TestOnly
   public long getLastFlushedIndex() {
     return lastFlushedIndex;
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 93c931b84b6..4ff4ed86104 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -153,6 +153,10 @@ public class LogDispatcher {
     return 
threads.stream().mapToLong(LogDispatcherThread::getCurrentSyncIndex).min();
   }
 
+  public synchronized OptionalLong getMinFlushedSyncIndex() {
+    return 
threads.stream().mapToLong(LogDispatcherThread::getLastFlushedSyncIndex).min();
+  }
+
   public void offer(IndexedConsensusRequest request) {
     // we don't need to serialize and offer request when replicaNum is 1.
     if (!threads.isEmpty()) {
@@ -233,6 +237,10 @@ public class LogDispatcher {
       return controller.getCurrentIndex();
     }
 
+    public long getLastFlushedSyncIndex() {
+      return controller.getLastFlushedIndex();
+    }
+
     public Peer getPeer() {
       return peer;
     }
@@ -343,8 +351,10 @@ public class LogDispatcher {
     public void updateSafelyDeletedSearchIndex() {
       // update safely deleted search index to delete outdated info,
       // indicating that insert nodes whose search index are before this value 
can be deleted
-      // safely
-      long currentSafelyDeletedSearchIndex = 
impl.getCurrentSafelyDeletedSearchIndex();
+      // safely.
+      //
+      // Use minFlushedSyncIndex here to reserve the WAL which are not flushed 
and support kill -9.
+      long currentSafelyDeletedSearchIndex = impl.getMinFlushedSyncIndex();
       reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
       // notify
       if (impl.unblockWrite()) {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 9f795c2f536..2a80ea3281a 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -245,7 +245,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       return;
     }
     long searchIndex = impl.getSearchIndex();
-    long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
+    long safeIndex = impl.getMinSyncIndex();
     resultHandler.onComplete(
         new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, 
safeIndex));
   }
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 03903319cee..0e7eb9e1559 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -182,7 +182,7 @@ public class ReplicateTest {
 
     for (int i = 0; i < 3; i++) {
       long start = System.currentTimeMillis();
-      while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() 
< CHECK_POINT_GAP) {
+      while (servers.get(i).getImpl(gid).getMinSyncIndex() < CHECK_POINT_GAP) {
         long current = System.currentTimeMillis();
         if ((current - start) > 60 * 1000) {
           Assert.fail("Unable to replicate entries");
@@ -191,12 +191,9 @@ public class ReplicateTest {
       }
     }
 
-    Assert.assertEquals(
-        CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
-    Assert.assertEquals(
-        CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
-    Assert.assertEquals(
-        CHECK_POINT_GAP, 
servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getMinSyncIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getMinSyncIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(2).getImpl(gid).getMinSyncIndex());
     Assert.assertEquals(CHECK_POINT_GAP * 3, 
stateMachines.get(0).getRequestSet().size());
     Assert.assertEquals(CHECK_POINT_GAP * 3, 
stateMachines.get(1).getRequestSet().size());
     Assert.assertEquals(CHECK_POINT_GAP * 3, 
stateMachines.get(2).getRequestSet().size());
@@ -217,7 +214,7 @@ public class ReplicateTest {
 
       for (int i = 0; i < 3; i++) {
         long start = System.currentTimeMillis();
-        while 
(servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() < 
CHECK_POINT_GAP) {
+        while (servers.get(i).getImpl(gid).getMinSyncIndex() < 
CHECK_POINT_GAP) {
           long current = System.currentTimeMillis();
           if ((current - start) > 60 * 1000) {
             Assert.fail("Unable to recover entries");
@@ -226,12 +223,9 @@ public class ReplicateTest {
         }
       }
 
-      Assert.assertEquals(
-          CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
-      Assert.assertEquals(
-          CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
-      Assert.assertEquals(
-          CHECK_POINT_GAP, 
servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+      Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getMinSyncIndex());
+      Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getMinSyncIndex());
+      Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(2).getImpl(gid).getMinSyncIndex());
 
     } catch (IOException e) {
       if (e.getCause() instanceof StartupException) {
@@ -265,8 +259,8 @@ public class ReplicateTest {
       Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
     }
 
-    Assert.assertEquals(0, 
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
-    Assert.assertEquals(0, 
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getMinSyncIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getMinSyncIndex());
 
     try {
       stopServer();
@@ -286,10 +280,10 @@ public class ReplicateTest {
         long start = System.currentTimeMillis();
         // should be [CHECK_POINT_GAP, CHECK_POINT_GAP * 2 - 1] after
         // replicating all entries
-        while 
(servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() < 
CHECK_POINT_GAP) {
+        while (servers.get(i).getImpl(gid).getMinSyncIndex() < 
CHECK_POINT_GAP) {
           long current = System.currentTimeMillis();
           if ((current - start) > 60 * 1000) {
-            logger.error("{}", 
servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+            logger.error("{}", servers.get(i).getImpl(gid).getMinSyncIndex());
             Assert.fail("Unable to replicate entries");
           }
           Thread.sleep(100);

Reply via email to