runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r503645201



##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new 
DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        
ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       @amaliujia The point of @szetszwo is there are many sub write requests 
of one big write request, for example, when client want to write 640KB data, it 
will split it into 10 sub requests, each sub request write 64KB, when server1 
receive the [0, 64KB), it will forward [0, 64KB) to server2 and server3, but if 
we do future.join, before finish forwarding [0, 64KB), server1 can not read 
[64KB, 128KB), it will delay the streaming.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to