runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502093573



##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -140,6 +160,14 @@ ChannelFuture buildChannel() {
         .bind();
   }
 
+  public void setupClient(List<RaftPeer> otherPeers, RaftProperties 
properties) {

Review comment:
       public -> private

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this?

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in 
this patch, the streamId is always one, which seems weird.

##########
File path: 
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new 
SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new 
SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, 
null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, 
NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, 
NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       I think this test does not cover "forward the data to the other 
servers". If you comment out the following code, this test still passes.
   ```
         // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputs) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ```

##########
File path: 
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new 
SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new 
SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, 
null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, 
NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, 
NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       I think we need it. @szetszwo What do you think?

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -91,6 +106,11 @@ private void writeTo(ByteBuf buf, DataStream stream, 
boolean released) {
           throw new CompletionException(t);
         }
       }
+

Review comment:
       I think we can do `channel.write(buffer);` and 
`streamOutput.streamAsync(buf.nioBuffer());` in parallel, because 
`channel.write(buffer)` may be slow when writing to a file. I am not sure, 
though; this is just for discussion.

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -91,6 +106,11 @@ private void writeTo(ByteBuf buf, DataStream stream, 
boolean released) {
           throw new CompletionException(t);
         }
       }
+

Review comment:
       I think we can do `channel.write(buffer);` and 
`streamOutput.streamAsync(buf.nioBuffer());` in parallel, because 
`channel.write(buffer)` may be slow when writing to a file in Ozone. I am not 
sure, though; this is just for discussion.

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in 
this patch, the streamId is always one, which seems weird.
   In my understanding, each request has a unique streamId.

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in 
this patch, the streamId is always one, which seems weird.
   In my understanding, each request has a unique streamId, similar to 
https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java.
 I think we can change the patch as in the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ...
   streamMap.computeIfAbsent(streamId, new ArrayList<>());
   for (client : clients) {
     streamMap.get(streamId).add(client.stream());
   }
   streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), 
stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  
DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in 
this patch, the streamId is always one, which seems weird.
   In my understanding, each request has a unique streamId, similar to 
https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java.
 I think we can change the patch as in the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ...
   streamMap.computeIfAbsent(streamId, new ArrayList<>());
   for (DataStreamClientImpl client : clients) {
     streamMap.get(streamId).add(client.stream());
   }
   streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), 
stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  
DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in 
this patch, the streamId is always one, which seems weird.
   In my understanding, each request has a unique streamId, similar to 
[DataStreamClientImpl](https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java).
 I think we can change the patch as in the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ...
   streamMap.computeIfAbsent(streamId, new ArrayList<>());
   for (DataStreamClientImpl client : clients) {
     streamMap.get(streamId).add(client.stream());
   }
   streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), 
stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  
DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in 
this patch, the streamId is always one, which seems weird.
   In my understanding, each request has a unique streamId, similar to 
[DataStreamClientImpl](https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java).
 I think we can change the patch as in the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       streamMap.computeIfAbsent(streamId, new ArrayList<>());
       for (DataStreamClientImpl client : clients) {
          streamMap.get(streamId).add(client.stream());
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, 
released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), 
stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  
DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in 
this patch, the streamId is always one, which seems weird.
   In my understanding, each request has a unique streamId, similar to 
[DataStreamClientImpl](https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java).
 I think we can change the patch as in the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       streamMap.computeIfAbsent(streamId, new ArrayList<>());
       for (DataStreamClientImpl client : clients) {
          streamMap.get(streamId).add(client.stream());
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, 
released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), 
stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  
DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   One more question: when streaming between servers, should we use the 
streamId received from the client->server request?

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in 
this patch, the streamId is always one, which seems weird.
   In my understanding, each request has a unique streamId. I think we can 
change the patch as in the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       streamMap.computeIfAbsent(streamId, new ArrayList<>());
       for (DataStreamClientImpl client : clients) {
          streamMap.get(streamId).add(client.stream());
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, 
released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), 
stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  
DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   One more question: when streaming between servers, should we use the 
streamId received from the client->server request?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to