This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 4772817ff RATIS-1708. RaftLog file channel closed before async flush ends (#746)
4772817ff is described below

commit 4772817ff5c45144856433997204265940d027de
Author: William Song <[email protected]>
AuthorDate: Wed Sep 14 17:13:29 2022 +0800

    RATIS-1708. RaftLog file channel closed before async flush ends (#746)
    
    (cherry picked from commit 93f2f748a9a2fa08011f469af43428ceb00bbf96)
---
 .../raftlog/segmented/BufferedWriteChannel.java    | 22 ++++++++++------------
 .../segmented/SegmentedRaftLogOutputStream.java    |  5 ++---
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 10 +++-------
 .../statemachine/SimpleStateMachine4Testing.java   |  2 +-
 .../segmented/TestBufferedWriteChannel.java        |  2 +-
 .../server/raftlog/segmented/TestLogSegment.java   |  8 ++++----
 .../raftlog/segmented/TestRaftLogReadWrite.java    | 12 ++++++------
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  2 +-
 8 files changed, 28 insertions(+), 35 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index 9bc26596f..f6c10ea41 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -27,11 +27,10 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Provides a buffering layer in front of a FileChannel for writing.
@@ -39,8 +38,7 @@ import java.util.function.Supplier;
  * This class is NOT threadsafe.
  */
 class BufferedWriteChannel implements Closeable {
-  static BufferedWriteChannel open(File file, boolean append, ByteBuffer 
buffer,
-      Supplier<CompletableFuture<Void>> flushFuture) throws IOException {
+  static BufferedWriteChannel open(File file, boolean append, ByteBuffer 
buffer) throws IOException {
     final RandomAccessFile raf = new RandomAccessFile(file, "rw");
     final FileChannel fc = raf.getChannel();
     if (append) {
@@ -49,19 +47,19 @@ class BufferedWriteChannel implements Closeable {
       fc.truncate(0);
     }
     Preconditions.assertSame(fc.size(), fc.position(), "fc.position");
-    return new BufferedWriteChannel(fc, buffer, flushFuture);
+    return new BufferedWriteChannel(fc, buffer);
   }
 
   private final FileChannel fileChannel;
   private final ByteBuffer writeBuffer;
   private boolean forced = true;
-  private final Supplier<CompletableFuture<Void>> flushFuture;
+  private final AtomicReference<CompletableFuture<Void>> flushFuture
+      = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
-  BufferedWriteChannel(FileChannel fileChannel, ByteBuffer byteBuffer,
-      Supplier<CompletableFuture<Void>> flushFuture) {
+
+  BufferedWriteChannel(FileChannel fileChannel, ByteBuffer byteBuffer) {
     this.fileChannel = fileChannel;
     this.writeBuffer = byteBuffer;
-    this.flushFuture = flushFuture;
   }
 
   void write(byte[] b) throws IOException {
@@ -101,11 +99,11 @@ class BufferedWriteChannel implements Closeable {
   CompletableFuture<Void> asyncFlush(ExecutorService executor) throws 
IOException {
     flushBuffer();
     if (forced) {
-      return CompletableFuture.completedFuture(null);
+      return flushFuture.get();
     }
     final CompletableFuture<Void> f = 
CompletableFuture.supplyAsync(this::fileChannelForce, executor);
     forced = true;
-    return f;
+    return flushFuture.updateAndGet(previous -> f.thenCombine(previous, 
(current, prev) -> current));
   }
 
   private Void fileChannelForce() {
@@ -148,7 +146,7 @@ class BufferedWriteChannel implements Closeable {
     }
 
     try {
-      Optional.ofNullable(flushFuture).ifPresent(f -> f.get());
+      flushFuture.get().join();
       fileChannel.truncate(fileChannel.position());
     } finally {
       fileChannel.close();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index b96acdc61..22eebac93 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -34,7 +34,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
 import java.util.zip.Checksum;
 
 public class SegmentedRaftLogOutputStream implements Closeable {
@@ -58,13 +57,13 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
   private final long preallocatedSize;
 
   public SegmentedRaftLogOutputStream(File file, boolean append, long 
segmentMaxSize,
-      long preallocatedSize, ByteBuffer byteBuffer, 
Supplier<CompletableFuture<Void>> flushFuture)
+      long preallocatedSize, ByteBuffer byteBuffer)
       throws IOException {
     this.file = file;
     this.checksum = new PureJavaCrc32C();
     this.segmentMaxSize = segmentMaxSize;
     this.preallocatedSize = preallocatedSize;
-    this.out = BufferedWriteChannel.open(file, append, byteBuffer, 
flushFuture);
+    this.out = BufferedWriteChannel.open(file, append, byteBuffer);
 
     if (!append) {
       // write header
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 3bc2b593b..516b01c7b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -51,7 +51,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -179,8 +178,6 @@ class SegmentedRaftLogWorker {
   private final boolean asyncFlush;
   private final boolean unsafeFlush;
   private final ExecutorService flushExecutor;
-  private final AtomicReference<CompletableFuture<Void>> flushFuture
-      = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
   private final StateMachineDataPolicy stateMachineDataPolicy;
 
@@ -407,9 +404,8 @@ class SegmentedRaftLogWorker {
 
   private void asyncFlushOutStream(CompletableFuture<Void> stateMachineFlush) 
throws IOException {
     final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
-    final CompletableFuture<Void> f = out.asyncFlush(flushExecutor)
-        .thenCombine(stateMachineFlush, (async, sm) -> async);
-    flushFuture.updateAndGet(previous -> f.thenCombine(previous, (current, 
prev) -> current))
+    out.asyncFlush(flushExecutor)
+        .thenCombine(stateMachineFlush, (async, sm) -> async)
         .whenComplete((v, e) -> {
           updateFlushedIndexIncreasingly(lastWrittenIndex);
           logSyncTimerContext.stop();
@@ -754,6 +750,6 @@ class SegmentedRaftLogWorker {
   private void allocateSegmentedRaftLogOutputStream(File file, boolean append) 
throws IOException {
     Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
     out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
-            preallocatedSize, writeBuffer, flushFuture::get);
+            preallocatedSize, writeBuffer);
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index c8755abab..cf715585e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -264,7 +264,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", 
termIndex.getTerm(),
         termIndex.getIndex(), snapshotFile);
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(snapshotFile, false,
-        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize), null)) {
+        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize))) {
       for (final LogEntryProto entry : indexMap.values()) {
         if (entry.getIndex() > endIndex) {
           break;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
index dec13903b..a7bb7000a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
@@ -137,7 +137,7 @@ public class TestBufferedWriteChannel extends BaseTest {
     final byte[] bytes = new byte[10];
     final ByteBuffer buffer = ByteBuffer.wrap(bytes);
     final FakeFileChannel fake = new FakeFileChannel();
-    final BufferedWriteChannel out = new BufferedWriteChannel(fake, buffer, 
null);
+    final BufferedWriteChannel out = new BufferedWriteChannel(fake, buffer);
 
     // write exactly buffer size, then flush.
     fake.assertValues(0, 0);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 724c2d3b8..6e0af2dab 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -93,7 +93,7 @@ public class TestLogSegment extends BaseTest {
 
     final LogEntryProto[] entries = new LogEntryProto[numEntries];
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(file, false,
-        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize), null)) {
+        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize))) {
       for (int i = 0; i < entries.length; i++) {
         SimpleOperation op = new SimpleOperation("m" + i);
         entries[i] = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), 
term, i + startIndex);
@@ -295,7 +295,7 @@ public class TestLogSegment extends BaseTest {
     for (int max : maxSizes) {
       for (int a : preallocated) {
         try(SegmentedRaftLogOutputStream ignored =
-                new SegmentedRaftLogOutputStream(file, false, max, a, 
ByteBuffer.allocateDirect(bufferSize), null)) {
+                new SegmentedRaftLogOutputStream(file, false, max, a, 
ByteBuffer.allocateDirect(bufferSize))) {
           Assert.assertEquals("max=" + max + ", a=" + a, file.length(), 
Math.min(max, a));
         }
         try(SegmentedRaftLogInputStream in = new 
SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
@@ -310,7 +310,7 @@ public class TestLogSegment extends BaseTest {
     Arrays.fill(content, (byte) 1);
     final long size;
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(file, false,
-        1024, 1024, ByteBuffer.allocateDirect(bufferSize), null)) {
+        1024, 1024, ByteBuffer.allocateDirect(bufferSize))) {
       SimpleOperation op = new SimpleOperation(new String(content));
       LogEntryProto entry = 
LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
       size = LogSegment.getEntrySize(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
@@ -345,7 +345,7 @@ public class TestLogSegment extends BaseTest {
     long totalSize = SegmentedRaftLogFormat.getHeaderLength();
     long preallocated = 16 * 1024;
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(file, false,
-        max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024), null)) 
{
+        max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024))) {
       Assert.assertEquals(preallocated, file.length());
       while (totalSize + entrySize < max.getSize()) {
         totalSize += entrySize;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 5d54eff03..88b5e2f48 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -108,7 +108,7 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     final LogEntryProto[] entries = new LogEntryProto[100];
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize), null)) {
+        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize))) {
       size += writeMessages(entries, out);
     } finally {
       storage.close();
@@ -126,7 +126,7 @@ public class TestRaftLogReadWrite extends BaseTest {
     final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
     LogEntryProto[] entries = new LogEntryProto[200];
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize), null)) {
+        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize))) {
       for (int i = 0; i < 100; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
         entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 
i);
@@ -135,7 +135,7 @@ public class TestRaftLogReadWrite extends BaseTest {
     }
 
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(openSegment, true,
-        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize), null)) {
+        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize))) {
       for (int i = 100; i < 200; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
         entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 
i);
@@ -161,7 +161,7 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     LogEntryProto[] entries = new LogEntryProto[100];
     final SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize), null);
+        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize));
     size += writeMessages(entries, out);
     out.flush();
 
@@ -189,7 +189,7 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     LogEntryProto[] entries = new LogEntryProto[10];
     final SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(openSegment, false,
-        16 * 1024 * 1024, 4 * 1024 * 1024, 
ByteBuffer.allocateDirect(bufferSize), null);
+        16 * 1024 * 1024, 4 * 1024 * 1024, 
ByteBuffer.allocateDirect(bufferSize));
     for (int i = 0; i < 10; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -236,7 +236,7 @@ public class TestRaftLogReadWrite extends BaseTest {
     RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
     final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize), null)) {
+        segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize))) {
       for (int i = 0; i < 100; i++) {
         LogEntryProto entry = LogProtoUtils.toLogEntryProto(
             new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 6643dbdf3..7c7e77805 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -176,7 +176,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       final int size = (int) (range.end - range.start + 1);
       LogEntryProto[] entries = new LogEntryProto[size];
       try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(file, false,
-          segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize), null)) {
+          segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize))) {
         for (int i = 0; i < size; i++) {
           SimpleOperation m = new SimpleOperation("m" + (i + range.start));
           entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 
range.term, i + range.start);

Reply via email to