This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ad3cd5a  RATIS-1104.Should check "bytewritten" from remote writes to peers to determine whether current stream write is successful (#230)
ad3cd5a is described below

commit ad3cd5ae6c731d2f797890614a4f7bbcaa3ae818
Author: Rui Wang <[email protected]>
AuthorDate: Thu Oct 22 00:41:20 2020 -0700

    RATIS-1104.Should check "bytewritten" from remote writes to peers to determine whether current stream write is successful (#230)
    
    * RATIS-1104.Should check "bytewritten" from remote writes to peers to determine whether current stream write is successful
    
    * fixup! reorder imports
    
    * fixup! address comments
    
    * fixup! address comments
    
    * fixup! fix style
---
 .../ratis/netty/server/NettyServerStreamRpc.java   | 45 +++++++++++++++++++---
 1 file changed, 39 insertions(+), 6 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 59ec77b..54baea6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -40,6 +40,7 @@ import 
org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
 import 
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
+import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.PeerProxyMap;
@@ -58,6 +59,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
@@ -176,21 +178,40 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long bytesWritten, 
ChannelHandlerContext ctx) {
+  private void sendReplyNotSuccess(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx) {
+    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+        request.getStreamId(), request.getStreamOffset(), null, -1, false);
+    ctx.writeAndFlush(reply);
+  }
+
+  private void sendReplySuccess(DataStreamRequestByteBuf request, long 
bytesWritten, ChannelHandlerContext ctx) {
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
         request.getStreamId(), request.getStreamOffset(), null, bytesWritten, 
true);
     ctx.writeAndFlush(reply);
   }
 
+  private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
+      DataStreamRequestByteBuf request, long bytesWritten, 
ChannelHandlerContext ctx) {
+    try {
+      if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
+        sendReplyNotSuccess(request, ctx);
+      } else {
+        sendReplySuccess(request, bytesWritten, ctx);
+      }
+    } catch (ExecutionException | InterruptedException e) {
+      IOUtils.asIOException(e);
+    }
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
IOException {
-        final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
+        final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf) 
msg;
         final ByteBuf buf = request.slice();
         final boolean isHeader = request.getStreamOffset() == -1;
 
-        final CompletableFuture<Long> localWrite = isHeader?
+        final CompletableFuture<Long> localWrite = isHeader ?
                 streams.computeIfAbsent(request.getStreamId(), id -> 
getDataStreamFuture(buf)).thenApply(stream -> 0L)
                 : streams.get(request.getStreamId()).thenApply(stream -> 
writeTo(buf, stream));
 
@@ -199,25 +220,37 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
           // do not need to forward header request
           final List<DataStreamOutput> outs = proxies.getDataStreamOutput();
           peersStreamOutput.put(request.getStreamId(), outs);
-          for(DataStreamOutput out : outs) {
+          for (DataStreamOutput out : outs) {
             remoteWrites.add(out.getHeaderFuture());
           }
         } else {
           // body
-          for(DataStreamOutput out : 
peersStreamOutput.get(request.getStreamId())) {
+          for (DataStreamOutput out : 
peersStreamOutput.get(request.getStreamId())) {
             remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
           }
         }
 
         JavaUtils.allOf(remoteWrites).thenCombine(localWrite, (v, 
bytesWritten) -> {
               buf.release();
-              sendReply(request, bytesWritten, ctx);
+              sendReply(remoteWrites, request, bytesWritten, ctx);
               return null;
         });
       }
     };
   }
 
+  private boolean checkSuccessRemoteWrite(
+          List<CompletableFuture<DataStreamReply>> replyFutures, long 
bytesWritten)
+          throws ExecutionException, InterruptedException {
+    for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
+      DataStreamReply reply = replyFuture.get();
+      if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private ChannelInitializer<SocketChannel> getInitializer(){
     return new ChannelInitializer<SocketChannel>(){
       @Override

Reply via email to