This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c69361ce6 RATIS-2349. NettyClient#writeAndFlush should support 
throwing AlreadyClosedException (#1303)
c69361ce6 is described below

commit c69361ce6a8bdadce481d9df3be376742a5ab6c2
Author: Xianming Lei <[email protected]>
AuthorDate: Mon Nov 3 12:51:22 2025 +0800

    RATIS-2349. NettyClient#writeAndFlush should support throwing 
AlreadyClosedException (#1303)
---
 .../main/java/org/apache/ratis/netty/NettyClient.java   | 17 ++++++++++++++---
 .../main/java/org/apache/ratis/netty/NettyRpcProxy.java |  8 ++++++--
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java
index efea5fd0f..a42ddaca8 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.netty;
 
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.ratis.thirdparty.io.netty.channel.Channel;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
@@ -64,9 +65,19 @@ public class NettyClient implements Closeable {
     lifeCycle.checkStateAndClose(() -> NettyUtils.closeChannel(channel, 
serverAddress));
   }
 
-  public ChannelFuture writeAndFlush(Object msg) {
-    lifeCycle.assertCurrentState(LifeCycle.States.RUNNING);
-    return channel.writeAndFlush(msg);
+  public ChannelFuture writeAndFlush(Object msg) throws AlreadyClosedException 
{
+    final LifeCycle.State state = lifeCycle.getCurrentState();
+    if (state.isRunning()) {
+      return channel.writeAndFlush(msg);
+    }
+    // For CLOSING, CLOSED, and EXCEPTION states, throw AlreadyClosedException 
to trigger reconnection
+    if (state.isClosingOrClosed() || state == LifeCycle.State.EXCEPTION) {
+      throw new AlreadyClosedException(
+        "Client is closed or failed: state=" + state + ", channel=" + channel);
+    }
+    // For other states (NEW, STARTING, PAUSING, PAUSED), this is a 
programming error
+    throw new IllegalStateException("Client is in unexpected state for 
writeAndFlush: " +
+      "state=" + state + ", channel=" + channel);
   }
 
   @Override
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 2c49de036..f77096e18 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -157,7 +157,7 @@ public class NettyRpcProxy implements Closeable {
     }
 
     synchronized ChannelFuture offer(RaftNettyServerRequestProto request,
-        CompletableFuture<RaftNettyServerReplyProto> reply) {
+        CompletableFuture<RaftNettyServerReplyProto> reply) throws 
AlreadyClosedException {
       replies.offer(reply);
       return client.writeAndFlush(request);
     }
@@ -199,7 +199,11 @@ public class NettyRpcProxy implements Closeable {
 
   public CompletableFuture<RaftNettyServerReplyProto> 
sendAsync(RaftNettyServerRequestProto proto) {
     final CompletableFuture<RaftNettyServerReplyProto> reply = new 
CompletableFuture<>();
-    connection.offer(proto, reply);
+    try {
+      connection.offer(proto, reply);
+    } catch (AlreadyClosedException e) {
+      reply.completeExceptionally(e);
+    }
     return reply;
   }
 

Reply via email to