This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new fd9ca3ab3 RATIS-2329. NettyRpcProxy should support handling netty
channel exception to prevent replication stuck (#1285)
fd9ca3ab3 is described below
commit fd9ca3ab389b19913d3754459d6ee9a5ba33189b
Author: Xianming Lei <[email protected]>
AuthorDate: Fri Oct 10 19:43:49 2025 +0800
RATIS-2329. NettyRpcProxy should support handling netty channel exception
to prevent replication stuck (#1285)
---
.../main/java/org/apache/ratis/util/IOUtils.java | 11 +++++++---
.../java/org/apache/ratis/netty/NettyRpcProxy.java | 25 ++++++++++++++++++++--
.../test/java/org/apache/ratis/RaftAsyncTests.java | 14 ++++++------
3 files changed, 39 insertions(+), 11 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index f1fe6c35c..8e91b3fb0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -91,9 +91,14 @@ public interface IOUtils {
}
static boolean shouldReconnect(Throwable e) {
- return ReflectionUtils.isInstance(e,
- SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
- AlreadyClosedException.class);
+ for (; e != null; e = e.getCause()) {
+ if (ReflectionUtils.isInstance(e,
+ SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
+ AlreadyClosedException.class, TimeoutIOException.class)) {
+ return true;
+ }
+ }
+ return false;
}
static void readFully(InputStream in, int buffSize) throws IOException {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 41269f76e..2c49de036 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -30,10 +30,13 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarin
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
@@ -47,6 +50,7 @@ import java.util.concurrent.TimeoutException;
import static org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY;
public class NettyRpcProxy implements Closeable {
+ public static final Logger LOG = LoggerFactory.getLogger(NettyRpcProxy.class);
public static class PeerMap extends PeerProxyMap<NettyRpcProxy> {
private final EventLoopGroup group;
@@ -121,6 +125,18 @@ public class NettyRpcProxy implements Closeable {
future.complete(proto);
}
}
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ client.close();
+ failOutstandingRequests(new IOException("Caught an exception for the connection to " + peer, cause));
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ failOutstandingRequests(new AlreadyClosedException("Channel to " + peer + " is inactive."));
+ super.channelInactive(ctx);
+ }
};
final ChannelInitializer<SocketChannel> initializer
= new ChannelInitializer<SocketChannel>() {
@@ -153,9 +169,14 @@ public class NettyRpcProxy implements Closeable {
@Override
public synchronized void close() {
client.close();
+ failOutstandingRequests(new AlreadyClosedException("Closing connection to " + peer));
+ }
+
+ private synchronized void failOutstandingRequests(Throwable cause) {
if (!replies.isEmpty()) {
- final IOException e = new IOException("Connection to " + peer + " is closed.");
- replies.stream().forEach(f -> f.completeExceptionally(e));
+ LOG.warn("Still have {} requests outstanding from {} connection: {}",
+ replies.size(), peer, cause.toString());
+ replies.forEach(f -> f.completeExceptionally(cause));
replies.clear();
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 4119bea71..3c765d717 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -181,12 +181,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
// the second half still have retry time remaining.
sleepTime.apply(t -> t*2).sleep();
- if (leader != null) {
- cluster.restartServer(leader, false);
- } else {
- cluster.start();
- }
-
+ // The client will try to reconnect, but the server is
+ // not started at this time and the retry will fail anyway.
// all the calls should fail for ordering guarantee
for(int i = 0; i < replies.size(); i++) {
final CheckedRunnable<Exception> getReply = replies.get(i)::get;
@@ -203,6 +199,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
testFailureCaseAsync("last-request", () -> client.async().send(new SimpleMessage("last")),
AlreadyClosedException.class, RaftRetryFailureException.class);
+
+ if (leader != null) {
+ cluster.restartServer(leader, false);
+ } else {
+ cluster.start();
+ }
}
}