This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f3b684  RATIS-1478 NettyClientStreamRpc error reply handling (#571)
8f3b684 is described below

commit 8f3b68449bd5626b9e864d3ff993e575be429256
Author: hao guo <[email protected]>
AuthorDate: Mon Jan 10 22:20:56 2022 +0800

    RATIS-1478 NettyClientStreamRpc error reply handling (#571)
---
 .../apache/ratis/netty/client/NettyClientStreamRpc.java  | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 1734e5b..9ecc98b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -112,6 +112,10 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
       return queue.poll();
     }
 
+    int size() {
+      return queue.size();
+    }
+
     @Override
     public Iterator<CompletableFuture<DataStreamReply>> iterator() {
       return queue.iterator();
@@ -156,11 +160,19 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         final DataStreamReply reply = (DataStreamReply) msg;
         LOG.debug("{}: read {}", this, reply);
         clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
-        final ReplyQueue queue = replies.get(clientInvocationId);
+        final ReplyQueue queue = reply.isSuccess() ? replies.get(clientInvocationId) :
+                replies.remove(clientInvocationId);
         if (queue != null) {
           final CompletableFuture<DataStreamReply> f = queue.poll();
           if (f != null) {
             f.complete(reply);
+
+            if (!reply.isSuccess() && queue.size() > 0) {
+              final IllegalStateException e = new IllegalStateException(
+                  this + ": an earlier request failed with " + reply);
+              queue.forEach(future -> future.completeExceptionally(e));
+            }
+
             final Integer emptyId = queue.getEmptyId();
             if (emptyId != null) {
               timeoutScheduler.onTimeout(replyQueueGracePeriod,
@@ -175,6 +187,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        LOG.warn(name + ": exceptionCaught", cause);
+
         Optional.ofNullable(clientInvocationId)
             .map(replies::remove)
             .orElse(ReplyQueue.EMPTY)

Reply via email to