This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2faf872  RATIS-1457. DataStreamManagement startTransaction error handler (#552)
2faf872 is described below

commit 2faf8721265ae2f9c3551c0d487ae50619e19ee0
Author: hao guo <[email protected]>
AuthorDate: Thu Dec 9 15:47:06 2021 +0800

    RATIS-1457. DataStreamManagement startTransaction error handler (#552)
---
 .../apache/ratis/netty/server/DataStreamManagement.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 98e9c9e..81621c3 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -323,16 +323,20 @@ public class DataStreamManagement {
     ctx.writeAndFlush(builder.build());
   }
 
-  private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request, long bytesWritten,
-      ChannelHandlerContext ctx) {
+  private CompletableFuture<RaftClientReply> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
+      long bytesWritten, ChannelHandlerContext ctx) {
     try {
       AsyncRpcApi asyncRpcApi = (AsyncRpcApi) (server.getDivision(info.getRequest()
           .getRaftGroupId())
           .getRaftClient()
           .async());
-      return asyncRpcApi.sendForward(info.request).thenAcceptAsync(
-          reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, bytesWritten, info.getCommitInfos())),
-          requestExecutor);
+      return asyncRpcApi.sendForward(info.request).whenCompleteAsync((reply, e) -> {
+        if (e != null) {
+          replyDataStreamException(server, e, info.getRequest(), request, ctx);
+        } else {
+          ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, bytesWritten, info.getCommitInfos()));
+        }
+      }, requestExecutor);
     } catch (IOException e) {
       throw new CompletionException(e);
     }

Reply via email to