This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/fix_ml_wal_no_delete in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b1023623d04c33afc412565ed1aadd6c49252b8a Author: Jinrui.Zhang <[email protected]> AuthorDate: Fri Sep 23 17:17:33 2022 +0800 Fix the issue that the wal won't be deleted when leader transfer to follower --- .../multileader/logdispatcher/LogDispatcher.java | 4 +--- .../java/org/apache/iotdb/db/wal/node/WALNode.java | 26 +++++++++++++++------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index ccf11bf1d5..557fbf0f62 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -270,9 +270,7 @@ public class LogDispatcher { // indicating that insert nodes whose search index are before this value can be deleted // safely long currentSafelyDeletedSearchIndex = impl.getCurrentSafelyDeletedSearchIndex(); - reader.setSafelyDeletedSearchIndex( - currentSafelyDeletedSearchIndex - - currentSafelyDeletedSearchIndex % config.getReplication().getCheckpointGap()); + reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex); // notify if (impl.unblockWrite()) { impl.signal(); diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java index f1600863f2..a8e97bfdb5 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java @@ -683,15 +683,21 @@ public class WALNode implements IWALNode { @Override public void waitForNextReady() throws InterruptedException { + boolean walFileRolled = false; while (!hasNext()) { - boolean timeout = - !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS); - if (timeout) { - logger.info( - "timeout when waiting for next WAL entry ready, execute rollWALFile. Current search index in wal buffer is {}, and next target index is {}", - buffer.getCurrentSearchIndex(), - nextSearchIndex); - rollWALFile(); + if (!walFileRolled) { + boolean timeout = + !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS); + if (timeout) { + logger.info( + "timeout when waiting for next WAL entry ready, execute rollWALFile. Current search index in wal buffer is {}, and next target index is {}", + buffer.getCurrentSearchIndex(), + nextSearchIndex); + rollWALFile(); + walFileRolled = true; + } + } else { + buffer.waitForFlush(); } } } @@ -735,6 +741,10 @@ public class WALNode implements IWALNode { int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex); logger.debug( "searchIndex: {}, result: {}, files: {}, ", nextSearchIndex, fileIndex, filesToSearch); + // (xingtanzjr) When the target entry does not exist, the reader will return minimum one whose searchIndex is larger than target searchIndex + if (fileIndex == -1) { + fileIndex = 0; + } if (filesToSearch != null && (fileIndex >= 0 && fileIndex < filesToSearch.length - 1)) { // possible to find next this.filesToSearch = filesToSearch;
