This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 4772817ff RATIS-1708. RaftLog file channel closed before async flush ends (#746)
4772817ff is described below
commit 4772817ff5c45144856433997204265940d027de
Author: William Song <[email protected]>
AuthorDate: Wed Sep 14 17:13:29 2022 +0800
RATIS-1708. RaftLog file channel closed before async flush ends (#746)
(cherry picked from commit 93f2f748a9a2fa08011f469af43428ceb00bbf96)
---
.../raftlog/segmented/BufferedWriteChannel.java | 22 ++++++++++------------
.../segmented/SegmentedRaftLogOutputStream.java | 5 ++---
.../raftlog/segmented/SegmentedRaftLogWorker.java | 10 +++-------
.../statemachine/SimpleStateMachine4Testing.java | 2 +-
.../segmented/TestBufferedWriteChannel.java | 2 +-
.../server/raftlog/segmented/TestLogSegment.java | 8 ++++----
.../raftlog/segmented/TestRaftLogReadWrite.java | 12 ++++++------
.../raftlog/segmented/TestSegmentedRaftLog.java | 2 +-
8 files changed, 28 insertions(+), 35 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index 9bc26596f..f6c10ea41 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -27,11 +27,10 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Provides a buffering layer in front of a FileChannel for writing.
@@ -39,8 +38,7 @@ import java.util.function.Supplier;
* This class is NOT threadsafe.
*/
class BufferedWriteChannel implements Closeable {
- static BufferedWriteChannel open(File file, boolean append, ByteBuffer
buffer,
- Supplier<CompletableFuture<Void>> flushFuture) throws IOException {
+ static BufferedWriteChannel open(File file, boolean append, ByteBuffer
buffer) throws IOException {
final RandomAccessFile raf = new RandomAccessFile(file, "rw");
final FileChannel fc = raf.getChannel();
if (append) {
@@ -49,19 +47,19 @@ class BufferedWriteChannel implements Closeable {
fc.truncate(0);
}
Preconditions.assertSame(fc.size(), fc.position(), "fc.position");
- return new BufferedWriteChannel(fc, buffer, flushFuture);
+ return new BufferedWriteChannel(fc, buffer);
}
private final FileChannel fileChannel;
private final ByteBuffer writeBuffer;
private boolean forced = true;
- private final Supplier<CompletableFuture<Void>> flushFuture;
+ private final AtomicReference<CompletableFuture<Void>> flushFuture
+ = new AtomicReference<>(CompletableFuture.completedFuture(null));
- BufferedWriteChannel(FileChannel fileChannel, ByteBuffer byteBuffer,
- Supplier<CompletableFuture<Void>> flushFuture) {
+
+ BufferedWriteChannel(FileChannel fileChannel, ByteBuffer byteBuffer) {
this.fileChannel = fileChannel;
this.writeBuffer = byteBuffer;
- this.flushFuture = flushFuture;
}
void write(byte[] b) throws IOException {
@@ -101,11 +99,11 @@ class BufferedWriteChannel implements Closeable {
CompletableFuture<Void> asyncFlush(ExecutorService executor) throws
IOException {
flushBuffer();
if (forced) {
- return CompletableFuture.completedFuture(null);
+ return flushFuture.get();
}
final CompletableFuture<Void> f =
CompletableFuture.supplyAsync(this::fileChannelForce, executor);
forced = true;
- return f;
+ return flushFuture.updateAndGet(previous -> f.thenCombine(previous,
(current, prev) -> current));
}
private Void fileChannelForce() {
@@ -148,7 +146,7 @@ class BufferedWriteChannel implements Closeable {
}
try {
- Optional.ofNullable(flushFuture).ifPresent(f -> f.get());
+ flushFuture.get().join();
fileChannel.truncate(fileChannel.position());
} finally {
fileChannel.close();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index b96acdc61..22eebac93 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -34,7 +34,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
import java.util.zip.Checksum;
public class SegmentedRaftLogOutputStream implements Closeable {
@@ -58,13 +57,13 @@ public class SegmentedRaftLogOutputStream implements Closeable {
private final long preallocatedSize;
public SegmentedRaftLogOutputStream(File file, boolean append, long
segmentMaxSize,
- long preallocatedSize, ByteBuffer byteBuffer,
Supplier<CompletableFuture<Void>> flushFuture)
+ long preallocatedSize, ByteBuffer byteBuffer)
throws IOException {
this.file = file;
this.checksum = new PureJavaCrc32C();
this.segmentMaxSize = segmentMaxSize;
this.preallocatedSize = preallocatedSize;
- this.out = BufferedWriteChannel.open(file, append, byteBuffer,
flushFuture);
+ this.out = BufferedWriteChannel.open(file, append, byteBuffer);
if (!append) {
// write header
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 3bc2b593b..516b01c7b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -51,7 +51,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -179,8 +178,6 @@ class SegmentedRaftLogWorker {
private final boolean asyncFlush;
private final boolean unsafeFlush;
private final ExecutorService flushExecutor;
- private final AtomicReference<CompletableFuture<Void>> flushFuture
- = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final StateMachineDataPolicy stateMachineDataPolicy;
@@ -407,9 +404,8 @@ class SegmentedRaftLogWorker {
private void asyncFlushOutStream(CompletableFuture<Void> stateMachineFlush)
throws IOException {
final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
- final CompletableFuture<Void> f = out.asyncFlush(flushExecutor)
- .thenCombine(stateMachineFlush, (async, sm) -> async);
- flushFuture.updateAndGet(previous -> f.thenCombine(previous, (current,
prev) -> current))
+ out.asyncFlush(flushExecutor)
+ .thenCombine(stateMachineFlush, (async, sm) -> async)
.whenComplete((v, e) -> {
updateFlushedIndexIncreasingly(lastWrittenIndex);
logSyncTimerContext.stop();
@@ -754,6 +750,6 @@ class SegmentedRaftLogWorker {
private void allocateSegmentedRaftLogOutputStream(File file, boolean append)
throws IOException {
Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
- preallocatedSize, writeBuffer, flushFuture::get);
+ preallocatedSize, writeBuffer);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index c8755abab..cf715585e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -264,7 +264,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}",
termIndex.getTerm(),
termIndex.getIndex(), snapshotFile);
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(snapshotFile, false,
- segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize), null)) {
+ segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize))) {
for (final LogEntryProto entry : indexMap.values()) {
if (entry.getIndex() > endIndex) {
break;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
index dec13903b..a7bb7000a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
@@ -137,7 +137,7 @@ public class TestBufferedWriteChannel extends BaseTest {
final byte[] bytes = new byte[10];
final ByteBuffer buffer = ByteBuffer.wrap(bytes);
final FakeFileChannel fake = new FakeFileChannel();
- final BufferedWriteChannel out = new BufferedWriteChannel(fake, buffer,
null);
+ final BufferedWriteChannel out = new BufferedWriteChannel(fake, buffer);
// write exactly buffer size, then flush.
fake.assertValues(0, 0);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 724c2d3b8..6e0af2dab 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -93,7 +93,7 @@ public class TestLogSegment extends BaseTest {
final LogEntryProto[] entries = new LogEntryProto[numEntries];
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(file, false,
- segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize), null)) {
+ segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 0; i < entries.length; i++) {
SimpleOperation op = new SimpleOperation("m" + i);
entries[i] = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(),
term, i + startIndex);
@@ -295,7 +295,7 @@ public class TestLogSegment extends BaseTest {
for (int max : maxSizes) {
for (int a : preallocated) {
try(SegmentedRaftLogOutputStream ignored =
- new SegmentedRaftLogOutputStream(file, false, max, a,
ByteBuffer.allocateDirect(bufferSize), null)) {
+ new SegmentedRaftLogOutputStream(file, false, max, a,
ByteBuffer.allocateDirect(bufferSize))) {
Assert.assertEquals("max=" + max + ", a=" + a, file.length(),
Math.min(max, a));
}
try(SegmentedRaftLogInputStream in = new
SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
@@ -310,7 +310,7 @@ public class TestLogSegment extends BaseTest {
Arrays.fill(content, (byte) 1);
final long size;
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(file, false,
- 1024, 1024, ByteBuffer.allocateDirect(bufferSize), null)) {
+ 1024, 1024, ByteBuffer.allocateDirect(bufferSize))) {
SimpleOperation op = new SimpleOperation(new String(content));
LogEntryProto entry =
LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
size = LogSegment.getEntrySize(entry,
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
@@ -345,7 +345,7 @@ public class TestLogSegment extends BaseTest {
long totalSize = SegmentedRaftLogFormat.getHeaderLength();
long preallocated = 16 * 1024;
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(file, false,
- max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024), null))
{
+ max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024))) {
Assert.assertEquals(preallocated, file.length());
while (totalSize + entrySize < max.getSize()) {
totalSize += entrySize;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 5d54eff03..88b5e2f48 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -108,7 +108,7 @@ public class TestRaftLogReadWrite extends BaseTest {
final LogEntryProto[] entries = new LogEntryProto[100];
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize), null)) {
+ segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize))) {
size += writeMessages(entries, out);
} finally {
storage.close();
@@ -126,7 +126,7 @@ public class TestRaftLogReadWrite extends BaseTest {
final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
LogEntryProto[] entries = new LogEntryProto[200];
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize), null)) {
+ segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 0; i < 100; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0,
i);
@@ -135,7 +135,7 @@ public class TestRaftLogReadWrite extends BaseTest {
}
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, true,
- segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize), null)) {
+ segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 100; i < 200; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0,
i);
@@ -161,7 +161,7 @@ public class TestRaftLogReadWrite extends BaseTest {
LogEntryProto[] entries = new LogEntryProto[100];
final SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize), null);
+ segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize));
size += writeMessages(entries, out);
out.flush();
@@ -189,7 +189,7 @@ public class TestRaftLogReadWrite extends BaseTest {
LogEntryProto[] entries = new LogEntryProto[10];
final SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
- 16 * 1024 * 1024, 4 * 1024 * 1024,
ByteBuffer.allocateDirect(bufferSize), null);
+ 16 * 1024 * 1024, 4 * 1024 * 1024,
ByteBuffer.allocateDirect(bufferSize));
for (int i = 0; i < 10; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -236,7 +236,7 @@ public class TestRaftLogReadWrite extends BaseTest {
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize), null)) {
+ segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 0; i < 100; i++) {
LogEntryProto entry = LogProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 6643dbdf3..7c7e77805 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -176,7 +176,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final int size = (int) (range.end - range.start + 1);
LogEntryProto[] entries = new LogEntryProto[size];
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(file, false,
- segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize), null)) {
+ segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 0; i < size; i++) {
SimpleOperation m = new SimpleOperation("m" + (i + range.start));
entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(),
range.term, i + range.start);