szetszwo commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r515788059



##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       We should have a single executor for all the streams.  Otherwise, it 
requires a thread per stream.  Since threads are limited resource, the number 
of streams will be limited by the threads.  Also, creating more threads is not 
useful since io is bounded by the hardware (e.g. # of discs).
   
   HDFS Datanode has the problem that it needs a thread per pipeline so that 
the number of pipeline is limited by it.

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       After RATIS-1126, we may simply use. 
NettyServerStreamRpc.executorService.  Or, we may add another 
   ```
   private final ExecutorService streamExecutor;
   ```
   for the stream write() and close().

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -126,11 +127,14 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService executorService;

Review comment:
       We don't have to add executorService in StreamInfo.

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       > Because one stream must use the same thread to write, ...
   
   No.  We should not require using the same thread.  This is not the async 
model.
   
   Think about this: for any moment of time, our program must guarantee only 
one thread calling channel.write(buffer).  Therefore, the program must be 
synchronise somewhere.  So, we may safely use a thread-pool (with multiple 
threads).

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       > Because one stream must use the same thread to write, ...
   
   No.  We should not require using the same thread.  This is not the async 
model.
   
   Think about this: for any moment of time, our program must guarantee only 
one thread calling channel.write(buffer).  Therefore, the program must be 
synchronised somewhere.  So, we may safely use a thread-pool (with multiple 
threads).

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       > ... I am not sure one thread is enough for all streams. ...
   
   I mean a single executor with multiple threads.

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -224,8 +232,26 @@ public NettyServerStreamRpc(RaftServer server) {
         .childOption(ChannelOption.SO_KEEPALIVE, true)
         .bind(port);
     this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> 
newClient(peer, properties)));
-    this.executorService = Executors.newFixedThreadPool(
-        
RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
+
+    int threadPoolSize = 
RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties());
+    for (int i = 0; i < threadPoolSize; i ++) {
+      this.streamExecutors.add(Executors.newFixedThreadPool(1));
+    }
+  }
+
+  private synchronized ExecutorService getMinTaskExecutor() {
+    ExecutorService minTaskExecutor = null;
+    int minTaskNum = Integer.MAX_VALUE;
+
+    for (ExecutorService e : streamExecutors) {
+      ThreadPoolExecutor tpe = (ThreadPoolExecutor) e;
+      if (minTaskNum > tpe.getQueue().size()) {
+        minTaskNum = tpe.getQueue().size();
+        minTaskExecutor = e;
+      }
+    }
+
+    return minTaskExecutor;
   }

Review comment:
       We should just use the executor service but not try to do thread 
scheduling ourselves.  The existing ExecutorService is good enough.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to