This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2faf872 RATIS-1457. DataStreamManagement startTransaction error
handler (#552)
2faf872 is described below
commit 2faf8721265ae2f9c3551c0d487ae50619e19ee0
Author: hao guo <[email protected]>
AuthorDate: Thu Dec 9 15:47:06 2021 +0800
RATIS-1457. DataStreamManagement startTransaction error handler (#552)
---
.../apache/ratis/netty/server/DataStreamManagement.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 98e9c9e..81621c3 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -323,16 +323,20 @@ public class DataStreamManagement {
ctx.writeAndFlush(builder.build());
}
- private CompletableFuture<Void> startTransaction(StreamInfo info,
DataStreamRequestByteBuf request, long bytesWritten,
- ChannelHandlerContext ctx) {
+ private CompletableFuture<RaftClientReply> startTransaction(StreamInfo info,
DataStreamRequestByteBuf request,
+ long bytesWritten, ChannelHandlerContext ctx) {
try {
AsyncRpcApi asyncRpcApi = (AsyncRpcApi)
(server.getDivision(info.getRequest()
.getRaftGroupId())
.getRaftClient()
.async());
- return asyncRpcApi.sendForward(info.request).thenAcceptAsync(
- reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request,
reply, bytesWritten, info.getCommitInfos())),
- requestExecutor);
+ return asyncRpcApi.sendForward(info.request).whenCompleteAsync((reply,
e) -> {
+ if (e != null) {
+ replyDataStreamException(server, e, info.getRequest(), request, ctx);
+ } else {
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply,
bytesWritten, info.getCommitInfos()));
+ }
+ }, requestExecutor);
} catch (IOException e) {
throw new CompletionException(e);
}