This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e53bda5a RATIS-2415. Fix queue corruption in NettyRpcProxy when request sending fails. (#1356)
1e53bda5a is described below

commit 1e53bda5aaf26672051de98688bcbf362e443e32
Author: slfan1989 <[email protected]>
AuthorDate: Sun Feb 22 04:54:27 2026 +0800

    RATIS-2415. Fix queue corruption in NettyRpcProxy when request sending fails. (#1356)
---
 .../java/org/apache/ratis/netty/NettyRpcProxy.java | 106 ++++++++++++++-----
 .../apache/ratis/server/impl/RaftServerImpl.java   |   7 +-
 .../apache/ratis/server/impl/ServerProtoUtils.java |   8 +-
 .../org/apache/ratis/netty/TestNettyRpcProxy.java  | 112 +++++++++++++++++++++
 .../apache/ratis/netty/TestRaftAsyncWithNetty.java |  10 +-
 .../impl/TestLeaderElectionServerInterface.java    |   5 +-
 6 files changed, 212 insertions(+), 36 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 675e0e1fe..cfcabc274 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -34,7 +34,9 @@ import 
org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
 import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -42,9 +44,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -80,6 +83,41 @@ public class NettyRpcProxy implements Closeable {
     }
   }
 
+  static RaftRpcRequestProto getRequest(RaftNettyServerRequestProto proto) {
+    final RaftNettyServerRequestProto.RaftNettyServerRequestCase requestCase = 
proto.getRaftNettyServerRequestCase();
+    switch (requestCase) {
+      case REQUESTVOTEREQUEST:
+        return proto.getRequestVoteRequest().getServerRequest();
+      case APPENDENTRIESREQUEST:
+        return proto.getAppendEntriesRequest().getServerRequest();
+      case INSTALLSNAPSHOTREQUEST:
+        return proto.getInstallSnapshotRequest().getServerRequest();
+      case RAFTCLIENTREQUEST:
+        return proto.getRaftClientRequest().getRpcRequest();
+      case SETCONFIGURATIONREQUEST:
+        return proto.getSetConfigurationRequest().getRpcRequest();
+      case GROUPMANAGEMENTREQUEST:
+        return proto.getGroupManagementRequest().getRpcRequest();
+      case GROUPLISTREQUEST:
+        return proto.getGroupListRequest().getRpcRequest();
+      case GROUPINFOREQUEST:
+        return proto.getGroupInfoRequest().getRpcRequest();
+      case TRANSFERLEADERSHIPREQUEST:
+        return proto.getTransferLeadershipRequest().getRpcRequest();
+      case STARTLEADERELECTIONREQUEST:
+        return proto.getStartLeaderElectionRequest().getServerRequest();
+      case SNAPSHOTMANAGEMENTREQUEST:
+        return proto.getSnapshotManagementRequest().getRpcRequest();
+      case LEADERELECTIONMANAGEMENTREQUEST:
+        return proto.getLeaderElectionManagementRequest().getRpcRequest();
+
+      case RAFTNETTYSERVERREQUEST_NOT_SET:
+        throw new IllegalArgumentException("Request case not set in proto: " + 
requestCase);
+      default:
+        throw new UnsupportedOperationException("Request case not supported: " 
+ requestCase);
+    }
+  }
+
   public static long getCallId(RaftNettyServerReplyProto proto) {
     switch (proto.getRaftNettyServerReplyCase()) {
       case REQUESTVOTEREPLY:
@@ -92,6 +130,10 @@ public class NettyRpcProxy implements Closeable {
         return proto.getInstallSnapshotReply().getServerReply().getCallId();
       case RAFTCLIENTREPLY:
         return proto.getRaftClientReply().getRpcReply().getCallId();
+      case GROUPLISTREPLY:
+        return proto.getGroupListReply().getRpcReply().getCallId();
+      case GROUPINFOREPLY:
+        return proto.getGroupInfoReply().getRpcReply().getCallId();
       case EXCEPTIONREPLY:
         return proto.getExceptionReply().getRpcReply().getCallId();
       case RAFTNETTYSERVERREPLY_NOT_SET:
@@ -106,8 +148,7 @@ public class NettyRpcProxy implements Closeable {
 
   class Connection implements Closeable {
     private final NettyClient client = new NettyClient(peer.getAddress());
-    private final Queue<CompletableFuture<RaftNettyServerReplyProto>> replies
-        = new LinkedList<>();
+    private final Map<Long, CompletableFuture<RaftNettyServerReplyProto>> 
replies = new ConcurrentHashMap<>();
 
     Connection(EventLoopGroup group) throws InterruptedException {
       final ChannelInboundHandler inboundHandler
@@ -115,11 +156,7 @@ public class NettyRpcProxy implements Closeable {
         @Override
         protected void channelRead0(ChannelHandlerContext ctx,
                                     RaftNettyServerReplyProto proto) {
-          final CompletableFuture<RaftNettyServerReplyProto> future = 
pollReply();
-          if (future == null) {
-            throw new IllegalStateException("Request #" + getCallId(proto)
-                + " not found");
-          }
+          final CompletableFuture<RaftNettyServerReplyProto> future = 
getReplyFuture(getCallId(proto), null);
           if (proto.getRaftNettyServerReplyCase() == EXCEPTIONREPLY) {
             final Object ioe = 
ProtoUtils.toObject(proto.getExceptionReply().getException());
             future.completeExceptionally((IOException)ioe);
@@ -159,14 +196,38 @@ public class NettyRpcProxy implements Closeable {
       client.connect(group, initializer);
     }
 
-    synchronized ChannelFuture offer(RaftNettyServerRequestProto request,
-        CompletableFuture<RaftNettyServerReplyProto> reply) throws 
AlreadyClosedException {
-      replies.offer(reply);
-      return client.writeAndFlush(request);
+    private CompletableFuture<RaftNettyServerReplyProto> getReplyFuture(long 
callId,
+        CompletableFuture<RaftNettyServerReplyProto> expected) {
+      final CompletableFuture<RaftNettyServerReplyProto> removed = 
replies.remove(callId);
+      Objects.requireNonNull(removed, () -> "Request #" + callId + " not 
found");
+      if (expected != null) {
+        Preconditions.assertSame(expected, removed, "removed");
+      }
+      return removed;
     }
 
-    synchronized CompletableFuture<RaftNettyServerReplyProto> pollReply() {
-      return replies.poll();
+    synchronized CompletableFuture<RaftNettyServerReplyProto> 
offer(RaftNettyServerRequestProto request) {
+      final ChannelFuture future;
+      try {
+        future = client.writeAndFlush(request);
+      } catch (AlreadyClosedException e) {
+        return JavaUtils.completeExceptionally(e);
+      }
+
+      final CompletableFuture<RaftNettyServerReplyProto> reply = new 
CompletableFuture<>();
+      final long callId = getRequest(request).getCallId();
+      final CompletableFuture<RaftNettyServerReplyProto> previous = 
replies.put(callId, reply);
+      Preconditions.assertNull(previous, "previous");
+
+      future.addListener(cf -> {
+        if (!cf.isSuccess()) {
+          // Remove from queue on async write failure to prevent reply 
mismatch.
+          // Only complete exceptionally if removal succeeds (not already 
polled).
+          getReplyFuture(callId, reply).completeExceptionally(cf.cause());
+          client.close();
+        }
+      });
+      return reply;
     }
 
     @Override
@@ -179,7 +240,7 @@ public class NettyRpcProxy implements Closeable {
       if (!replies.isEmpty()) {
         LOG.warn("Still have {} requests outstanding from {} connection: {}",
             replies.size(), peer, cause.toString());
-        replies.forEach(f -> f.completeExceptionally(cause));
+        replies.values().forEach(f -> f.completeExceptionally(cause));
         replies.clear();
       }
     }
@@ -201,23 +262,14 @@ public class NettyRpcProxy implements Closeable {
   }
 
   public CompletableFuture<RaftNettyServerReplyProto> 
sendAsync(RaftNettyServerRequestProto proto) {
-    final CompletableFuture<RaftNettyServerReplyProto> reply = new 
CompletableFuture<>();
-    try {
-      connection.offer(proto, reply);
-    } catch (AlreadyClosedException e) {
-      reply.completeExceptionally(e);
-    }
-    return reply;
+    return connection.offer(proto);
   }
 
   public RaftNettyServerReplyProto send(
       RaftRpcRequestProto request, RaftNettyServerRequestProto proto)
       throws IOException {
-    final CompletableFuture<RaftNettyServerReplyProto> reply = new 
CompletableFuture<>();
-    final ChannelFuture channelFuture = connection.offer(proto, reply);
-
+    final CompletableFuture<RaftNettyServerReplyProto> reply = 
sendAsync(proto);
     try {
-      channelFuture.sync();
       TimeDuration newDuration = 
requestTimeoutDuration.add(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
       return reply.get(newDuration.getDuration(), newDuration.getUnit());
     } catch (InterruptedException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e5ebfafb3..d9dd09d96 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1430,12 +1430,13 @@ class RaftServerImpl implements RaftServer.Division,
         RaftPeerId.valueOf(request.getRequestorId()),
         ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
         r.getCandidateTerm(),
-        TermIndex.valueOf(r.getCandidateLastEntry()));
+        TermIndex.valueOf(r.getCandidateLastEntry()),
+        request.getCallId());
   }
 
   private RequestVoteReplyProto requestVote(Phase phase,
       RaftPeerId candidateId, RaftGroupId candidateGroupId,
-      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
+      long candidateTerm, TermIndex candidateLastEntry, long callId) throws 
IOException {
     CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
         candidateId, candidateTerm, candidateLastEntry);
     LOG.info("{}: receive requestVote({}, {}, {}, {}, {})",
@@ -1470,7 +1471,7 @@ class RaftServerImpl implements RaftServer.Division,
         shouldShutdown = true;
       }
       reply = toRequestVoteReplyProto(candidateId, getMemberId(),
-          voteGranted, state.getCurrentTerm(), shouldShutdown, 
state.getLastEntry());
+          voteGranted, state.getCurrentTerm(), shouldShutdown, 
state.getLastEntry(), callId);
       if (LOG.isInfoEnabled()) {
         LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
             getMemberId(), phase, toRequestVoteReplyString(reply), state);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index f491aaee2..19d4ce6a7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.util.Preconditions;
@@ -44,9 +45,9 @@ final class ServerProtoUtils {
 
   static RequestVoteReplyProto toRequestVoteReplyProto(
       RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long 
term, boolean shouldShutdown,
-      TermIndex lastEntry) {
+      TermIndex lastEntry, long callId) {
     return RequestVoteReplyProto.newBuilder()
-        .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, 
success))
+        .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, 
success).setCallId(callId))
         .setTerm(term)
         .setShouldShutdown(shouldShutdown)
         .setLastEntry((lastEntry != null? lastEntry : 
TermIndex.INITIAL_VALUE).toProto())
@@ -56,7 +57,8 @@ final class ServerProtoUtils {
   static RequestVoteRequestProto toRequestVoteRequestProto(
       RaftGroupMemberId requestorId, RaftPeerId replyId, long term, TermIndex 
lastEntry, boolean preVote) {
     final RequestVoteRequestProto.Builder b = 
RequestVoteRequestProto.newBuilder()
-        
.setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, 
replyId))
+        
.setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, 
replyId)
+            .setCallId(CallId.getAndIncrement()))
         .setCandidateTerm(term)
         .setPreVote(preVote);
     
Optional.ofNullable(lastEntry).map(TermIndex::toProto).ifPresent(b::setCandidateLastEntry);
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
new file mode 100644
index 000000000..780c249b6
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto;
+import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
+import org.apache.ratis.thirdparty.io.netty.channel.Channel;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import 
org.apache.ratis.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
+import org.apache.ratis.util.JavaUtils;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestNettyRpcProxy extends BaseTest {
+  @Test
+  public void testOfferRollbackOnAlreadyClosed() throws Exception {
+    // Minimal netty server to allow client connect; we don't need to process 
requests.
+    final EventLoopGroup bossGroup = 
NettyUtils.newEventLoopGroup("test-netty-boss", 1, false);
+    final EventLoopGroup workerGroup = 
NettyUtils.newEventLoopGroup("test-netty-worker", 1, false);
+    final EventLoopGroup clientGroup = 
NettyUtils.newEventLoopGroup("test-netty-client", 1, false);
+    Channel serverChannel = null;
+    NettyRpcProxy proxy = null;
+    try {
+      final ChannelFuture bindFuture = new ServerBootstrap()
+          .group(bossGroup, workerGroup)
+          .channel(NettyUtils.getServerChannelClass(workerGroup))
+          .childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            protected void initChannel(SocketChannel ch) {
+              ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
+                @Override
+                protected void channelRead0(ChannelHandlerContext ctx, Object 
msg) {
+                }
+              });
+            }
+          })
+          .bind(0)
+          .sync();
+      serverChannel = bindFuture.channel();
+
+      final InetSocketAddress address = (InetSocketAddress) 
serverChannel.localAddress();
+      final String peerAddress = address.getHostString() + ":" + 
address.getPort();
+      final RaftPeer peer = 
RaftPeer.newBuilder().setId("s0").setAddress(peerAddress).build();
+      proxy = new NettyRpcProxy(peer, new RaftProperties(), clientGroup);
+
+      // Close to force AlreadyClosedException on write and trigger rollback 
logic.
+      proxy.close();
+      final CompletableFuture<RaftNettyServerReplyProto> reply =
+          proxy.sendAsync(RaftNettyServerRequestProto.getDefaultInstance());
+
+      // Ensure the future completes exceptionally with AlreadyClosedException.
+      final Throwable thrown = assertThrows(CompletionException.class, 
reply::join);
+      final Throwable unwrapped = JavaUtils.unwrapCompletionException(thrown);
+      assertInstanceOf(AlreadyClosedException.class, unwrapped);
+
+      // The replies queue must be empty after rollback; use reflection to 
reach it.
+      final Object connection = getField(proxy, "connection");
+      final Map<?, ?> replies = getField(connection, "replies");
+      assertTrue(replies.isEmpty());
+    } finally {
+      if (proxy != null) {
+        proxy.close();
+      }
+      if (serverChannel != null) {
+        serverChannel.close().sync();
+      }
+      bossGroup.shutdownGracefully();
+      workerGroup.shutdownGracefully();
+      clientGroup.shutdownGracefully();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T getField(Object target, String name) throws Exception {
+    final Field field = target.getClass().getDeclaredField(name);
+    field.setAccessible(true);
+    return (T) field.get(target);
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
index c09b07876..d2f23e143 100644
--- a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
@@ -18,10 +18,18 @@
 package org.apache.ratis.netty;
 
 import org.apache.ratis.RaftAsyncTests;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 @Timeout(100)
 public class TestRaftAsyncWithNetty
     extends RaftAsyncTests<MiniRaftClusterWithNetty>
     implements MiniRaftClusterWithNetty.FactoryGet {
-}
\ No newline at end of file
+
+  @Override
+  @Test
+  @Timeout(500)
+  public void testWithLoadAsync() throws Exception {
+    super.testWithLoadAsync();
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
index 3a91f9a34..92b7ad420 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
@@ -107,7 +107,8 @@ public class TestLeaderElectionServerInterface extends 
BaseTest {
         final long term = (lastEntry != null? lastEntry : 
TermIndex.INITIAL_VALUE).getTerm();
 
         // voter replies to candidate
-        return ServerProtoUtils.toRequestVoteReplyProto(getId(), voter, true, 
term, false, lastEntry);
+        return ServerProtoUtils.toRequestVoteReplyProto(
+            getId(), voter, true, term, false, lastEntry, 
r.getServerRequest().getCallId());
       }
 
       @Override
@@ -190,4 +191,4 @@ public class TestLeaderElectionServerInterface extends 
BaseTest {
     election.startInForeground();
   }
 
-}
\ No newline at end of file
+}

Reply via email to