This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push: new 8ab57e1c0 RATIS-2278. Follower Fails to Append Entries Due to Index Validation Race Condition in NavigableIndices (#1248) 8ab57e1c0 is described below commit 8ab57e1c064c6ccfec504597289590ac7ce7b106 Author: GewuNewOne <89496957+rkg...@users.noreply.github.com> AuthorDate: Wed Apr 23 00:23:24 2025 +0800 RATIS-2278. Follower Fails to Append Entries Due to Index Validation Race Condition in NavigableIndices (#1248) --- .../org/apache/ratis/server/impl/RaftServerImpl.java | 5 ++++- .../org/apache/ratis/server/impl/ServerImplUtils.java | 19 ++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index a6798c48b..a3652f497 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1694,7 +1694,10 @@ class RaftServerImpl implements RaftServer.Division, final List<ConsecutiveIndices> entriesTermIndices; try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries = entriesRef.retainAndReleaseOnClose()) { entriesTermIndices = ConsecutiveIndices.convert(entries.get()); - appendLogTermIndices.append(entriesTermIndices); + if (!appendLogTermIndices.append(entriesTermIndices)) { + // index already exists, return the last future + return appendLogFuture.get(); + } } entriesRef.retain(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index c5010a534..ce4702d95 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -136,15 +136,20 @@ public final class ServerImplUtils { return floorEntry.getValue().getTerm(index); } - synchronized void append(List<ConsecutiveIndices> entriesTermIndices) { - for(ConsecutiveIndices indices : entriesTermIndices) { - // validate startIndex - final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry(); - if (lastEntry != null) { - Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex"); + synchronized boolean append(List<ConsecutiveIndices> entriesTermIndices) { + for(int i = 0; i < entriesTermIndices.size(); i++) { + final ConsecutiveIndices indices = entriesTermIndices.get(i); + final ConsecutiveIndices previous = map.put(indices.startIndex, indices); + if (previous != null) { + // index already exists, revert this append + map.put(previous.startIndex, previous); + for(int j = 0; j < i; j++) { + map.remove(entriesTermIndices.get(j).startIndex); + } + return false; } - map.put(indices.startIndex, indices); } + return true; } synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) {