szetszwo commented on a change in pull request #571:
URL: https://github.com/apache/ratis/pull/571#discussion_r776167118
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -158,9 +196,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
final ReplyQueue queue = replies.get(clientInvocationId);
Review comment:
Let's call remove(..) when the reply is not successful, so that the queue of a failed stream does not stay in the map.
```
final ReplyQueue queue = reply.isSuccess()
    ? replies.get(clientInvocationId)
    : replies.remove(clientInvocationId);
```
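To make the get/remove distinction concrete, here is a minimal standalone sketch. The class `ReplyLookup`, its `String` keys and values, and the `register`, `lookup` and `contains` helpers are hypothetical stand-ins for `ClientInvocationId`, `ReplyQueue` and the `replies` map; this is not the Ratis code.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the replies map in NettyClientStreamRpc.
class ReplyLookup {
  private final Map<String, Queue<String>> replies = new ConcurrentHashMap<>();

  // Registers an empty queue for the given stream id if none exists yet.
  void register(String id) {
    replies.computeIfAbsent(id, k -> new ConcurrentLinkedQueue<>());
  }

  // On success the queue stays registered for later replies;
  // on failure it is removed, and the caller still gets it back
  // so that any pending entries can be failed.
  Queue<String> lookup(String id, boolean success) {
    return success ? replies.get(id) : replies.remove(id);
  }

  boolean contains(String id) {
    return replies.containsKey(id);
  }
}
```

Both branches return the same queue object; only the failure branch also drops the map entry.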
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -89,35 +90,72 @@ void shutdownGracefully() {
}
}
- static class ReplyQueue implements Iterable<CompletableFuture<DataStreamReply>> {
+ static class ReplyQueue implements Iterable<DataStreamRequestEntry> {
static final ReplyQueue EMPTY = new ReplyQueue();
- private final Queue<CompletableFuture<DataStreamReply>> queue = new ConcurrentLinkedQueue<>();
+ private final Queue<DataStreamRequestEntry> queue = new ConcurrentLinkedQueue<>();
private int emptyId;
/** @return an empty ID if the queue is empty; otherwise, the queue is non-empty, return null. */
synchronized Integer getEmptyId() {
return queue.isEmpty()? emptyId: null;
}
- synchronized boolean offer(CompletableFuture<DataStreamReply> f) {
+ synchronized boolean offer(DataStreamRequestEntry f) {
if (queue.offer(f)) {
emptyId++;
return true;
}
return false;
}
- CompletableFuture<DataStreamReply> poll() {
+ DataStreamRequestEntry poll() {
return queue.poll();
}
+ int size() {
+ return queue.size();
+ }
+
@Override
- public Iterator<CompletableFuture<DataStreamReply>> iterator() {
+ public Iterator<DataStreamRequestEntry> iterator() {
return queue.iterator();
}
}
+ static class DataStreamRequestEntry {
+ private final CompletableFuture<DataStreamReply> replyFuture;
+ private final DataStreamRequest request;
Review comment:
The request is kept only for building the reply, but the reply should not be
built on the client side. Just complete the future exceptionally.
Also, the request carries data, so holding on to it costs memory.
Since we don't need the request, we don't need the new
DataStreamRequestEntry class either.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -158,9 +196,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
final ReplyQueue queue = replies.get(clientInvocationId);
if (queue != null) {
- final CompletableFuture<DataStreamReply> f = queue.poll();
+ final DataStreamRequestEntry f = queue.poll();
if (f != null) {
- f.complete(reply);
+ f.getReplyFuture().complete(reply);
+
+ if (!reply.isSuccess() && queue.size() > 0) {
+ replies.remove(clientInvocationId).forEach(cf -> {
+ if (!cf.getReplyFuture().isDone()) {
+ cf.getReplyFuture().complete(cf.getErrorDataStreamReply());
+ }
+ });
+ }
Review comment:
Just complete the futures exceptionally and don't build a reply.
```
if (!reply.isSuccess() && queue.size() > 0) {
  final IllegalStateException e = new IllegalStateException(
      this + ": an earlier request failed with " + reply);
  queue.forEach(future -> future.completeExceptionally(e));
}
```
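For reference, a minimal standalone sketch of how this behaves and how a caller would see the failure. The class `FailPending`, the helpers `failAll` and `describe`, and the use of `String` in place of `DataStreamReply` are assumptions for illustration only. Note that `completeExceptionally` returns false and does nothing on a future that is already completed, so no separate `isDone()` check is needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper; String stands in for DataStreamReply.
class FailPending {
  // Fails every future that is not yet completed with one shared exception
  // and returns how many futures were actually failed by this call.
  static int failAll(Iterable<CompletableFuture<String>> pending, String failedReply) {
    final IllegalStateException e = new IllegalStateException(
        "an earlier request failed with " + failedReply);
    int failed = 0;
    for (CompletableFuture<String> f : pending) {
      // Returns false (no effect) if f was already completed.
      if (f.completeExceptionally(e)) {
        failed++;
      }
    }
    return failed;
  }

  // Shows how a caller might observe the result: join() wraps the
  // original exception in a CompletionException.
  static String describe(CompletableFuture<String> f) {
    try {
      return "ok: " + f.join();
    } catch (CompletionException ce) {
      return "failed: " + ce.getCause().getMessage();
    }
  }
}
```

With this approach the client never fabricates a reply object; callers that already handle exceptional completion get the cause directly.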