This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1e53bda5a RATIS-2415. Fix queue corruption in NettyRpcProxy when
request sending fails. (#1356)
1e53bda5a is described below
commit 1e53bda5aaf26672051de98688bcbf362e443e32
Author: slfan1989 <[email protected]>
AuthorDate: Sun Feb 22 04:54:27 2026 +0800
RATIS-2415. Fix queue corruption in NettyRpcProxy when request sending
fails. (#1356)
---
.../java/org/apache/ratis/netty/NettyRpcProxy.java | 106 ++++++++++++++-----
.../apache/ratis/server/impl/RaftServerImpl.java | 7 +-
.../apache/ratis/server/impl/ServerProtoUtils.java | 8 +-
.../org/apache/ratis/netty/TestNettyRpcProxy.java | 112 +++++++++++++++++++++
.../apache/ratis/netty/TestRaftAsyncWithNetty.java | 10 +-
.../impl/TestLeaderElectionServerInterface.java | 5 +-
6 files changed, 212 insertions(+), 36 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 675e0e1fe..cfcabc274 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -34,7 +34,9 @@ import
org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -42,9 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -80,6 +83,41 @@ public class NettyRpcProxy implements Closeable {
}
}
+ static RaftRpcRequestProto getRequest(RaftNettyServerRequestProto proto) {
+ final RaftNettyServerRequestProto.RaftNettyServerRequestCase requestCase =
proto.getRaftNettyServerRequestCase();
+ switch (requestCase) {
+ case REQUESTVOTEREQUEST:
+ return proto.getRequestVoteRequest().getServerRequest();
+ case APPENDENTRIESREQUEST:
+ return proto.getAppendEntriesRequest().getServerRequest();
+ case INSTALLSNAPSHOTREQUEST:
+ return proto.getInstallSnapshotRequest().getServerRequest();
+ case RAFTCLIENTREQUEST:
+ return proto.getRaftClientRequest().getRpcRequest();
+ case SETCONFIGURATIONREQUEST:
+ return proto.getSetConfigurationRequest().getRpcRequest();
+ case GROUPMANAGEMENTREQUEST:
+ return proto.getGroupManagementRequest().getRpcRequest();
+ case GROUPLISTREQUEST:
+ return proto.getGroupListRequest().getRpcRequest();
+ case GROUPINFOREQUEST:
+ return proto.getGroupInfoRequest().getRpcRequest();
+ case TRANSFERLEADERSHIPREQUEST:
+ return proto.getTransferLeadershipRequest().getRpcRequest();
+ case STARTLEADERELECTIONREQUEST:
+ return proto.getStartLeaderElectionRequest().getServerRequest();
+ case SNAPSHOTMANAGEMENTREQUEST:
+ return proto.getSnapshotManagementRequest().getRpcRequest();
+ case LEADERELECTIONMANAGEMENTREQUEST:
+ return proto.getLeaderElectionManagementRequest().getRpcRequest();
+
+ case RAFTNETTYSERVERREQUEST_NOT_SET:
+ throw new IllegalArgumentException("Request case not set in proto: " +
requestCase);
+ default:
+ throw new UnsupportedOperationException("Request case not supported: "
+ requestCase);
+ }
+ }
+
public static long getCallId(RaftNettyServerReplyProto proto) {
switch (proto.getRaftNettyServerReplyCase()) {
case REQUESTVOTEREPLY:
@@ -92,6 +130,10 @@ public class NettyRpcProxy implements Closeable {
return proto.getInstallSnapshotReply().getServerReply().getCallId();
case RAFTCLIENTREPLY:
return proto.getRaftClientReply().getRpcReply().getCallId();
+ case GROUPLISTREPLY:
+ return proto.getGroupListReply().getRpcReply().getCallId();
+ case GROUPINFOREPLY:
+ return proto.getGroupInfoReply().getRpcReply().getCallId();
case EXCEPTIONREPLY:
return proto.getExceptionReply().getRpcReply().getCallId();
case RAFTNETTYSERVERREPLY_NOT_SET:
@@ -106,8 +148,7 @@ public class NettyRpcProxy implements Closeable {
class Connection implements Closeable {
private final NettyClient client = new NettyClient(peer.getAddress());
- private final Queue<CompletableFuture<RaftNettyServerReplyProto>> replies
- = new LinkedList<>();
+ private final Map<Long, CompletableFuture<RaftNettyServerReplyProto>>
replies = new ConcurrentHashMap<>();
Connection(EventLoopGroup group) throws InterruptedException {
final ChannelInboundHandler inboundHandler
@@ -115,11 +156,7 @@ public class NettyRpcProxy implements Closeable {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
RaftNettyServerReplyProto proto) {
- final CompletableFuture<RaftNettyServerReplyProto> future =
pollReply();
- if (future == null) {
- throw new IllegalStateException("Request #" + getCallId(proto)
- + " not found");
- }
+ final CompletableFuture<RaftNettyServerReplyProto> future =
getReplyFuture(getCallId(proto), null);
if (proto.getRaftNettyServerReplyCase() == EXCEPTIONREPLY) {
final Object ioe =
ProtoUtils.toObject(proto.getExceptionReply().getException());
future.completeExceptionally((IOException)ioe);
@@ -159,14 +196,38 @@ public class NettyRpcProxy implements Closeable {
client.connect(group, initializer);
}
- synchronized ChannelFuture offer(RaftNettyServerRequestProto request,
- CompletableFuture<RaftNettyServerReplyProto> reply) throws
AlreadyClosedException {
- replies.offer(reply);
- return client.writeAndFlush(request);
+ private CompletableFuture<RaftNettyServerReplyProto> getReplyFuture(long
callId,
+ CompletableFuture<RaftNettyServerReplyProto> expected) {
+ final CompletableFuture<RaftNettyServerReplyProto> removed =
replies.remove(callId);
+ Objects.requireNonNull(removed, () -> "Request #" + callId + " not
found");
+ if (expected != null) {
+ Preconditions.assertSame(expected, removed, "removed");
+ }
+ return removed;
}
- synchronized CompletableFuture<RaftNettyServerReplyProto> pollReply() {
- return replies.poll();
+ synchronized CompletableFuture<RaftNettyServerReplyProto>
offer(RaftNettyServerRequestProto request) {
+ final ChannelFuture future;
+ try {
+ future = client.writeAndFlush(request);
+ } catch (AlreadyClosedException e) {
+ return JavaUtils.completeExceptionally(e);
+ }
+
+ final CompletableFuture<RaftNettyServerReplyProto> reply = new
CompletableFuture<>();
+ final long callId = getRequest(request).getCallId();
+ final CompletableFuture<RaftNettyServerReplyProto> previous =
replies.put(callId, reply);
+ Preconditions.assertNull(previous, "previous");
+
+ future.addListener(cf -> {
+ if (!cf.isSuccess()) {
+ // Remove from the replies map on async write failure to prevent reply
mismatch.
+ // Only complete exceptionally if removal succeeds (the entry has not
already been removed).
+ getReplyFuture(callId, reply).completeExceptionally(cf.cause());
+ client.close();
+ }
+ });
+ return reply;
}
@Override
@@ -179,7 +240,7 @@ public class NettyRpcProxy implements Closeable {
if (!replies.isEmpty()) {
LOG.warn("Still have {} requests outstanding from {} connection: {}",
replies.size(), peer, cause.toString());
- replies.forEach(f -> f.completeExceptionally(cause));
+ replies.values().forEach(f -> f.completeExceptionally(cause));
replies.clear();
}
}
@@ -201,23 +262,14 @@ public class NettyRpcProxy implements Closeable {
}
public CompletableFuture<RaftNettyServerReplyProto>
sendAsync(RaftNettyServerRequestProto proto) {
- final CompletableFuture<RaftNettyServerReplyProto> reply = new
CompletableFuture<>();
- try {
- connection.offer(proto, reply);
- } catch (AlreadyClosedException e) {
- reply.completeExceptionally(e);
- }
- return reply;
+ return connection.offer(proto);
}
public RaftNettyServerReplyProto send(
RaftRpcRequestProto request, RaftNettyServerRequestProto proto)
throws IOException {
- final CompletableFuture<RaftNettyServerReplyProto> reply = new
CompletableFuture<>();
- final ChannelFuture channelFuture = connection.offer(proto, reply);
-
+ final CompletableFuture<RaftNettyServerReplyProto> reply =
sendAsync(proto);
try {
- channelFuture.sync();
TimeDuration newDuration =
requestTimeoutDuration.add(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
return reply.get(newDuration.getDuration(), newDuration.getUnit());
} catch (InterruptedException e) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e5ebfafb3..d9dd09d96 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1430,12 +1430,13 @@ class RaftServerImpl implements RaftServer.Division,
RaftPeerId.valueOf(request.getRequestorId()),
ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
r.getCandidateTerm(),
- TermIndex.valueOf(r.getCandidateLastEntry()));
+ TermIndex.valueOf(r.getCandidateLastEntry()),
+ request.getCallId());
}
private RequestVoteReplyProto requestVote(Phase phase,
RaftPeerId candidateId, RaftGroupId candidateGroupId,
- long candidateTerm, TermIndex candidateLastEntry) throws IOException {
+ long candidateTerm, TermIndex candidateLastEntry, long callId) throws
IOException {
CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
candidateId, candidateTerm, candidateLastEntry);
LOG.info("{}: receive requestVote({}, {}, {}, {}, {})",
@@ -1470,7 +1471,7 @@ class RaftServerImpl implements RaftServer.Division,
shouldShutdown = true;
}
reply = toRequestVoteReplyProto(candidateId, getMemberId(),
- voteGranted, state.getCurrentTerm(), shouldShutdown,
state.getLastEntry());
+ voteGranted, state.getCurrentTerm(), shouldShutdown,
state.getLastEntry(), callId);
if (LOG.isInfoEnabled()) {
LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
getMemberId(), phase, toRequestVoteReplyString(reply), state);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index f491aaee2..19d4ce6a7 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.Preconditions;
@@ -44,9 +45,9 @@ final class ServerProtoUtils {
static RequestVoteReplyProto toRequestVoteReplyProto(
RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long
term, boolean shouldShutdown,
- TermIndex lastEntry) {
+ TermIndex lastEntry, long callId) {
return RequestVoteReplyProto.newBuilder()
- .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId,
success))
+ .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId,
success).setCallId(callId))
.setTerm(term)
.setShouldShutdown(shouldShutdown)
.setLastEntry((lastEntry != null? lastEntry :
TermIndex.INITIAL_VALUE).toProto())
@@ -56,7 +57,8 @@ final class ServerProtoUtils {
static RequestVoteRequestProto toRequestVoteRequestProto(
RaftGroupMemberId requestorId, RaftPeerId replyId, long term, TermIndex
lastEntry, boolean preVote) {
final RequestVoteRequestProto.Builder b =
RequestVoteRequestProto.newBuilder()
-
.setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId,
replyId))
+
.setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId,
replyId)
+ .setCallId(CallId.getAndIncrement()))
.setCandidateTerm(term)
.setPreVote(preVote);
Optional.ofNullable(lastEntry).map(TermIndex::toProto).ifPresent(b::setCandidateLastEntry);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
new file mode 100644
index 000000000..780c249b6
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto;
+import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
+import org.apache.ratis.thirdparty.io.netty.channel.Channel;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import
org.apache.ratis.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
+import org.apache.ratis.util.JavaUtils;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestNettyRpcProxy extends BaseTest {
+ @Test
+ public void testOfferRollbackOnAlreadyClosed() throws Exception {
+ // Minimal netty server to allow client connect; we don't need to process
requests.
+ final EventLoopGroup bossGroup =
NettyUtils.newEventLoopGroup("test-netty-boss", 1, false);
+ final EventLoopGroup workerGroup =
NettyUtils.newEventLoopGroup("test-netty-worker", 1, false);
+ final EventLoopGroup clientGroup =
NettyUtils.newEventLoopGroup("test-netty-client", 1, false);
+ Channel serverChannel = null;
+ NettyRpcProxy proxy = null;
+ try {
+ final ChannelFuture bindFuture = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NettyUtils.getServerChannelClass(workerGroup))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object
msg) {
+ }
+ });
+ }
+ })
+ .bind(0)
+ .sync();
+ serverChannel = bindFuture.channel();
+
+ final InetSocketAddress address = (InetSocketAddress)
serverChannel.localAddress();
+ final String peerAddress = address.getHostString() + ":" +
address.getPort();
+ final RaftPeer peer =
RaftPeer.newBuilder().setId("s0").setAddress(peerAddress).build();
+ proxy = new NettyRpcProxy(peer, new RaftProperties(), clientGroup);
+
+ // Close to force AlreadyClosedException on write and trigger rollback
logic.
+ proxy.close();
+ final CompletableFuture<RaftNettyServerReplyProto> reply =
+ proxy.sendAsync(RaftNettyServerRequestProto.getDefaultInstance());
+
+ // Ensure the future completes exceptionally with AlreadyClosedException.
+ final Throwable thrown = assertThrows(CompletionException.class,
reply::join);
+ final Throwable unwrapped = JavaUtils.unwrapCompletionException(thrown);
+ assertInstanceOf(AlreadyClosedException.class, unwrapped);
+
+ // The replies map must be empty after rollback; use reflection to
reach it.
+ final Object connection = getField(proxy, "connection");
+ final Map<?, ?> replies = getField(connection, "replies");
+ assertTrue(replies.isEmpty());
+ } finally {
+ if (proxy != null) {
+ proxy.close();
+ }
+ if (serverChannel != null) {
+ serverChannel.close().sync();
+ }
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ clientGroup.shutdownGracefully();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T getField(Object target, String name) throws Exception {
+ final Field field = target.getClass().getDeclaredField(name);
+ field.setAccessible(true);
+ return (T) field.get(target);
+ }
+}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
index c09b07876..d2f23e143 100644
---
a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
+++
b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
@@ -18,10 +18,18 @@
package org.apache.ratis.netty;
import org.apache.ratis.RaftAsyncTests;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@Timeout(100)
public class TestRaftAsyncWithNetty
extends RaftAsyncTests<MiniRaftClusterWithNetty>
implements MiniRaftClusterWithNetty.FactoryGet {
-}
\ No newline at end of file
+
+ @Override
+ @Test
+ @Timeout(500)
+ public void testWithLoadAsync() throws Exception {
+ super.testWithLoadAsync();
+ }
+}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
index 3a91f9a34..92b7ad420 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
@@ -107,7 +107,8 @@ public class TestLeaderElectionServerInterface extends
BaseTest {
final long term = (lastEntry != null? lastEntry :
TermIndex.INITIAL_VALUE).getTerm();
// voter replies to candidate
- return ServerProtoUtils.toRequestVoteReplyProto(getId(), voter, true,
term, false, lastEntry);
+ return ServerProtoUtils.toRequestVoteReplyProto(
+ getId(), voter, true, term, false, lastEntry,
r.getServerRequest().getCallId());
}
@Override
@@ -190,4 +191,4 @@ public class TestLeaderElectionServerInterface extends
BaseTest {
election.startInForeground();
}
-}
\ No newline at end of file
+}