This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e2594a3a6d [MultiLeader] Fix the issue that the wal won't be deleted
when leader transfer to follower (#7421)
e2594a3a6d is described below
commit e2594a3a6d7cb57a603afea49f60babf99fb6d03
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Fri Sep 23 17:56:54 2022 +0800
[MultiLeader] Fix the issue that the wal won't be deleted when leader
transfer to follower (#7421)
---
.../multileader/logdispatcher/LogDispatcher.java | 4 +---
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 27 +++++++++++++++-------
2 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ccf11bf1d5..557fbf0f62 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -270,9 +270,7 @@ public class LogDispatcher {
// indicating that insert nodes whose search index are before this value can be deleted
// safely
long currentSafelyDeletedSearchIndex =
impl.getCurrentSafelyDeletedSearchIndex();
- reader.setSafelyDeletedSearchIndex(
- currentSafelyDeletedSearchIndex
- - currentSafelyDeletedSearchIndex % config.getReplication().getCheckpointGap());
+ reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
// notify
if (impl.unblockWrite()) {
impl.signal();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index f1600863f2..0f4ced8df6 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -683,15 +683,21 @@ public class WALNode implements IWALNode {
@Override
public void waitForNextReady() throws InterruptedException {
+ boolean walFileRolled = false;
while (!hasNext()) {
- boolean timeout =
- !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
- if (timeout) {
- logger.info(
- "timeout when waiting for next WAL entry ready, execute rollWALFile. Current search index in wal buffer is {}, and next target index is {}",
- buffer.getCurrentSearchIndex(),
- nextSearchIndex);
- rollWALFile();
+ if (!walFileRolled) {
+ boolean timeout =
+ !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
+ if (timeout) {
+ logger.info(
+ "timeout when waiting for next WAL entry ready, execute rollWALFile. Current search index in wal buffer is {}, and next target index is {}",
+ buffer.getCurrentSearchIndex(),
+ nextSearchIndex);
+ rollWALFile();
+ walFileRolled = true;
+ }
+ } else {
+ buffer.waitForFlush();
}
}
}
@@ -735,6 +741,11 @@ public class WALNode implements IWALNode {
int fileIndex =
WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex);
logger.debug(
"searchIndex: {}, result: {}, files: {}, ", nextSearchIndex,
fileIndex, filesToSearch);
+ // (xingtanzjr) When the target entry does not exist, the reader will return minimum one whose
+ // searchIndex is larger than target searchIndex
+ if (fileIndex == -1) {
+ fileIndex = 0;
+ }
if (filesToSearch != null
&& (fileIndex >= 0 && fileIndex < filesToSearch.length - 1)) { // possible to find next
this.filesToSearch = filesToSearch;