This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ad0389  RATIS-1445. NettyClientStreamRpc exception handling (#541)
2ad0389 is described below

commit 2ad0389037dd56dda2a609040ec68aa3cb6bdbd0
Author: hao guo <[email protected]>
AuthorDate: Mon Nov 29 22:47:56 2021 +0800

    RATIS-1445. NettyClientStreamRpc exception handling (#541)
---
 .../java/org/apache/ratis/client/impl/OrderedStreamAsync.java |  5 ++++-
 .../org/apache/ratis/netty/client/NettyClientStreamRpc.java   | 11 ++++++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 94814ca..107e741 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -114,7 +114,10 @@ public class OrderedStreamAsync {
     final LongFunction<DataStreamWindowRequest> constructor
         = seqNum -> new DataStreamWindowRequest(header, data, seqNum);
     return slidingWindow.submitNewRequest(constructor, this::sendRequestToNetwork).
-           getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
+           getReplyFuture().whenComplete((r, e) -> {
+      LOG.error("Failed to send request, header=" + header, e);
+      requestSemaphore.release();
+    });
   }
 
   private void sendRequestToNetwork(DataStreamWindowRequest request){
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 8e60a2e..39a73fd 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -41,6 +41,7 @@ import org.apache.ratis.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
@@ -110,6 +111,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
   private ChannelInboundHandler getClientHandler(){
     return new ChannelInboundHandlerAdapter(){
+
+      private ClientInvocationId clientInvocationId;
+
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         if (!(msg instanceof DataStreamReply)) {
@@ -118,7 +122,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         }
         final DataStreamReply reply = (DataStreamReply) msg;
         LOG.debug("{}: read {}", this, reply);
-        ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
+        clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
         Optional.ofNullable(replies.get(clientInvocationId))
             .map(Queue::poll)
             .ifPresent(f -> f.complete(reply));
@@ -126,6 +130,11 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
 
       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        Optional.ofNullable(clientInvocationId)
+            .map(replies::remove)
+            .orElseGet(LinkedList::new)
+            .forEach(f -> f.completeExceptionally(cause));
+
         LOG.warn(name + ": exceptionCaught", cause);
         ctx.close();
       }

Reply via email to