This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch snapshot-3 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 59b40376ea62e584fb3eac04de592e923214f97d Author: William Song <48054931+szywill...@users.noreply.github.com> AuthorDate: Fri Feb 21 03:38:50 2025 +0800 RATIS-2242 change consistency criteria of heartbeat during appendLog (#1215) --- .../apache/ratis/server/impl/RaftServerImpl.java | 18 +++- .../apache/ratis/server/impl/RaftServerProxy.java | 10 +- .../apache/ratis/server/impl/ServerImplUtils.java | 110 +++++++++++++++++++++ 3 files changed, 130 insertions(+), 8 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 02e038ef8..b18960575 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.server.impl; -import java.util.concurrent.CountDownLatch; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.Timekeeper; @@ -25,11 +24,11 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; -import org.apache.ratis.proto.RaftProtos.LogInfoProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.LogInfoProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase; import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto; @@ -82,6 +81,8 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.LeaderElection.Phase; import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry; +import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices; +import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices; import org.apache.ratis.server.leader.LeaderState; import org.apache.ratis.server.metrics.LeaderElectionMetrics; import org.apache.ratis.server.metrics.RaftServerMetricsImpl; @@ -112,6 +113,7 @@ import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedSupplier; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import java.io.File; import java.io.IOException; @@ -126,6 +128,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -250,6 +253,7 @@ class RaftServerImpl implements RaftServer.Division, private final ThreadGroup threadGroup; private final AtomicReference<CompletableFuture<Void>> appendLogFuture; + private final NavigableIndices appendLogTermIndices = new NavigableIndices(); RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) throws IOException { @@ -1621,9 +1625,15 @@ class RaftServerImpl implements RaftServer.Division, return reply; }); } + private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) { + final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries); + appendLogTermIndices.append(entriesTermIndices); return appendLogFuture.updateAndGet(f -> f.thenCompose( - ignored -> JavaUtils.allOf(state.getLog().append(entries)))); + ignored -> JavaUtils.allOf(state.getLog().append(entries)))) + .whenComplete((v, e) -> { + appendLogTermIndices.removeExisting(entriesTermIndices); + }); } private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) { @@ -1650,7 +1660,7 @@ class RaftServerImpl implements RaftServer.Division, } // Check if "previous" is contained in current state. - if (previous != null && !state.containsTermIndex(previous)) { + if (previous != null && !(appendLogTermIndices.contains(previous) || state.containsTermIndex(previous))) { final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex()); LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous); return replyNextIndex; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 242bb377b..ac4ba64f8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -640,10 +640,12 @@ class RaftServerProxy implements RaftServer { } @Override - public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) { - final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId()); - return getImplFuture(groupId) - .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(request))); + public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync( + AppendEntriesRequestProto request) { + final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId()); + return getImplFuture(groupId) + .thenCompose(impl -> JavaUtils.callAsUnchecked( + () -> impl.appendEntriesAsync(request), CompletionException::new)); } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index e26c6e0ab..c5010a534 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -40,11 +40,121 @@ import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; /** Server utilities for internal use. */ public final class ServerImplUtils { + /** The consecutive indices within the same term. */ + static class ConsecutiveIndices { + /** Convert the given entries to a list of {@link ConsecutiveIndices} */ + static List<ConsecutiveIndices> convert(List<LogEntryProto> entries) { + if (entries == null || entries.isEmpty()) { + return Collections.emptyList(); + } + + List<ConsecutiveIndices> indices = null; + + LogEntryProto previous = entries.get(0); + long startIndex = previous.getIndex(); + int count = 1; + + for (int i = 1; i < entries.size(); i++) { + final LogEntryProto current = entries.get(i); + // validate if the indices are consecutive + Preconditions.assertSame(previous.getIndex() + 1, current.getIndex(), "index"); + + if (current.getTerm() == previous.getTerm()) { + count++; + } else { + // validate if the terms are increasing + Preconditions.assertTrue(previous.getTerm() < current.getTerm(), "term"); + if (indices == null) { + indices = new ArrayList<>(); + } + indices.add(new ConsecutiveIndices(previous.getTerm(), startIndex, count)); + + startIndex = current.getIndex(); + count = 1; + } + previous = current; + } + + final ConsecutiveIndices last = new ConsecutiveIndices(previous.getTerm(), startIndex, count); + if (indices == null) { + return Collections.singletonList(last); + } else { + indices.add(last); + return indices; + } + } + + private final long term; + private final long startIndex; + private final int count; + + ConsecutiveIndices(long term, long startIndex, int count) { + Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0 "); + this.term = term; + this.startIndex = startIndex; + this.count = count; + } + + long getNextIndex() { + return startIndex + count; + } + + Long getTerm(long index) { + final long diff = index - startIndex; + return diff < 0 || diff >= count ? null: term; + } + } + + /** A data structure to support the {@link #contains(TermIndex)} method. */ + static class NavigableIndices { + private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>(); + + boolean contains(TermIndex ti) { + final Long term = getTerm(ti.getIndex()); + return term != null && term == ti.getTerm(); + } + + synchronized Long getTerm(long index) { + if (map.isEmpty()) { + return null; + } + + final Map.Entry<Long, ConsecutiveIndices> floorEntry = map.floorEntry(index); + if (floorEntry == null) { + return null; + } + return floorEntry.getValue().getTerm(index); + } + + synchronized void append(List<ConsecutiveIndices> entriesTermIndices) { + for(ConsecutiveIndices indices : entriesTermIndices) { + // validate startIndex + final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry(); + if (lastEntry != null) { + Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex"); + } + map.put(indices.startIndex, indices); + } + } + + synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) { + for(ConsecutiveIndices indices : entriesTermIndices) { + final ConsecutiveIndices removed = map.remove(indices.startIndex); + Preconditions.assertSame(indices, removed, "removed"); + } + } + } + private ServerImplUtils() { //Never constructed }