This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 190dcbdcf1 fix async flush
190dcbdcf1 is described below

commit 190dcbdcf19dd63dce3190904f166f1d4c54b419
Author: Tian Jiang <[email protected]>
AuthorDate: Mon May 8 09:57:26 2023 +0800

    fix async flush
---
 .../manager/serialization/StableEntryManager.java  |  2 +-
 .../serialization/SyncLogDequeSerializer.java      | 39 +++++++++++-----------
 2 files changed, 21 insertions(+), 20 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index f1a7e6ebfb..b7abc3fe73 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -34,7 +34,7 @@ public interface StableEntryManager {
   void append(List<Entry> entries, long commitIndex, long 
maxHaveAppliedCommitIndex)
       throws IOException;
 
-  void flushLogBuffer();
+  void flushLogBuffer(boolean isAsyncFlush);
 
   void forceFlushLogBuffer();
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index f8a5f3956e..02dc4eb65c 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -309,7 +309,7 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
         logger.debug("Raft log buffer overflow!");
         logDataBuffer.reset();
         logIndexBuffer.reset();
-        flushLogBuffer();
+        flushLogBuffer(true);
         logDataBuffer.put(logData);
         lastLogIndex = log.getCurrLogIndex();
       }
@@ -386,7 +386,7 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
   }
 
   @Override
-  public void flushLogBuffer() {
+  public void flushLogBuffer(boolean isAsyncFlush) {
     if (isClosed || logDataBuffer.position() == 0) {
       return;
     }
@@ -403,6 +403,17 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
       switchBuffer();
       flushingLogFuture = flushingLogExecutorService.submit(() -> 
flushLogBufferTask(lastLogIndex));
     }
+
+    if (!isAsyncFlush) {
+      try {
+        flushingLogFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        logger.error("Unexpected exception when flushing log in {}", name);
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   private void switchBuffer() {
@@ -436,7 +447,7 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
       }
       persistedLogIndex = currentLastIndex;
 
-      checkCloseCurrentFile();
+      checkCloseCurrentFile(currentLastIndex);
     } catch (IOException e) {
       logger.error("Error in logs serialization: ", e);
       return;
@@ -449,12 +460,14 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
     logger.debug("End flushing log buffer.");
   }
 
-  private void forceFlushLogBufferWithoutCloseFile() {
+  /** flush the log buffer and force the persistence */
+  @Override
+  public void forceFlushLogBuffer() {
     if (isClosed) {
       return;
     }
     lock.lock();
-    flushLogBuffer();
+    flushLogBuffer(false);
     serializeMeta(meta);
     try {
       if (currentLogDataOutputStream != null) {
@@ -472,18 +485,6 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
     }
   }
 
-  /** flush the log buffer and check if the file needs to be closed */
-  @Override
-  public void forceFlushLogBuffer() {
-    lock.lock();
-    try {
-      forceFlushLogBufferWithoutCloseFile();
-      checkCloseCurrentFile(meta.getCommitLogIndex());
-    } finally {
-      lock.unlock();
-    }
-  }
-
   @Override
   public void setHardStateAndFlush(HardState state) {
     this.state = state;
@@ -995,7 +996,7 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
     lock.lock();
     try {
       // 1. flush logs in buffer
-      flushLogBuffer();
+      flushLogBuffer(true);
 
       // 2. check the log index offset list size
       if (logIndexOffsetList.size() > maxRaftLogIndexSizeInMemory) {
@@ -1362,7 +1363,7 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
   private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long> 
startAndEndOffset) {
     List<Entry> result = new ArrayList<>();
     if (file.getName().equals(getCurrentLogDataFile().getName())) {
-      forceFlushLogBufferWithoutCloseFile();
+      forceFlushLogBuffer();
     }
     try (FileInputStream fileInputStream = new FileInputStream(file);
         BufferedInputStream bufferedInputStream = new 
BufferedInputStream(fileInputStream)) {

Reply via email to