This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b940b6500 RATIS-2400. Support timeout and interrupt handling in
GrpcClientRpc. (#1342)
b940b6500 is described below
commit b940b65004bea5f15593e2bbcb5ce925c0580b39
Author: slfan1989 <[email protected]>
AuthorDate: Sun Feb 8 11:09:22 2026 +0800
RATIS-2400. Support timeout and interrupt handling in GrpcClientRpc. (#1342)
---
.../apache/ratis/grpc/client/GrpcClientRpc.java | 66 ++++++++++++++++------
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 54 ++++++++++++++++++
2 files changed, 104 insertions(+), 16 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 4010ade27..65175dc2a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.grpc.client;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
import org.apache.ratis.conf.RaftProperties;
@@ -24,6 +25,8 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
@@ -39,6 +42,7 @@ import
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,12 +50,16 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
public class GrpcClientRpc extends
RaftClientRpcWithProxy<GrpcClientProtocolClient> {
public static final Logger LOG =
LoggerFactory.getLogger(GrpcClientRpc.class);
private final ClientId clientId;
private final int maxMessageSize;
+ private final TimeDuration requestTimeoutDuration;
+ private final TimeDuration watchRequestTimeoutDuration;
public GrpcClientRpc(ClientId clientId, RaftProperties properties,
SslContext adminSslContext, SslContext clientSslContext) {
@@ -59,6 +67,8 @@ public class GrpcClientRpc extends
RaftClientRpcWithProxy<GrpcClientProtocolClie
p -> new GrpcClientProtocolClient(clientId, p, properties,
adminSslContext, clientSslContext)));
this.clientId = clientId;
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties,
LOG::debug).getSizeInt();
+ this.requestTimeoutDuration =
RaftClientConfigKeys.Rpc.requestTimeout(properties);
+ this.watchRequestTimeoutDuration =
RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
}
@Override
@@ -121,24 +131,11 @@ public class GrpcClientRpc extends
RaftClientRpcWithProxy<GrpcClientProtocolClie
((LeaderElectionManagementRequest) request);
return
ClientProtoUtils.toRaftClientReply(proxy.leaderElectionManagement(proto));
} else {
- final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
- // TODO: timeout support
- try {
- return f.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException(
- "Interrupted while waiting for response of request " + request);
- } catch (ExecutionException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(clientId + ": failed " + request, e);
- }
- throw IOUtils.toIOException(e);
- }
+ return sendRequest(request, proxy);
}
}
- private CompletableFuture<RaftClientReply> sendRequest(
+ private RaftClientReply sendRequest(
RaftClientRequest request, GrpcClientProtocolClient proxy) throws
IOException {
final RaftClientRequestProto requestProto =
toRaftClientRequestProto(request);
@@ -167,7 +164,44 @@ public class GrpcClientRpc extends
RaftClientRpcWithProxy<GrpcClientProtocolClie
requestObserver.onNext(requestProto);
requestObserver.onCompleted();
- return replyFuture.thenApply(ClientProtoUtils::toRaftClientReply);
+ final TimeDuration timeout = getTimeoutDuration(request);
+ try {
+ return replyFuture.thenApply(ClientProtoUtils::toRaftClientReply)
+ .get(timeout.getDuration(), timeout.getUnit());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ replyFuture.cancel(true);
+ final InterruptedIOException ioe = new InterruptedIOException(clientId +
": Interrupted " + request);
+ sendOnError(requestObserver, Status.CANCELLED, ioe.getMessage());
+ throw ioe;
+ } catch (TimeoutException e) {
+ replyFuture.cancel(true);
+ final TimeoutIOException ioe =
+ new TimeoutIOException(clientId + ": Timed out " + timeout + " for "
+ request, e);
+ sendOnError(requestObserver, Status.DEADLINE_EXCEEDED, ioe.getMessage());
+ throw ioe;
+ } catch (ExecutionException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} : failed {}", clientId, request, e);
+ }
+ throw IOUtils.toIOException(e);
+ }
+ }
+
+ /**
+ * Best-effort error signal on the request stream: calls
+ * {@link StreamObserver#onError} with {@code status}, described by {@code message}.
+ * sendRequest uses it after an interrupt (CANCELLED) or a timeout (DEADLINE_EXCEEDED).
+ * Any {@link Exception} thrown while signalling is swallowed.
+ *
+ * @param requestObserver the request stream to signal on
+ * @param status the gRPC status to report
+ * @param message description attached to {@code status}
+ */
+ private void sendOnError(StreamObserver<RaftClientRequestProto>
requestObserver, Status status, String message) {
+ try {
+ // NOTE(review): sendRequest has already called onCompleted() on this stream;
+ // presumably onError here cancels the in-flight call -- confirm against gRPC semantics.
+ requestObserver.onError(status.withDescription(message).asException());
+ } catch (Exception ignored) {
+ // Presumably the stream is already closed; ignoring is intentional (best effort).
+ }
+ }
+
+ /**
+ * Chooses how long a synchronous request waits for its reply.
+ *
+ * @param request the request being sent
+ * @return the request's own timeout when {@link RaftClientRequest#getTimeoutMs()}
+ * is positive; otherwise the client's configured watch-request timeout for
+ * WATCH requests, or the configured request timeout for all other requests.
+ */
+ private TimeDuration getTimeoutDuration(RaftClientRequest request) {
+ final long timeoutMs = request.getTimeoutMs();
+ // A non-positive value means no per-request timeout was set; fall back to config.
+ if (timeoutMs > 0) {
+ return TimeDuration.valueOf(timeoutMs, TimeUnit.MILLISECONDS);
+ }
+ return request.is(RaftClientRequestProto.TypeCase.WATCH) ?
watchRequestTimeoutDuration : requestTimeoutDuration;
}
private RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest
request) throws IOException {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 322eb5228..100fb8d5d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -69,6 +69,7 @@ import org.slf4j.event.Level;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -81,6 +82,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
public class TestRaftServerWithGrpc extends BaseTest implements
MiniRaftClusterWithGrpc.FactoryGet {
{
@@ -238,6 +240,58 @@ public class TestRaftServerWithGrpc extends BaseTest
implements MiniRaftClusterW
runWithNewCluster(3, this::testRaftClientRequestMetrics);
}
+ /**
+ * A blocking send must fail with {@link TimeoutIOException} while the leader's
+ * state machine is blocked in startTransaction. The request timeout is not
+ * overridden here, so whatever getProperties() currently holds applies.
+ */
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testGrpcClientRpcSyncTimeout(Boolean separateHeartbeat) throws
Exception {
+ GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(),
separateHeartbeat);
+ runWithNewCluster(3, cluster -> {
+ final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+ // noRetry(): presumably so the first attempt's timeout reaches the caller instead of being retried.
+ try (RaftClient client = cluster.createClient(leaderId,
RetryPolicies.noRetry())) {
+ final SimpleStateMachine4Testing stateMachine =
SimpleStateMachine4Testing.get(cluster.getLeader());
+ // Stall the leader's state machine so the send is expected to time out.
+ stateMachine.blockStartTransaction();
+ try {
+ Assertions.assertThrows(TimeoutIOException.class,
+ () -> client.io().send(new SimpleMessage("sync-timeout")));
+ } finally {
+ // Always unblock, even if the assertion fails.
+ stateMachine.unblockStartTransaction();
+ }
+ }
+ });
+ }
+
+ /**
+ * Interrupting a thread blocked in a synchronous send must make it exit with an
+ * {@link InterruptedIOException} well before the 10s request timeout set here.
+ */
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testGrpcClientRpcSyncCancelOnInterrupt(Boolean
separateHeartbeat) throws Exception {
+ // Long timeout so that only the interrupt, not the timeout, can end the call within the 5s join below.
+ // NOTE(review): this mutates getProperties() and is never reset; confirm whether properties are shared across tests.
+ RaftClientConfigKeys.Rpc.setRequestTimeout(getProperties(),
TimeDuration.valueOf(10, TimeUnit.SECONDS));
+ GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(),
separateHeartbeat);
+ runWithNewCluster(3, cluster -> {
+ final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+ try (RaftClient client = cluster.createClient(leaderId,
RetryPolicies.noRetry())) {
+ final SimpleStateMachine4Testing stateMachine =
SimpleStateMachine4Testing.get(cluster.getLeader());
+ // Stall the leader's state machine so the send stays blocked until interrupted.
+ stateMachine.blockStartTransaction();
+ try {
+ // Captures whatever the request thread throws, for inspection after join().
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread t = new Thread(() -> {
+ try {
+ client.io().send(new SimpleMessage("sync-cancel"));
+ } catch (Throwable e) {
+ error.set(e);
+ }
+ });
+ t.start();
+ // Timing-based: gives the request thread time to block on the reply; may be flaky on slow hosts.
+ Thread.sleep(200);
+ t.interrupt();
+ // Bounded wait (5s < 10s request timeout): exiting in time indicates the interrupt ended the call.
+ t.join(5000);
+ Assertions.assertFalse(t.isAlive(), "request thread should exit
after interrupt");
+ Assertions.assertTrue(error.get() instanceof InterruptedIOException,
+ "expected InterruptedIOException but got " + error.get());
+ } finally {
+ // Always unblock, even if an assertion fails.
+ stateMachine.unblockStartTransaction();
+ }
+ }
+ });
+ }
+
@ParameterizedTest
@MethodSource("data")
public void testRaftServerMetrics(Boolean separateHeartbeat) throws
Exception {