This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d1e76cf2b RATIS-2148. Snapshot transfer may cause followers to trigger reloadStateMachine incorrectly (#1145)
d1e76cf2b is described below
commit d1e76cf2b0f5ce31a6e802d57e01537b7e920612
Author: 133tosakarin <[email protected]>
AuthorDate: Sat Sep 7 10:29:47 2024 +0800
RATIS-2148. Snapshot transfer may cause followers to trigger reloadStateMachine incorrectly (#1145)
---
.../apache/ratis/server/impl/SnapshotInstallationHandler.java | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 4a63e64ee..9b84e8f1d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -68,6 +69,7 @@ class SnapshotInstallationHandler {
new AtomicReference<>(INVALID_TERM_INDEX);
private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
+ private final AtomicInteger nextChunkIndex = new AtomicInteger(-1);
SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) {
this.server = server;
@@ -172,6 +174,12 @@ class SnapshotInstallationHandler {
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
+ if (snapshotChunkRequest.getRequestIndex() == 0) {
+ nextChunkIndex.set(0);
+ } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) {
+ throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get()
+ + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex());
+ }
try {
// Check and append the snapshot chunk. We simply put this in lock
// considering a follower peer requiring a snapshot installation does not
@@ -184,6 +192,8 @@ class SnapshotInstallationHandler {
//TODO: We should only update State with installed snapshot once the request is done.
state.installSnapshot(request);
+ int idx = nextChunkIndex.getAndIncrement();
+ Preconditions.assertEquals(snapshotChunkRequest.getRequestIndex(), idx, "nextChunkIndex");
// update the committed index
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {