This is an automated email from the ASF dual-hosted git repository.

guohao1225 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 49b400601 RATIS-1504. Add timeout handling to DataStreamManagement#checkSuccessRemoteWrite. (#1064)
49b400601 is described below

commit 49b4006015699625c445d70e2113fd5904b4590d
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Apr 12 00:58:23 2024 -0700

    RATIS-1504. Add timeout handling to DataStreamManagement#checkSuccessRemoteWrite. (#1064)
    
    * RATIS-1504. Add timeout handling to DataStreamManagement#checkSuccessRemoteWrite.
---
 .../ratis/netty/client/NettyClientReplies.java     | 29 ++++++++++++----------
 .../ratis/netty/client/NettyClientStreamRpc.java   |  9 +++----
 .../ratis/netty/server/DataStreamManagement.java   | 14 ++++++++---
 3 files changed, 30 insertions(+), 22 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientReplies.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientReplies.java
index fc97b6fe3..4c49b1d16 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientReplies.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientReplies.java
@@ -30,11 +30,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 public class NettyClientReplies {
   public static final Logger LOG = 
LoggerFactory.getLogger(NettyClientReplies.class);
@@ -56,8 +55,8 @@ public class NettyClientReplies {
 
     ReplyEntry submitRequest(RequestEntry requestEntry, boolean isClose, 
CompletableFuture<DataStreamReply> f) {
       LOG.debug("put {} to the map for {}", requestEntry, clientInvocationId);
-      final MemoizedSupplier<ReplyEntry> replySupplier = 
MemoizedSupplier.valueOf(() -> new ReplyEntry(isClose, f));
-      return map.computeIfAbsent(requestEntry, r -> replySupplier.get());
+      // ConcurrentHashMap.computeIfAbsent javadoc: the function is applied at 
most once per key.
+      return map.computeIfAbsent(requestEntry, r -> new ReplyEntry(isClose, 
f));
     }
 
     void receiveReply(DataStreamReply reply) {
@@ -147,7 +146,7 @@ public class NettyClientReplies {
   static class ReplyEntry {
     private final boolean isClosed;
     private final CompletableFuture<DataStreamReply> replyFuture;
-    private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new 
AtomicReference<>();
+    private ScheduledFuture<?> timeoutFuture; // for reply timeout
 
     ReplyEntry(boolean isClosed, CompletableFuture<DataStreamReply> 
replyFuture) {
       this.isClosed = isClosed;
@@ -158,22 +157,26 @@ public class NettyClientReplies {
       return isClosed;
     }
 
-    void complete(DataStreamReply reply) {
-      cancelTimeoutFuture();
+    synchronized void complete(DataStreamReply reply) {
+      cancel(timeoutFuture);
       replyFuture.complete(reply);
     }
 
-    void completeExceptionally(Throwable t) {
-      cancelTimeoutFuture();
+    synchronized void completeExceptionally(Throwable t) {
+      cancel(timeoutFuture);
       replyFuture.completeExceptionally(t);
     }
 
-    private void cancelTimeoutFuture() {
-      Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> f.cancel(false));
+    static void cancel(ScheduledFuture<?> future) {
+      if (future != null) {
+        future.cancel(true);
+      }
     }
 
-    void setTimeoutFuture(ScheduledFuture<?> timeoutFuture) {
-      this.timeoutFuture.compareAndSet(null, timeoutFuture);
+    synchronized void scheduleTimeout(Supplier<ScheduledFuture<?>> 
scheduleMethod) {
+      if (!replyFuture.isDone()) {
+        timeoutFuture = scheduleMethod.get();
+      }
     }
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index b2dc3812f..534fcc581 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -53,7 +53,6 @@ import 
org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
 import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
-import org.apache.ratis.thirdparty.io.netty.util.concurrent.ScheduledFuture;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.NetUtils;
@@ -466,15 +465,13 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
         LOG.debug("{}: write after {}", this, request);
 
         final TimeDuration timeout = isClose ? closeTimeout : requestTimeout;
-        // if reply success cancel this future
-        final ScheduledFuture<?> timeoutFuture = 
channel.eventLoop().schedule(() -> {
+        replyEntry.scheduleTimeout(() -> channel.eventLoop().schedule(() -> {
           if (!f.isDone()) {
             f.completeExceptionally(new TimeoutIOException(
-                "Timeout " + timeout + ": Failed to send " + request + " 
channel: " + channel));
+                "Timeout " + timeout + ": Failed to send " + request + " via 
channel " + channel));
             replyMap.fail(requestEntry);
           }
-        }, timeout.toLong(timeout.getUnit()), timeout.getUnit());
-        replyEntry.setTimeoutFuture(timeoutFuture);
+        }, timeout.getDuration(), timeout.getUnit()));
       }
     });
     return f;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index e265d8b92..74d5cd7fd 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -18,6 +18,7 @@
 
 package org.apache.ratis.netty.server;
 
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
@@ -219,6 +220,7 @@ public class DataStreamManagement {
   private final ChannelMap channels;
   private final ExecutorService requestExecutor;
   private final ExecutorService writeExecutor;
+  private final TimeDuration requestTimeout;
 
   private final NettyServerStreamRpcMetrics nettyServerStreamRpcMetrics;
 
@@ -235,6 +237,7 @@ public class DataStreamManagement {
     this.writeExecutor = 
ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
           RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
           name + "-write-");
+    this.requestTimeout = 
RaftClientConfigKeys.DataStream.requestTimeout(server.getProperties());
 
     this.nettyServerStreamRpcMetrics = metrics;
   }
@@ -339,7 +342,7 @@ public class DataStreamManagement {
         .build();
   }
 
-  static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
+  private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
       DataStreamRequestByteBuf request, long bytesWritten, 
Collection<CommitInfoProto> commitInfos,
       ChannelHandlerContext ctx) {
     final boolean success = checkSuccessRemoteWrite(remoteWrites, 
bytesWritten, request);
@@ -493,10 +496,15 @@ public class DataStreamManagement {
     Preconditions.assertTrue(request.getStreamOffset() == 
reply.getStreamOffset());
   }
 
-  static boolean 
checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, 
long bytesWritten,
+  private boolean 
checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, 
long bytesWritten,
       final DataStreamRequestByteBuf request) {
     for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
-      final DataStreamReply reply = replyFuture.join();
+      final DataStreamReply reply;
+      try {
+        reply = replyFuture.get(requestTimeout.getDuration(), 
requestTimeout.getUnit());
+      } catch (Exception e) {
+        throw new CompletionException("Failed to get reply for bytesWritten=" 
+ bytesWritten + ", " + request, e);
+      }
       assertReplyCorrespondingToRequest(request, reply);
       if (!reply.isSuccess()) {
         LOG.warn("reply is not success, request: {}", request);

Reply via email to