This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch release-3.1.1
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 54a991623ec34d8386cda18ec2c2951ccb08a70b
Author: 133tosakarin <[email protected]>
AuthorDate: Sat Sep 7 10:29:47 2024 +0800

    RATIS-2148. Snapshot transfer may cause followers to trigger reloadStateMachine incorrectly (#1145)
---
 .../apache/ratis/server/impl/SnapshotInstallationHandler.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 4a63e64ee..9b84e8f1d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -68,6 +69,7 @@ class SnapshotInstallationHandler {
       new AtomicReference<>(INVALID_TERM_INDEX);
   private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
   private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
+  private final AtomicInteger nextChunkIndex = new AtomicInteger(-1);
 
   SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) {
     this.server = server;
@@ -172,6 +174,12 @@ class SnapshotInstallationHandler {
       state.setLeader(leaderId, "installSnapshot");
 
       server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
+      if (snapshotChunkRequest.getRequestIndex() == 0) {
+        nextChunkIndex.set(0);
+      } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) {
+        throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get()
+                + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex());
+      }
       try {
         // Check and append the snapshot chunk. We simply put this in lock
         // considering a follower peer requiring a snapshot installation does not
@@ -184,6 +192,8 @@ class SnapshotInstallationHandler {
         //TODO: We should only update State with installed snapshot once the request is done.
         state.installSnapshot(request);
 
+        int idx = nextChunkIndex.getAndIncrement();
+        Preconditions.assertEquals(snapshotChunkRequest.getRequestIndex(), idx, "nextChunkIndex");
         // update the committed index
         // re-load the state machine if this is the last chunk
         if (snapshotChunkRequest.getDone()) {
