This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 190dcbdcf1 fix async flush
190dcbdcf1 is described below
commit 190dcbdcf19dd63dce3190904f166f1d4c54b419
Author: Tian Jiang <[email protected]>
AuthorDate: Mon May 8 09:57:26 2023 +0800
fix async flush
---
.../manager/serialization/StableEntryManager.java | 2 +-
.../serialization/SyncLogDequeSerializer.java | 39 +++++++++++-----------
2 files changed, 21 insertions(+), 20 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index f1a7e6ebfb..b7abc3fe73 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -34,7 +34,7 @@ public interface StableEntryManager {
void append(List<Entry> entries, long commitIndex, long
maxHaveAppliedCommitIndex)
throws IOException;
- void flushLogBuffer();
+ void flushLogBuffer(boolean isAsyncFlush);
void forceFlushLogBuffer();
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index f8a5f3956e..02dc4eb65c 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -309,7 +309,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
logger.debug("Raft log buffer overflow!");
logDataBuffer.reset();
logIndexBuffer.reset();
- flushLogBuffer();
+ flushLogBuffer(true);
logDataBuffer.put(logData);
lastLogIndex = log.getCurrLogIndex();
}
@@ -386,7 +386,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
}
@Override
- public void flushLogBuffer() {
+ public void flushLogBuffer(boolean isAsyncFlush) {
if (isClosed || logDataBuffer.position() == 0) {
return;
}
@@ -403,6 +403,17 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
switchBuffer();
flushingLogFuture = flushingLogExecutorService.submit(() ->
flushLogBufferTask(lastLogIndex));
}
+
+ if (!isAsyncFlush) {
+ try {
+ flushingLogFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ logger.error("Unexpected exception when flushing log in {}", name);
+ throw new RuntimeException(e);
+ }
+ }
}
private void switchBuffer() {
@@ -436,7 +447,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
}
persistedLogIndex = currentLastIndex;
- checkCloseCurrentFile();
+ checkCloseCurrentFile(currentLastIndex);
} catch (IOException e) {
logger.error("Error in logs serialization: ", e);
return;
@@ -449,12 +460,14 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
logger.debug("End flushing log buffer.");
}
- private void forceFlushLogBufferWithoutCloseFile() {
+ /** flush the log buffer and force the persistence */
+ @Override
+ public void forceFlushLogBuffer() {
if (isClosed) {
return;
}
lock.lock();
- flushLogBuffer();
+ flushLogBuffer(false);
serializeMeta(meta);
try {
if (currentLogDataOutputStream != null) {
@@ -472,18 +485,6 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
}
}
- /** flush the log buffer and check if the file needs to be closed */
- @Override
- public void forceFlushLogBuffer() {
- lock.lock();
- try {
- forceFlushLogBufferWithoutCloseFile();
- checkCloseCurrentFile(meta.getCommitLogIndex());
- } finally {
- lock.unlock();
- }
- }
-
@Override
public void setHardStateAndFlush(HardState state) {
this.state = state;
@@ -995,7 +996,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
lock.lock();
try {
// 1. flush logs in buffer
- flushLogBuffer();
+ flushLogBuffer(true);
// 2. check the log index offset list size
if (logIndexOffsetList.size() > maxRaftLogIndexSizeInMemory) {
@@ -1362,7 +1363,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long>
startAndEndOffset) {
List<Entry> result = new ArrayList<>();
if (file.getName().equals(getCurrentLogDataFile().getName())) {
- forceFlushLogBufferWithoutCloseFile();
+ forceFlushLogBuffer();
}
try (FileInputStream fileInputStream = new FileInputStream(file);
BufferedInputStream bufferedInputStream = new
BufferedInputStream(fileInputStream)) {