szetszwo commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507109304



##########
File path: 
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
##########
@@ -40,4 +40,4 @@ public long getDataLength() {
   public ByteBuffer slice() {
     return buffer.slice();
   }
-}
+}

Review comment:
       Please revert whitespace-only changes like this.

##########
File path: 
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -166,8 +213,11 @@ public void runTestDataStream(){
     Assert.assertEquals(writeRequest.getCallId(), impl.getHeader().getCallId());
     Assert.assertEquals(writeRequest.getRaftGroupId(), impl.getHeader().getRaftGroupId());
     Assert.assertEquals(writeRequest.getServerId(), impl.getHeader().getServerId());
-
-    Assert.assertEquals(dataSize, byteWritten);
+    int actualBytesWritten = 0;
+    for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {
+      actualBytesWritten += s.getByteWritten();
+    }
+    Assert.assertEquals(dataSize * peers.size(), actualBytesWritten);

Review comment:
       Check the size written by each state machine individually.
   ```
       for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {
         Assert.assertEquals(dataSize, s.getByteWritten());
       }
   ```
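
   To illustrate why the per-state-machine check is stricter than the summed one, here is a minimal self-contained sketch. `ByteCounts` and its two methods are hypothetical helpers, not Ratis or JUnit APIs; the `long[]` stands in for the values returned by `s.getByteWritten()`.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Ratis: contrasts the two checks.
final class ByteCounts {
  private ByteCounts() {}

  // Summed check, as in the patch: passes whenever the total matches.
  static boolean sumMatches(long dataSize, long[] bytesWritten) {
    return Arrays.stream(bytesWritten).sum() == dataSize * bytesWritten.length;
  }

  // Individual check, as suggested: every state machine must match.
  static boolean eachMatches(long dataSize, long[] bytesWritten) {
    return Arrays.stream(bytesWritten).allMatch(b -> b == dataSize);
  }

  public static void main(String[] args) {
    // One peer wrote too little and another too much; the total is still 300.
    final long[] skewed = {100, 50, 150};
    System.out.println(sumMatches(100, skewed));  // summed check
    System.out.println(eachMatches(100, skewed)); // individual check
  }
}
```

   With the skewed input the summed check still passes while the individual check fails, which is the case the test should catch.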
   

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +136,33 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final AtomicBoolean shouldRelease = new AtomicBoolean();
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+        CompletableFuture<?> localWrites =
+            streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, shouldRelease))
+            .thenApply(stream -> writeTo(buf, stream, shouldRelease.get()));
+        parallelWrites[0] = localWrites;

Review comment:
       We can use isHeader to distinguish the header and body cases.
   ```
           final CompletableFuture<?> localWrites = isHeader?
               streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
               : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
           parallelWrites[parallelWrites.length - 1] = localWrites;
   ```
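
   The header/body split for the local write can be sketched in isolation as follows. This is only an illustration under stated assumptions: `LocalWriteSketch`, `handle` and `contents` are hypothetical names, a `StringBuilder` plays the role of `DataStream`, and the null check for a body packet that arrives without a header is an addition of this sketch, not part of the suggestion above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the handler; StringBuilder plays the role of DataStream.
final class LocalWriteSketch {
  private final ConcurrentMap<Long, CompletableFuture<StringBuilder>> streams =
      new ConcurrentHashMap<>();

  // Header: create the stream once. Body: append to the existing stream.
  CompletableFuture<?> handle(long streamId, boolean isHeader, String data) {
    if (isHeader) {
      return streams.computeIfAbsent(streamId,
          id -> CompletableFuture.completedFuture(new StringBuilder()));
    }
    final CompletableFuture<StringBuilder> stream = streams.get(streamId);
    if (stream == null) {
      // Assumption of this sketch: a body packet without a prior header is an error.
      final CompletableFuture<Integer> failed = new CompletableFuture<>();
      failed.completeExceptionally(
          new IllegalStateException("No header received for stream " + streamId));
      return failed;
    }
    // Returns the number of characters written, like byteWritten in the patch.
    return stream.thenApply(s -> {
      s.append(data);
      return data.length();
    });
  }

  // Test helper: the data accumulated so far for a stream that has a header.
  String contents(long streamId) {
    return streams.get(streamId).join().toString();
  }
}
```

   Because only the header path creates the stream, the `AtomicBoolean` flag is no longer needed to tell the two cases apart.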

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,31 +65,44 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClientImpl> clients = new ArrayList<>();

Review comment:
       NettyServerStreamRpc should use the public API, not the implementation 
classes.  Therefore, it should use DataStreamClient instead of 
DataStreamClientImpl.

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -146,6 +190,13 @@ ChannelFuture buildChannel() {
         .bind();
   }
 
+  public void setupClient(List<RaftPeer> otherPeers, RaftProperties properties) {
+    for (RaftPeer peer : otherPeers) {
+      DataStreamClientImpl impl = new DataStreamClientImpl(peer, properties, null);

Review comment:
       Use DataStreamClient.Builder

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +136,33 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final AtomicBoolean shouldRelease = new AtomicBoolean();

Review comment:
       Change it to
   ```
   final boolean isHeader = request.getStreamOffset() == -1;
   ```

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +136,33 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final AtomicBoolean shouldRelease = new AtomicBoolean();
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+        CompletableFuture<?> localWrites =
+            streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, shouldRelease))
+            .thenApply(stream -> writeTo(buf, stream, shouldRelease.get()));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+        for (int i = 0; i < clients.size(); i++) {
+          // do not need to forward header request
+          if (request.getStreamOffset() == -1) {
+            DataStreamClientImpl.DataStreamOutputImpl impl =
+                (DataStreamClientImpl.DataStreamOutputImpl) peersStreamOutput.get(request.getStreamId()).get(i);
+            parallelWrites[i + 1] = impl.getHeaderFuture();
+          } else {
+            CompletableFuture<?> remoteWrites;
+            if (shouldRelease.get()) {
+              remoteWrites = new CompletableFuture<>();
+            } else {
+              remoteWrites =
+                  peersStreamOutput.get(request.getStreamId()).get(i).writeAsync(request.slice().nioBuffer());
+            }
+            parallelWrites[i + 1] = remoteWrites;
+          }
+        }

Review comment:
       Similarly, use isHeader for the remote writes.
   ```
           if (isHeader) {
             // header
             final List<DataStreamOutput> outs = getDataStreamOutput();
             peersStreamOutput.put(request.getStreamId(), outs);
             for (int i = 0; i < outs.size(); i++) {
               parallelWrites[i] = outs.get(i).getHeaderFuture();
             }
           } else {
             // body
             final List<DataStreamOutput> outs = peersStreamOutput.get(request.getStreamId());
             for (int i = 0; i < clients.size(); i++) {
               parallelWrites[i] = outs.get(i).writeAsync(request.slice().nioBuffer());
             }
           }
   ```
   
   Also, add getHeaderFuture() to DataStreamClient.
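
   A self-contained sketch of the remote fan-out, under stated assumptions: `FanOutSketch`, its nested `Output` interface and `forward` are hypothetical stand-ins (the real `DataStreamOutput` does not yet expose `getHeaderFuture()`), the in-memory outputs complete immediately, and combining the futures with `CompletableFuture.allOf` is one possible way to wait for all peers, not necessarily what the patch does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class FanOutSketch {
  // Hypothetical stand-in for DataStreamOutput with the proposed getHeaderFuture().
  interface Output {
    CompletableFuture<Integer> getHeaderFuture();
    CompletableFuture<Integer> writeAsync(byte[] data);
  }

  private final Map<Long, List<Output>> peersStreamOutput = new ConcurrentHashMap<>();
  private final int peers;

  FanOutSketch(int peers) {
    this.peers = peers;
  }

  // In-memory Output whose futures are already complete.
  private static Output newOutput() {
    return new Output() {
      @Override
      public CompletableFuture<Integer> getHeaderFuture() {
        return CompletableFuture.completedFuture(0);
      }

      @Override
      public CompletableFuture<Integer> writeAsync(byte[] data) {
        return CompletableFuture.completedFuture(data.length);
      }
    };
  }

  // Header: open one output per peer. Body: write to the outputs opened earlier.
  CompletableFuture<Integer> forward(long streamId, boolean isHeader, byte[] data) {
    final List<CompletableFuture<Integer>> writes = new ArrayList<>();
    if (isHeader) {
      final List<Output> outs = new ArrayList<>();
      for (int i = 0; i < peers; i++) {
        outs.add(newOutput());
      }
      peersStreamOutput.put(streamId, outs);
      for (Output out : outs) {
        writes.add(out.getHeaderFuture());
      }
    } else {
      for (Output out : peersStreamOutput.get(streamId)) {
        writes.add(out.writeAsync(data));
      }
    }
    // Completes once every peer write has completed; yields the total bytes forwarded.
    return CompletableFuture.allOf(writes.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> writes.stream().mapToInt(CompletableFuture::join).sum());
  }
}
```

   Keeping the header and body branches separate means the body path never has to cast to an implementation class.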

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       The correct way is to make startClient retry.  In a distributed system, 
we cannot assume that all the servers are already started.
   
   Retrying needs more work.  I am fine with adding 
NettyServerStreamRpc.startClientToPeers as a workaround in the meantime.  
However, please remove DataStreamServerRpc.startClientToPeers.
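
   As a rough idea of what retrying could look like later, here is a minimal sketch. `RetrySketch.retry` is a hypothetical helper, not an existing Ratis API, and the fixed sleep between attempts is an assumption; a real implementation would more likely plug into a configurable retry policy.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not a Ratis API: retries an action with a fixed sleep.
final class RetrySketch {
  private RetrySketch() {}

  static <T> T retry(Callable<T> action, int maxAttempts, long sleepMillis) throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (InterruptedException e) {
        throw e; // do not retry when the thread is interrupted
      } catch (Exception e) {
        last = e; // e.g. the peer has not started yet
        if (attempt < maxAttempts) {
          Thread.sleep(sleepMillis);
        }
      }
    }
    throw last; // all attempts failed; rethrow the last failure
  }
}
```

   A caller would wrap the client start in the action, so a peer that comes up late is connected on a later attempt instead of failing the whole startup.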

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -18,9 +18,17 @@
 
 package org.apache.ratis.netty.server;
 
+import java.util.stream.Collectors;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.client.impl.DataStreamClientImpl;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;

Review comment:
       Please sort the new imports with the existing imports.

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -155,6 +206,13 @@ public void startServer() {
     channelFuture.syncUninterruptibly();
   }
 
+  @Override
+  public void startClientToPeers() {
+    for (DataStreamClientImpl client : clients) {
+      client.start();

Review comment:
       Add start() to DataStreamClient.



