szetszwo commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r515788059
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
private final List<DataStreamOutput> outs;
private final AtomicReference<CompletableFuture<?>> previous
= new AtomicReference<>(CompletableFuture.completedFuture(null));
+ private final ExecutorService writeExecutor;
Review comment:
We should have a single executor for all the streams. Otherwise, it
requires a thread per stream. Since threads are limited resource, the number
of streams will be limited by the threads. Also, creating more threads is not
useful since io is bounded by the hardware (e.g. # of discs).
HDFS Datanode has the problem that it needs a thread per pipeline so that
the number of pipeline is limited by it.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
private final ExecutorService executorService;
+ private final List<ExecutorService> streamExecutors = new ArrayList<>();
+
Review comment:
After RATIS-1126, we may simply use.
NettyServerStreamRpc.executorService. Or, we may add another
```
private final ExecutorService streamExecutor;
```
for the stream write() and close().
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -126,11 +127,14 @@ void close() {
private final List<DataStreamOutput> outs;
private final AtomicReference<CompletableFuture<?>> previous
= new AtomicReference<>(CompletableFuture.completedFuture(null));
+ private final ExecutorService executorService;
Review comment:
We don't have to add executorService in StreamInfo.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
private final ExecutorService executorService;
+ private final List<ExecutorService> streamExecutors = new ArrayList<>();
+
Review comment:
> Because one stream must use the same thread to write, ...
No. We should not require using the same thread. This is not the async
model.
Think about this: for any moment of time, our program must guarantee only
one thread calling channel.write(buffer). Therefore, the program must be
synchronise somewhere. So, we may safely use a thread-pool (with multiple
threads).
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
private final ExecutorService executorService;
+ private final List<ExecutorService> streamExecutors = new ArrayList<>();
+
Review comment:
> Because one stream must use the same thread to write, ...
No. We should not require using the same thread. This is not the async
model.
Think about this: for any moment of time, our program must guarantee only
one thread calling channel.write(buffer). Therefore, the program must be
synchronised somewhere. So, we may safely use a thread-pool (with multiple
threads).
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
private final List<DataStreamOutput> outs;
private final AtomicReference<CompletableFuture<?>> previous
= new AtomicReference<>(CompletableFuture.completedFuture(null));
+ private final ExecutorService writeExecutor;
Review comment:
> ... I am not sure one thread is enough for all streams. ...
I mean a single executor with multiple threads.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -224,8 +232,26 @@ public NettyServerStreamRpc(RaftServer server) {
.childOption(ChannelOption.SO_KEEPALIVE, true)
.bind(port);
this.proxies = new Proxies(new PeerProxyMap<>(name, peer ->
newClient(peer, properties)));
- this.executorService = Executors.newFixedThreadPool(
-
RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
+
+ int threadPoolSize =
RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties());
+ for (int i = 0; i < threadPoolSize; i ++) {
+ this.streamExecutors.add(Executors.newFixedThreadPool(1));
+ }
+ }
+
+ private synchronized ExecutorService getMinTaskExecutor() {
+ ExecutorService minTaskExecutor = null;
+ int minTaskNum = Integer.MAX_VALUE;
+
+ for (ExecutorService e : streamExecutors) {
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor) e;
+ if (minTaskNum > tpe.getQueue().size()) {
+ minTaskNum = tpe.getQueue().size();
+ minTaskExecutor = e;
+ }
+ }
+
+ return minTaskExecutor;
}
Review comment:
We should just use the executor service but not try to do thread
scheduling ourselves. The existing ExecutorService is good enough.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]