runzhiwang commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r515811727
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
private final List<DataStreamOutput> outs;
private final AtomicReference<CompletableFuture<?>> previous
= new AtomicReference<>(CompletableFuture.completedFuture(null));
+ private final ExecutorService writeExecutor;
Review comment:
@szetszwo Thanks for the suggestions, I agree. However, I am not sure that
one thread is enough for all streams. So I create 4 ExecutorServices, each
with a single thread in its pool, and assign the one with the fewest tasks to
each new stream. Because a stream must always use the same thread to write, I
cannot create a single ExecutorService with 4 threads and share it among all
the streams. What do you think?
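A minimal sketch of the assignment described above, not the code in this PR.
The names `StreamExecutorPool`, `Slot`, `assign` and `release` are hypothetical,
and load is approximated here by the number of streams assigned to an executor
rather than by its queued tasks, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: a fixed set of single-thread executors, one picked per stream. */
class StreamExecutorPool {
  /** Pairs a single-thread executor with the number of streams assigned to it. */
  static final class Slot {
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    final AtomicInteger streams = new AtomicInteger();
  }

  private final List<Slot> slots = new ArrayList<>();

  StreamExecutorPool(int size) {
    for (int i = 0; i < size; i++) {
      slots.add(new Slot());
    }
  }

  /** Pick the slot with the fewest assigned streams and count the new stream against it. */
  synchronized Slot assign() {
    Slot least = slots.stream()
        .min(Comparator.comparingInt((Slot s) -> s.streams.get()))
        .orElseThrow(() -> new IllegalStateException("no executors configured"));
    least.streams.incrementAndGet();
    return least;
  }

  /** Call when a stream closes so that its slot is counted as less loaded. */
  void release(Slot slot) {
    slot.streams.decrementAndGet();
  }

  void shutdown() {
    slots.forEach(s -> s.executor.shutdown());
  }
}
```

Each stream would keep the `Slot` returned by `assign()` for its whole lifetime
and submit every write to `slot.executor`, so all writes of one stream run on
the same thread.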
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
private final ExecutorService executorService;
+ private final List<ExecutorService> streamExecutors = new ArrayList<>();
+
Review comment:
@szetszwo It will cause an error.
1. A stream must always use the same thread to write. If we use
NettyServerStreamRpc.executorService, multiple threads may call write on the
same stream at the same time, which causes errors. We would then have to make
the write method synchronized, which is not appropriate because threads would
block waiting for the lock.
2. So I create 4 ExecutorServices, each with a single thread in its pool, and
assign the one with the fewest tasks to each new stream. Each stream then
always writes on the same thread, and no thread needs to wait for a lock.
3. I removed NettyServerStreamRpc.executorService and use the stream's
assigned ExecutorService instead.
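Point 2 can be illustrated with a small self-contained sketch. `FakeStream` is
a hypothetical stand-in for a stream whose write method is not thread-safe; it
is not a Ratis class. Because all writes go through one single-thread executor,
they run one at a time, in submission order, without any `synchronized`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SingleThreadWriteDemo {
  /** Hypothetical stand-in for a stream whose write method is not thread-safe. */
  static final class FakeStream {
    final List<Integer> written = new ArrayList<>();  // deliberately unsynchronized
    final List<String> threads = new ArrayList<>();

    void write(int packet) {
      written.add(packet);
      threads.add(Thread.currentThread().getName());
    }
  }

  /** Submit all writes of one stream to the single-thread executor assigned to it. */
  static FakeStream run(int packets) {
    ExecutorService streamExecutor = Executors.newSingleThreadExecutor();
    FakeStream stream = new FakeStream();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (int i = 0; i < packets; i++) {
      final int packet = i;
      futures.add(CompletableFuture.runAsync(() -> stream.write(packet), streamExecutor));
    }
    // Wait for every write before reading the results from the calling thread.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    streamExecutor.shutdown();
    return stream;
  }

  public static void main(String[] args) {
    FakeStream s = run(1000);
    System.out.println(s.written.size() + " packets written by threads "
        + new java.util.HashSet<>(s.threads));
  }
}
```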
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
private final ExecutorService executorService;
+ private final List<ExecutorService> streamExecutors = new ArrayList<>();
+
Review comment:
> for any moment of time, our program must guarantee only one thread
> calling channel.write(buffer)

@szetszwo The current implementation cannot guarantee that only one thread
calls channel.write(buffer) at a time. For example, if we only change
`localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));` to
`localWrite = info.getStream().thenApplyAsync(stream -> writeTo(buf, stream),
executorService);`, the unit tests fail.
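A hedged sketch of the general effect, independent of the Ratis code: two
actions chained with `thenApplyAsync` on the same future can run at the same
time when the executor has more than one thread. `AsyncOverlapDemo` and
`writesOverlap` are hypothetical names, and the lambda only stands in for
`writeTo(buf, stream)`. It shows that overlap is possible; it does not claim
to reproduce the exact unit test failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class AsyncOverlapDemo {
  /** Returns true if two writes chained on the same stream future ran at the same time. */
  static boolean writesOverlap(ExecutorService executor) {
    CompletableFuture<String> stream = CompletableFuture.completedFuture("stream");
    CyclicBarrier bothInsideWrite = new CyclicBarrier(2);

    // Stand-in for writeTo(buf, stream): it passes the barrier only if a
    // second write is inside this function at the same moment.
    Function<String, Boolean> write = s -> {
      try {
        bothInsideWrite.await(1, TimeUnit.SECONDS);
        return true;
      } catch (Exception e) {  // timed out or barrier broken: no concurrent writer
        return false;
      }
    };

    CompletableFuture<Boolean> first = stream.thenApplyAsync(write, executor);
    CompletableFuture<Boolean> second = stream.thenApplyAsync(write, executor);
    boolean overlapped = first.join() && second.join();
    executor.shutdown();
    return overlapped;
  }
}
```

With `Executors.newFixedThreadPool(2)` both writes reach the barrier together,
while with `Executors.newSingleThreadExecutor()` the first write times out
alone, which is the property the per-stream executors rely on.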
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
private final ExecutorService executorService;
+ private final List<ExecutorService> streamExecutors = new ArrayList<>();
+
Review comment:
@szetszwo Could you help review this comment?