This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 913f5a4  RATIS-1154. Add sync flag when write stream data (#284)
913f5a4 is described below

commit 913f5a4ad54ddc948f49eca8907bb13ea67a6e35
Author: runzhiwang <[email protected]>
AuthorDate: Wed Nov 18 13:41:55 2020 +0800

    RATIS-1154. Add sync flag when write stream data (#284)
    
    * RATIS-1154. Add sync flag when write stream data
    
    * Update 
ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
    
    Co-authored-by: Tsz-Wo Nicholas Sze <[email protected]>
    
    * fix code review
    
    Co-authored-by: Tsz-Wo Nicholas Sze <[email protected]>
---
 .../apache/ratis/client/api/DataStreamOutput.java  |  6 ++++-
 .../ratis/client/impl/DataStreamClientImpl.java    |  4 ++--
 .../ratis/netty/server/DataStreamManagement.java   | 26 ++++++++++++++--------
 ratis-proto/src/main/proto/Raft.proto              |  5 +++--
 .../apache/ratis/statemachine/StateMachine.java    |  9 +++++++-
 .../ratis/datastream/DataStreamBaseTest.java       | 20 +++++++++++++----
 6 files changed, 51 insertions(+), 19 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java 
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index dd86569..8219d69 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -26,5 +26,9 @@ import java.util.concurrent.CompletableFuture;
 /** An asynchronous output stream supporting zero buffer copying. */
 public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
   /** Send out the data in the buffer asynchronously */
-  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
+  default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer) {
+    return writeAsync(buffer, false);
+  }
+
+  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer, boolean 
sync);
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 2acf657..c7acb6e 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -85,12 +85,12 @@ public class DataStreamClientImpl implements 
DataStreamClient {
 
     // send to the attached dataStreamClientRpc
     @Override
-    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
+    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf, 
boolean sync) {
       if (isClosed()) {
         return JavaUtils.completeExceptionally(new AlreadyClosedException(
             clientId + ": stream already closed, request=" + header));
       }
-      final CompletableFuture<DataStreamReply> f = send(Type.STREAM_DATA, buf);
+      final CompletableFuture<DataStreamReply> f = send(sync ? 
Type.STREAM_DATA_SYNC : Type.STREAM_DATA, buf);
       streamOffset += buf.remaining();
       return combineHeader(f);
     }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 81e4f68..f88a52a 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -35,6 +35,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -46,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -73,9 +73,9 @@ public class DataStreamManagement {
       this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s -> 
0L));
     }
 
-    CompletableFuture<Long> write(ByteBuf buf, Executor executor) {
+    CompletableFuture<Long> write(ByteBuf buf, boolean sync, Executor 
executor) {
       return composeAsync(writeFuture, executor,
-          n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, stream), 
executor));
+          n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, sync, 
stream), executor));
     }
 
     CompletableFuture<Long> close(Executor executor) {
@@ -92,7 +92,7 @@ public class DataStreamManagement {
     }
 
     CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) 
{
-      return out.writeAsync(request.slice().nioBuffer());
+      return out.writeAsync(request.slice().nioBuffer(), request.getType() == 
Type.STREAM_DATA_SYNC);
     }
 
     CompletableFuture<DataStreamReply> 
startTransaction(DataStreamRequestByteBuf request,
@@ -235,8 +235,8 @@ public class DataStreamManagement {
     return composed;
   }
 
-  static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
+  static long writeTo(ByteBuf buf, boolean sync, DataStream stream) {
+    final StateMachineDataChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
       try {
@@ -245,6 +245,14 @@ public class DataStreamManagement {
         throw new CompletionException(t);
       }
     }
+
+    if (sync) {
+      try {
+        channel.force(false);
+      } catch (IOException e) {
+        throw new CompletionException(e);
+      }
+    }
     return byteWritten;
   }
 
@@ -419,8 +427,8 @@ public class DataStreamManagement {
     if (request.getType() == Type.STREAM_HEADER) {
       localWrite = CompletableFuture.completedFuture(0L);
       remoteWrites = Collections.emptyList();
-    } else if (request.getType() == Type.STREAM_DATA) {
-      localWrite = info.getLocal().write(buf, executor);
+    } else if (request.getType() == Type.STREAM_DATA || request.getType() == 
Type.STREAM_DATA_SYNC) {
+      localWrite = info.getLocal().write(buf, request.getType() == 
Type.STREAM_DATA_SYNC, executor);
       remoteWrites = info.applyToRemotes(out -> out.write(request));
     } else if (request.getType() == Type.STREAM_CLOSE) {
       localWrite = info.getLocal().close(executor);
@@ -432,7 +440,7 @@ public class DataStreamManagement {
     composeAsync(info.getPrevious(), executor, n -> 
JavaUtils.allOf(remoteWrites)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           if (request.getType() == Type.STREAM_HEADER
-              || request.getType() == Type.STREAM_DATA) {
+              || request.getType() == Type.STREAM_DATA || request.getType() == 
Type.STREAM_DATA_SYNC) {
             sendReply(remoteWrites, request, bytesWritten, ctx);
           } else if (request.getType() == Type.STREAM_CLOSE) {
             if (info.isPrimary()) {
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index da82151..b6ae878 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -286,8 +286,9 @@ message DataStreamPacketHeaderProto {
   enum Type {
     STREAM_HEADER = 0;
     STREAM_DATA = 1;
-    STREAM_CLOSE = 2;
-    START_TRANSACTION = 3;
+    STREAM_DATA_SYNC = 2;
+    STREAM_CLOSE = 3;
+    START_TRANSACTION = 4;
   }
 
   uint64 streamId = 1;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 351137d..74d4df5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -231,11 +231,18 @@ public interface StateMachine extends Closeable {
   }
 
   /**
+   * For write state machine data.
+   */
+  interface StateMachineDataChannel extends WritableByteChannel {
+    void force(boolean metadata) throws IOException;
+  }
+
+  /**
    * For streaming state machine data.
    */
   interface DataStream {
     /** @return a channel for streaming state machine data. */
-    WritableByteChannel getWritableByteChannel();
+    StateMachineDataChannel getWritableByteChannel();
 
     /**
      * Clean up asynchronously this stream.
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index b206b0c..5b7baf9 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -47,6 +47,7 @@ import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -132,8 +133,14 @@ abstract class DataStreamBaseTest extends BaseTest {
   static class SingleDataStream implements DataStream {
     private int byteWritten = 0;
     private final RaftClientRequest writeRequest;
+    private int forcedPosition = 0;
+
+    final StateMachineDataChannel channel = new StateMachineDataChannel() {
+      @Override
+      public void force(boolean metadata) throws IOException {
+        forcedPosition = byteWritten;
+      }
 
-    final WritableByteChannel channel = new WritableByteChannel() {
       private volatile boolean open = true;
 
       @Override
@@ -161,7 +168,7 @@ abstract class DataStreamBaseTest extends BaseTest {
     };
 
       @Override
-      public WritableByteChannel getWritableByteChannel() {
+      public StateMachineDataChannel getWritableByteChannel() {
         return channel;
       }
 
@@ -186,6 +193,10 @@ abstract class DataStreamBaseTest extends BaseTest {
     public RaftClientRequest getWriteRequest() {
       return writeRequest;
     }
+
+    public int getForcedPosition() {
+      return forcedPosition;
+    }
   }
 
   static class Server {
@@ -488,7 +499,7 @@ abstract class DataStreamBaseTest extends BaseTest {
       sizes.add(size);
 
       final ByteBuffer bf = initBuffer(dataSize, size);
-      futures.add(out.writeAsync(bf));
+      futures.add(out.writeAsync(bf, i == bufferNum - 1));
       dataSize += size;
     }
 
@@ -504,7 +515,7 @@ abstract class DataStreamBaseTest extends BaseTest {
       final DataStreamReply reply = futures.get(i).join();
       Assert.assertTrue(reply.isSuccess());
       Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
-      Assert.assertEquals(reply.getType(), Type.STREAM_DATA);
+      Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? 
Type.STREAM_DATA_SYNC : Type.STREAM_DATA);
     }
     return dataSize;
   }
@@ -553,6 +564,7 @@ abstract class DataStreamBaseTest extends BaseTest {
     final SingleDataStream stream = s.getSingleDataStream(header);
     Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
     Assert.assertEquals(dataSize, stream.getByteWritten());
+    Assert.assertEquals(dataSize, stream.getForcedPosition());
 
     final RaftClientRequest writeRequest = stream.getWriteRequest();
     assertRaftClientMessage(header, writeRequest);

Reply via email to