This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new daa7afb RATIS-1122. Use thenApplyAsync instead of thenApply (#250)
daa7afb is described below
commit daa7afba3ce0b76b5fd9ec31dc49320fddaeb965
Author: runzhiwang <[email protected]>
AuthorDate: Wed Nov 4 19:25:09 2020 +0800
RATIS-1122. Use thenApplyAsync instead of thenApply (#250)
---
.../org/apache/ratis/netty/server/NettyServerStreamRpc.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 041d64d..0f24b0b 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -318,21 +318,21 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
} else {
info = streams.get(key);
- localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+ localWrite = info.getPrevious().get()
+ .thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf,
stream), executorService);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
}
- final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
- final CompletableFuture<?> current = previous.get()
- .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null,
executorService)
+ final CompletableFuture<?> current = JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
sendReply(remoteWrites, request, bytesWritten, ctx);
return null;
}, executorService);
- previous.set(current);
+
+ info.getPrevious().set(current);
}
private boolean checkSuccessRemoteWrite(