This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
     new 863454270 RATIS-2294. Fix NettyClientRpc exception and timeout handling (#1264)
863454270 is described below

commit 863454270d16d0d934b3449c570357366c7ed407
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Tue May 20 01:52:24 2025 +0800

    RATIS-2294. Fix NettyClientRpc exception and timeout handling (#1264)
---
 .../apache/ratis/netty/client/NettyClientRpc.java | 48 +++++++++++++++++++++-
 .../test/java/org/apache/ratis/RaftAsyncTests.java | 20 +++++----
 .../test/java/org/apache/ratis/RaftBasicTests.java | 2 +-
 3 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index 26ac41f7d..ef34caf17 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.netty.client;

+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
 import org.apache.ratis.conf.RaftProperties;
@@ -28,23 +29,40 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;

 public class NettyClientRpc extends
RaftClientRpcWithProxy<NettyRpcProxy> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(NettyClientRpc.class);
+
+  private ClientId clientId;
+  private final TimeDuration requestTimeout;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
   public NettyClientRpc(ClientId clientId, RaftProperties properties) {
     super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
+    this.clientId = clientId;
+    this.requestTimeout = RaftClientConfigKeys.Rpc.requestTimeout(properties);
   }

   @Override
   public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) {
     final RaftPeerId serverId = request.getServerId();
+    long callId = request.getCallId();
     try {
       final NettyRpcProxy proxy = getProxies().getProxy(serverId);
       final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request);
-      return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+      final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+      proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
         if (request instanceof GroupListRequest) {
           return ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
         } else if (request instanceof GroupInfoRequest) {
@@ -52,7 +70,35 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
         } else {
           return ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
         }
+      }).whenComplete((reply, e) -> {
+        if (e == null) {
+          if (reply == null) {
+            e = new NullPointerException("Both reply==null && e==null");
+          }
+          if (e == null) {
+            e = reply.getNotLeaderException();
+          }
+          if (e == null) {
+            e = reply.getLeaderNotReadyException();
+          }
+        }
+
+        if (e != null) {
+          replyFuture.completeExceptionally(e);
+        } else {
+          replyFuture.complete(reply);
+        }
       });
+
+      scheduler.onTimeout(requestTimeout, () -> {
+        if (!replyFuture.isDone()) {
+          final String s = clientId + "->" + serverId + " request #" +
+              callId + " timeout " +
requestTimeout.getDuration();
+          replyFuture.completeExceptionally(new TimeoutIOException(s));
+        }
+      }, LOG, () -> "Timeout check for client request #" + callId);
+
+      return replyFuture;
     } catch (Throwable e) {
       return JavaUtils.completeExceptionally(e);
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 3a760a806..a1c16df8f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -47,6 +47,7 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.PlatformUtils;
 import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedRunnable;
@@ -83,6 +84,10 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
   {
     getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
+    if (!PlatformUtils.LINUX) {
+      getProperties().setBoolean("raft.netty.server.use-epoll", false);
+      getProperties().setBoolean("raft.netty.client.use-epoll", false);
+    }
   }

   @Test
@@ -282,8 +287,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba

   void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
     final int numMessages = 10;
-    try (RaftClient client = cluster.createClient()) {
-      RaftTestUtil.waitForLeader(cluster);
+    RaftServer.Division division = waitForLeader(cluster);
+    try (RaftClient client = cluster.createClient(division.getId())) {

       // submit some messages
       final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
@@ -304,6 +309,7 @@ public abstract class RaftAsyncTests<CLUSTER
extends MiniRaftCluster> extends Ba
       // Use a follower with the max commit index
       final RaftClientReply lastWriteReply = replies.get(replies.size() - 1);
       final RaftPeerId leader = lastWriteReply.getServerId();
+      Assert.assertEquals(leader, lastWriteReply.getServerId());
       LOG.info("leader = " + leader);
       final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos();
       LOG.info("commitInfos = " + commitInfos);
@@ -366,8 +372,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba

   void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws Exception {
     final int numMessages = 20;
-    try (RaftClient client = cluster.createClient()) {
-      RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leader = waitForLeader(cluster).getId();
+    try (RaftClient client = cluster.createClient(leader)) {

       // submit some messages
       for (int i = 0; i < numMessages; i++) {
@@ -417,13 +423,13 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     LOG.info("Running testAppendEntriesTimeout");
     final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS));
-    waitForLeader(cluster);
+    final RaftPeerId leader = waitForLeader(cluster).getId();
     long time = System.currentTimeMillis();
     long waitTime = 5000;
     try (final RaftClient client = cluster.createClient()) {
       // block append requests
       cluster.getServerAliveStream()
-          .filter(impl -> !impl.getInfo().isLeader())
+          .filter(impl -> !impl.getInfo().isLeader() && !impl.getPeer().getId().equals(leader))
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);

@@ -433,7 +439,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
       Assert.assertFalse(replyFuture.isDone());
       // unblock append request.
       cluster.getServerAliveStream()
-          .filter(impl -> !impl.getInfo().isLeader())
+          .filter(impl -> !impl.getInfo().isLeader() && !impl.getPeer().getId().equals(leader))
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);

diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index afb189183..2ce1706cf 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -457,7 +457,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>

   static void runTestStateMachineMetrics(boolean async, MiniRaftCluster cluster) throws Exception {
     RaftServer.Division leader = waitForLeader(cluster);
-    try (final RaftClient client = cluster.createClient()) {
+    try (final RaftClient client = cluster.createClient(leader.getId())) {
       Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader, STATEMACHINE_APPLIED_INDEX_GAUGE);
       Gauge smAppliedIndexGauge = getStatemachineGaugeWithName(leader,