This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new fd9ca3ab3 RATIS-2329. NettyRpcProxy should support handling netty channel exception to prevent replication stuck (#1285)
fd9ca3ab3 is described below

commit fd9ca3ab389b19913d3754459d6ee9a5ba33189b
Author: Xianming Lei <[email protected]>
AuthorDate: Fri Oct 10 19:43:49 2025 +0800

    RATIS-2329. NettyRpcProxy should support handling netty channel exception to prevent replication stuck (#1285)
---
 .../main/java/org/apache/ratis/util/IOUtils.java   | 11 +++++++---
 .../java/org/apache/ratis/netty/NettyRpcProxy.java | 25 ++++++++++++++++++++--
 .../test/java/org/apache/ratis/RaftAsyncTests.java | 14 ++++++------
 3 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index f1fe6c35c..8e91b3fb0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -91,9 +91,14 @@ public interface IOUtils {
   }
 
   static boolean shouldReconnect(Throwable e) {
-    return ReflectionUtils.isInstance(e,
-        SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
-        AlreadyClosedException.class);
+    for (; e != null; e = e.getCause()) {
+      if (ReflectionUtils.isInstance(e,
+          SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
+          AlreadyClosedException.class, TimeoutIOException.class)) {
+        return true;
+      }
+    }
+    return false;
   }
 
   static void readFully(InputStream in, int buffSize) throws IOException {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 41269f76e..2c49de036 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -30,10 +30,13 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarin
 import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.PeerProxyMap;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -47,6 +50,7 @@ import java.util.concurrent.TimeoutException;
 import static org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY;
 
 public class NettyRpcProxy implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(NettyRpcProxy.class);
   public static class PeerMap extends PeerProxyMap<NettyRpcProxy> {
     private final EventLoopGroup group;
 
@@ -121,6 +125,18 @@ public class NettyRpcProxy implements Closeable {
             future.complete(proto);
           }
         }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+          client.close();
+          failOutstandingRequests(new IOException("Caught an exception for the connection to " + peer, cause));
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+          failOutstandingRequests(new AlreadyClosedException("Channel to " + peer + " is inactive."));
+          super.channelInactive(ctx);
+        }
       };
       final ChannelInitializer<SocketChannel> initializer
           = new ChannelInitializer<SocketChannel>() {
@@ -153,9 +169,14 @@ public class NettyRpcProxy implements Closeable {
     @Override
     public synchronized void close() {
       client.close();
+      failOutstandingRequests(new AlreadyClosedException("Closing connection to " + peer));
+    }
+
+    private synchronized void failOutstandingRequests(Throwable cause) {
       if (!replies.isEmpty()) {
-        final IOException e = new IOException("Connection to " + peer + " is closed.");
-        replies.stream().forEach(f -> f.completeExceptionally(e));
+        LOG.warn("Still have {} requests outstanding from {} connection: {}",
+            replies.size(), peer, cause.toString());
+        replies.forEach(f -> f.completeExceptionally(cause));
         replies.clear();
       }
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 4119bea71..3c765d717 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -181,12 +181,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
       // the second half still have retry time remaining.
       sleepTime.apply(t -> t*2).sleep();
 
-      if (leader != null) {
-        cluster.restartServer(leader, false);
-      } else {
-        cluster.start();
-      }
-
+      // The client will try to reconnect, but the server is
+      // not started at this time and the retry will fail anyway.
       // all the calls should fail for ordering guarantee
       for(int i = 0; i < replies.size(); i++) {
         final CheckedRunnable<Exception> getReply = replies.get(i)::get;
@@ -203,6 +199,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
 
       testFailureCaseAsync("last-request", () -> client.async().send(new SimpleMessage("last")),
           AlreadyClosedException.class, RaftRetryFailureException.class);
+
+      if (leader != null) {
+        cluster.restartServer(leader, false);
+      } else {
+        cluster.start();
+      }
     }
   }
 

Reply via email to