This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b940b6500 RATIS-2400. Support timeout and interrupt handling in 
GrpcClientRpc. (#1342)
b940b6500 is described below

commit b940b65004bea5f15593e2bbcb5ce925c0580b39
Author: slfan1989 <[email protected]>
AuthorDate: Sun Feb 8 11:09:22 2026 +0800

    RATIS-2400. Support timeout and interrupt handling in GrpcClientRpc. (#1342)
---
 .../apache/ratis/grpc/client/GrpcClientRpc.java    | 66 ++++++++++++++++------
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  | 54 ++++++++++++++++++
 2 files changed, 104 insertions(+), 16 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 4010ade27..65175dc2a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.grpc.client;
 
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
 import org.apache.ratis.conf.RaftProperties;
@@ -24,6 +25,8 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
@@ -39,6 +42,7 @@ import 
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,12 +50,16 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 public class GrpcClientRpc extends 
RaftClientRpcWithProxy<GrpcClientProtocolClient> {
   public static final Logger LOG = 
LoggerFactory.getLogger(GrpcClientRpc.class);
 
   private final ClientId clientId;
   private final int maxMessageSize;
+  private final TimeDuration requestTimeoutDuration;
+  private final TimeDuration watchRequestTimeoutDuration;
 
   public GrpcClientRpc(ClientId clientId, RaftProperties properties,
       SslContext adminSslContext, SslContext clientSslContext) {
@@ -59,6 +67,8 @@ public class GrpcClientRpc extends 
RaftClientRpcWithProxy<GrpcClientProtocolClie
         p -> new GrpcClientProtocolClient(clientId, p, properties, 
adminSslContext, clientSslContext)));
     this.clientId = clientId;
     this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, 
LOG::debug).getSizeInt();
+    this.requestTimeoutDuration = 
RaftClientConfigKeys.Rpc.requestTimeout(properties);
+    this.watchRequestTimeoutDuration = 
RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
   }
 
   @Override
@@ -121,24 +131,11 @@ public class GrpcClientRpc extends 
RaftClientRpcWithProxy<GrpcClientProtocolClie
           ((LeaderElectionManagementRequest) request);
       return 
ClientProtoUtils.toRaftClientReply(proxy.leaderElectionManagement(proto));
     } else {
-      final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
-      // TODO: timeout support
-      try {
-        return f.get();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new InterruptedIOException(
-            "Interrupted while waiting for response of request " + request);
-      } catch (ExecutionException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(clientId + ": failed " + request, e);
-        }
-        throw IOUtils.toIOException(e);
-      }
+      return sendRequest(request, proxy);
     }
   }
 
-  private CompletableFuture<RaftClientReply> sendRequest(
+  private RaftClientReply sendRequest(
       RaftClientRequest request, GrpcClientProtocolClient proxy) throws 
IOException {
     final RaftClientRequestProto requestProto =
         toRaftClientRequestProto(request);
@@ -167,7 +164,44 @@ public class GrpcClientRpc extends 
RaftClientRpcWithProxy<GrpcClientProtocolClie
     requestObserver.onNext(requestProto);
     requestObserver.onCompleted();
 
-    return replyFuture.thenApply(ClientProtoUtils::toRaftClientReply);
+    final TimeDuration timeout = getTimeoutDuration(request);
+    try {
+      return replyFuture.thenApply(ClientProtoUtils::toRaftClientReply)
+          .get(timeout.getDuration(), timeout.getUnit());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      replyFuture.cancel(true);
+      final InterruptedIOException ioe = new InterruptedIOException(clientId + 
": Interrupted " + request);
+      sendOnError(requestObserver, Status.CANCELLED, ioe.getMessage());
+      throw ioe;
+    } catch (TimeoutException e) {
+      replyFuture.cancel(true);
+      final TimeoutIOException ioe =
+          new TimeoutIOException(clientId + ": Timed out " + timeout + " for " 
+ request, e);
+      sendOnError(requestObserver, Status.DEADLINE_EXCEEDED, ioe.getMessage());
+      throw ioe;
+    } catch (ExecutionException e) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} : failed {}", clientId, request, e);
+      }
+      throw IOUtils.toIOException(e);
+    }
+  }
+
+  private void sendOnError(StreamObserver<RaftClientRequestProto> 
requestObserver, Status status, String message) {
+    try {
+      requestObserver.onError(status.withDescription(message).asException());
+    } catch (Exception ignored) {
+      // the stream already closed.
+    }
+  }
+
+  private TimeDuration getTimeoutDuration(RaftClientRequest request) {
+    final long timeoutMs = request.getTimeoutMs();
+    if (timeoutMs > 0) {
+      return TimeDuration.valueOf(timeoutMs, TimeUnit.MILLISECONDS);
+    }
+    return request.is(RaftClientRequestProto.TypeCase.WATCH) ? 
watchRequestTimeoutDuration : requestTimeoutDuration;
   }
 
   private RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest 
request) throws IOException {
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 322eb5228..100fb8d5d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -69,6 +69,7 @@ import org.slf4j.event.Level;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.TrustManager;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -81,6 +82,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TestRaftServerWithGrpc extends BaseTest implements 
MiniRaftClusterWithGrpc.FactoryGet {
   {
@@ -238,6 +240,58 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
     runWithNewCluster(3, this::testRaftClientRequestMetrics);
   }
 
+  @ParameterizedTest
+  @MethodSource("data")
+  public void testGrpcClientRpcSyncTimeout(Boolean separateHeartbeat) throws 
Exception {
+    GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), 
separateHeartbeat);
+    runWithNewCluster(3, cluster -> {
+      final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+      try (RaftClient client = cluster.createClient(leaderId, 
RetryPolicies.noRetry())) {
+        final SimpleStateMachine4Testing stateMachine = 
SimpleStateMachine4Testing.get(cluster.getLeader());
+        stateMachine.blockStartTransaction();
+        try {
+          Assertions.assertThrows(TimeoutIOException.class,
+              () -> client.io().send(new SimpleMessage("sync-timeout")));
+        } finally {
+          stateMachine.unblockStartTransaction();
+        }
+      }
+    });
+  }
+
+  @ParameterizedTest
+  @MethodSource("data")
+  public void testGrpcClientRpcSyncCancelOnInterrupt(Boolean 
separateHeartbeat) throws Exception {
+    RaftClientConfigKeys.Rpc.setRequestTimeout(getProperties(), 
TimeDuration.valueOf(10, TimeUnit.SECONDS));
+    GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), 
separateHeartbeat);
+    runWithNewCluster(3, cluster -> {
+      final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+      try (RaftClient client = cluster.createClient(leaderId, 
RetryPolicies.noRetry())) {
+        final SimpleStateMachine4Testing stateMachine = 
SimpleStateMachine4Testing.get(cluster.getLeader());
+        stateMachine.blockStartTransaction();
+        try {
+          final AtomicReference<Throwable> error = new AtomicReference<>();
+          final Thread t = new Thread(() -> {
+            try {
+              client.io().send(new SimpleMessage("sync-cancel"));
+            } catch (Throwable e) {
+              error.set(e);
+            }
+          });
+          t.start();
+          Thread.sleep(200);
+          t.interrupt();
+          t.join(5000);
+          Assertions.assertFalse(t.isAlive(), "request thread should exit 
after interrupt");
+          Assertions.assertTrue(error.get() instanceof InterruptedIOException,
+              "expected InterruptedIOException but got " + error.get());
+        } finally {
+          stateMachine.unblockStartTransaction();
+        }
+      }
+    });
+  }
+
   @ParameterizedTest
   @MethodSource("data")
   public void testRaftServerMetrics(Boolean separateHeartbeat) throws 
Exception {

Reply via email to