szetszwo commented on code in PR #740:
URL: https://github.com/apache/ratis/pull/740#discussion_r1559941222
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java:
##########
@@ -417,24 +366,45 @@ protected void decode(ChannelHandlerContext context,
ByteBuf buf, List<Object> o
public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest
request) {
final CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
ClientInvocationId clientInvocationId =
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
- final ReplyQueue q = replies.computeIfAbsent(clientInvocationId, key ->
new ReplyQueue());
- if (!q.offer(f)) {
- f.completeExceptionally(new IllegalStateException(this + ": Failed to
offer a future for " + request));
- return f;
- }
- final Channel channel = connection.getChannelUninterruptibly();
- if (channel == null) {
- f.completeExceptionally(new AlreadyClosedException(this + ": Failed to
send " + request));
- return f;
+ final boolean isClose =
request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
+
+ final NettyClientReplies.ReplyMap replyMap =
replies.getReplyMap(clientInvocationId);
+ final ChannelFuture channelFuture;
+ final Channel channel;
+ final NettyClientReplies.RequestEntry requestEntry = new
NettyClientReplies.RequestEntry(request);
+ final NettyClientReplies.ReplyEntry replyEntry;
+ LOG.debug("{}: write begin {}", this, request);
+ synchronized (replyMap) {
+ channel = connection.getChannelUninterruptibly();
+ if (channel == null) {
+ f.completeExceptionally(new AlreadyClosedException(this + ": Failed to
send " + request));
+ return f;
+ }
+ replyEntry = replyMap.submitRequest(requestEntry, isClose, f);
+ final Function<DataStreamRequest, ChannelFuture> writeMethod =
outstandingRequests.write(request)?
+ channel::writeAndFlush: channel::write;
+ channelFuture = writeMethod.apply(request);
}
- LOG.debug("{}: write {}", this, request);
- final Function<DataStreamRequest, ChannelFuture> writeMethod =
outstandingRequests.write(request)?
- channel::writeAndFlush: channel::write;
- writeMethod.apply(request).addListener(future -> {
+ channelFuture.addListener(future -> {
if (!future.isSuccess()) {
- final IOException e = new IOException(this + ": Failed to send " +
request, future.cause());
- LOG.error("Channel write failed", e);
+ final IOException e = new IOException(this + ": Failed to send " +
request + " to " + channel.remoteAddress(),
+ future.cause());
f.completeExceptionally(e);
+ replyMap.fail(requestEntry);
+ LOG.error("Channel write failed", e);
+ } else {
+ LOG.debug("{}: write after {}", this, request);
+
+ final TimeDuration timeout = isClose ? closeTimeout : requestTimeout;
+ // if reply success cancel this future
+ final ScheduledFuture<?> timeoutFuture =
channel.eventLoop().schedule(() -> {
+ if (!f.isDone()) {
+ f.completeExceptionally(new TimeoutIOException(
+ "Timeout " + timeout + ": Failed to send " + request + "
channel: " + channel));
+ replyMap.fail(requestEntry);
+ }
+ }, timeout.toLong(timeout.getUnit()), timeout.getUnit());
+ replyEntry.setTimeoutFuture(timeoutFuture);
}
Review Comment:
@guohao-rosicky, the timeout may not work as expected: the code that
schedules it runs inside the listener's `operationComplete` callback, i.e. only
after the write has completed, as specified in `GenericFutureListener`, so the
timeout does not start counting until then.
I found this problem while working on RATIS-1504 and will fix it there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]