This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8f3b684 RATIS-1478 NettyClientStreamRpc error reply handling (#571)
8f3b684 is described below
commit 8f3b68449bd5626b9e864d3ff993e575be429256
Author: hao guo <[email protected]>
AuthorDate: Mon Jan 10 22:20:56 2022 +0800
RATIS-1478 NettyClientStreamRpc error reply handling (#571)
---
.../apache/ratis/netty/client/NettyClientStreamRpc.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 1734e5b..9ecc98b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -112,6 +112,10 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
return queue.poll();
}
+ int size() {
+ return queue.size();
+ }
+
@Override
public Iterator<CompletableFuture<DataStreamReply>> iterator() {
return queue.iterator();
@@ -156,11 +160,19 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
final DataStreamReply reply = (DataStreamReply) msg;
LOG.debug("{}: read {}", this, reply);
clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
- final ReplyQueue queue = replies.get(clientInvocationId);
+ final ReplyQueue queue = reply.isSuccess() ? replies.get(clientInvocationId) :
+ replies.remove(clientInvocationId);
if (queue != null) {
final CompletableFuture<DataStreamReply> f = queue.poll();
if (f != null) {
f.complete(reply);
+
+ if (!reply.isSuccess() && queue.size() > 0) {
+ final IllegalStateException e = new IllegalStateException(
+ this + ": an earlier request failed with " + reply);
+ queue.forEach(future -> future.completeExceptionally(e));
+ }
+
final Integer emptyId = queue.getEmptyId();
if (emptyId != null) {
timeoutScheduler.onTimeout(replyQueueGracePeriod,
@@ -175,6 +187,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.warn(name + ": exceptionCaught", cause);
+
Optional.ofNullable(clientInvocationId)
.map(replies::remove)
.orElse(ReplyQueue.EMPTY)