This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 863454270 RATIS-2294. Fix NettyClientRpc exception and timeout handling (#1264)
863454270 is described below

commit 863454270d16d0d934b3449c570357366c7ed407
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Tue May 20 01:52:24 2025 +0800

    RATIS-2294. Fix NettyClientRpc exception and timeout handling (#1264)
---
 .../apache/ratis/netty/client/NettyClientRpc.java  | 48 +++++++++++++++++++++-
 .../test/java/org/apache/ratis/RaftAsyncTests.java | 20 +++++----
 .../test/java/org/apache/ratis/RaftBasicTests.java |  2 +-
 3 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index 26ac41f7d..ef34caf17 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.netty.client;
 
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
 import org.apache.ratis.conf.RaftProperties;
@@ -28,23 +29,40 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(NettyClientRpc.class);
+
+  private ClientId clientId;
+  private final TimeDuration requestTimeout;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
   public NettyClientRpc(ClientId clientId, RaftProperties properties) {
     super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
+    this.clientId = clientId;
+    this.requestTimeout = RaftClientConfigKeys.Rpc.requestTimeout(properties);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest 
request) {
     final RaftPeerId serverId = request.getServerId();
+    long callId = request.getCallId();
     try {
       final NettyRpcProxy proxy = getProxies().getProxy(serverId);
       final RaftNettyServerRequestProto serverRequestProto = 
buildRequestProto(request);
-      return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+      final CompletableFuture<RaftClientReply> replyFuture = new 
CompletableFuture<>();
+
+      proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
         if (request instanceof GroupListRequest) {
           return 
ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
         } else if (request instanceof GroupInfoRequest) {
@@ -52,7 +70,35 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
         } else {
           return 
ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
         }
+      }).whenComplete((reply, e) -> {
+        if (e == null) {
+          if (reply == null) {
+            e = new NullPointerException("Both reply==null && e==null");
+          }
+          if (e == null) {
+            e = reply.getNotLeaderException();
+          }
+          if (e == null) {
+            e = reply.getLeaderNotReadyException();
+          }
+        }
+
+        if (e != null) {
+          replyFuture.completeExceptionally(e);
+        } else {
+          replyFuture.complete(reply);
+        }
       });
+
+      scheduler.onTimeout(requestTimeout, () -> {
+          if (!replyFuture.isDone()) {
+            final String s = clientId + "->" + serverId + " request #" +
+                callId + " timeout " + requestTimeout.getDuration();
+            replyFuture.completeExceptionally(new TimeoutIOException(s));
+          }
+        }, LOG, () -> "Timeout check for client request #" + callId);
+
+      return replyFuture;
     } catch (Throwable e) {
       return JavaUtils.completeExceptionally(e);
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 3a760a806..a1c16df8f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -47,6 +47,7 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.PlatformUtils;
 import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedRunnable;
@@ -83,6 +84,10 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
   {
     getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
+    if (!PlatformUtils.LINUX) {
+      getProperties().setBoolean("raft.netty.server.use-epoll", false);
+      getProperties().setBoolean("raft.netty.client.use-epoll", false);
+    }
   }
 
   @Test
@@ -282,8 +287,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
 
   void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
     final int numMessages = 10;
-    try (RaftClient client = cluster.createClient()) {
-      RaftTestUtil.waitForLeader(cluster);
+    RaftServer.Division division = waitForLeader(cluster);
+    try (RaftClient client = cluster.createClient(division.getId())) {
 
       // submit some messages
       final List<CompletableFuture<RaftClientReply>> futures = new 
ArrayList<>();
@@ -304,6 +309,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
       // Use a follower with the max commit index
       final RaftClientReply lastWriteReply = replies.get(replies.size() - 1);
       final RaftPeerId leader = lastWriteReply.getServerId();
+      Assert.assertEquals(leader, lastWriteReply.getServerId());
       LOG.info("leader = " + leader);
       final Collection<CommitInfoProto> commitInfos = 
lastWriteReply.getCommitInfos();
       LOG.info("commitInfos = " + commitInfos);
@@ -366,8 +372,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
 
   void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws 
Exception {
     final int numMessages = 20;
-    try (RaftClient client = cluster.createClient()) {
-      RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leader = waitForLeader(cluster).getId();
+    try (RaftClient client = cluster.createClient(leader)) {
 
       // submit some messages
       for (int i = 0; i < numMessages; i++) {
@@ -417,13 +423,13 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     LOG.info("Running testAppendEntriesTimeout");
     final TimeDuration oldExpiryTime = 
RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), 
TimeDuration.valueOf(20, TimeUnit.SECONDS));
-    waitForLeader(cluster);
+    final RaftPeerId leader = waitForLeader(cluster).getId();
     long time = System.currentTimeMillis();
     long waitTime = 5000;
     try (final RaftClient client = cluster.createClient()) {
       // block append requests
       cluster.getServerAliveStream()
-          .filter(impl -> !impl.getInfo().isLeader())
+          .filter(impl -> !impl.getInfo().isLeader() && 
!impl.getPeer().getId().equals(leader))
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
 
@@ -433,7 +439,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
       Assert.assertFalse(replyFuture.isDone());
       // unblock append request.
       cluster.getServerAliveStream()
-          .filter(impl -> !impl.getInfo().isLeader())
+          .filter(impl -> !impl.getInfo().isLeader() && 
!impl.getPeer().getId().equals(leader))
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index afb189183..2ce1706cf 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -457,7 +457,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
   static void runTestStateMachineMetrics(boolean async, MiniRaftCluster 
cluster) throws Exception {
     RaftServer.Division leader = waitForLeader(cluster);
-    try (final RaftClient client = cluster.createClient()) {
+    try (final RaftClient client = cluster.createClient(leader.getId())) {
       Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
           STATEMACHINE_APPLIED_INDEX_GAUGE);
       Gauge smAppliedIndexGauge = getStatemachineGaugeWithName(leader,

Reply via email to