This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_ml_wal_no_delete
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b1023623d04c33afc412565ed1aadd6c49252b8a
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Fri Sep 23 17:17:33 2022 +0800

    Fix the issue that the WAL won't be deleted when a leader transfers to follower
---
 .../multileader/logdispatcher/LogDispatcher.java   |  4 +---
 .../java/org/apache/iotdb/db/wal/node/WALNode.java | 26 +++++++++++++++-------
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ccf11bf1d5..557fbf0f62 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -270,9 +270,7 @@ public class LogDispatcher {
       // indicating that insert nodes whose search index are before this value 
can be deleted
       // safely
       long currentSafelyDeletedSearchIndex = 
impl.getCurrentSafelyDeletedSearchIndex();
-      reader.setSafelyDeletedSearchIndex(
-          currentSafelyDeletedSearchIndex
-              - currentSafelyDeletedSearchIndex % 
config.getReplication().getCheckpointGap());
+      reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
       // notify
       if (impl.unblockWrite()) {
         impl.signal();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java 
b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index f1600863f2..a8e97bfdb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -683,15 +683,21 @@ public class WALNode implements IWALNode {
 
     @Override
     public void waitForNextReady() throws InterruptedException {
+      boolean walFileRolled = false;
       while (!hasNext()) {
-        boolean timeout =
-            !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, 
TimeUnit.SECONDS);
-        if (timeout) {
-          logger.info(
-              "timeout when waiting for next WAL entry ready, execute 
rollWALFile. Current search index in wal buffer is {}, and next target index is 
{}",
-              buffer.getCurrentSearchIndex(),
-              nextSearchIndex);
-          rollWALFile();
+        if (!walFileRolled) {
+          boolean timeout =
+              !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, 
TimeUnit.SECONDS);
+          if (timeout) {
+            logger.info(
+                "timeout when waiting for next WAL entry ready, execute 
rollWALFile. Current search index in wal buffer is {}, and next target index is 
{}",
+                buffer.getCurrentSearchIndex(),
+                nextSearchIndex);
+            rollWALFile();
+            walFileRolled = true;
+          }
+        } else {
+          buffer.waitForFlush();
         }
       }
     }
@@ -735,6 +741,10 @@ public class WALNode implements IWALNode {
       int fileIndex = 
WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex);
       logger.debug(
           "searchIndex: {}, result: {}, files: {}, ", nextSearchIndex, 
fileIndex, filesToSearch);
+      // (xingtanzjr) When the target entry does not exist, the reader will 
return the entry with the smallest searchIndex that is larger than the target searchIndex
+      if (fileIndex == -1) {
+        fileIndex = 0;
+      }
       if (filesToSearch != null
           && (fileIndex >= 0 && fileIndex < filesToSearch.length - 1)) { // 
possible to find next
         this.filesToSearch = filesToSearch;

Reply via email to