This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch release-3.1.1
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 54a991623ec34d8386cda18ec2c2951ccb08a70b
Author: 133tosakarin <[email protected]>
AuthorDate: Sat Sep 7 10:29:47 2024 +0800

    RATIS-2148. Snapshot transfer may cause followers to trigger reloadStateMachine incorrectly (#1145)
---
 .../apache/ratis/server/impl/SnapshotInstallationHandler.java  | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 4a63e64ee..9b84e8f1d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -68,6 +69,7 @@ class SnapshotInstallationHandler {
     new AtomicReference<>(INVALID_TERM_INDEX);
   private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
   private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
+  private final AtomicInteger nextChunkIndex = new AtomicInteger(-1);
 
   SnapshotInstallationHandler(RaftServerImpl server, RaftProperties 
properties) {
     this.server = server;
@@ -172,6 +174,12 @@ class SnapshotInstallationHandler {
       state.setLeader(leaderId, "installSnapshot");
 
       
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
+      if (snapshotChunkRequest.getRequestIndex() == 0) {
+        nextChunkIndex.set(0);
+      } else if (nextChunkIndex.get() != 
snapshotChunkRequest.getRequestIndex()) {
+        throw new IOException("Snapshot request already failed at chunk index 
" + nextChunkIndex.get()
+                + "; ignoring request with chunk index " + 
snapshotChunkRequest.getRequestIndex());
+      }
       try {
         // Check and append the snapshot chunk. We simply put this in lock
         // considering a follower peer requiring a snapshot installation does 
not
@@ -184,6 +192,8 @@ class SnapshotInstallationHandler {
         //TODO: We should only update State with installed snapshot once the 
request is done.
         state.installSnapshot(request);
 
+        int idx = nextChunkIndex.getAndIncrement();
+        Preconditions.assertEquals(snapshotChunkRequest.getRequestIndex(), 
idx, "nextChunkIndex");
         // update the committed index
         // re-load the state machine if this is the last chunk
         if (snapshotChunkRequest.getDone()) {

Reply via email to