This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch release-3.1.2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 7c3942d1d09bdfa6a062b06e4cef6cf01c4a2e1f
Author: 133tosakarin <[email protected]>
AuthorDate: Fri Nov 1 23:29:18 2024 +0800

    RATIS-2183. Detect staled snapshot request. (#1173)
---
 .../org/apache/ratis/grpc/server/GrpcLogAppender.java   |  2 ++
 ratis-proto/src/main/proto/Raft.proto                   |  1 +
 .../ratis/server/impl/SnapshotInstallationHandler.java  | 17 +++++++++++++++++
 .../apache/ratis/server/leader/LogAppenderDefault.java  |  1 +
 4 files changed, 21 insertions(+)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 192bc7564..ce7bd315c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -708,6 +708,8 @@ public class GrpcLogAppender extends LogAppenderBase {
           LOG.error("Unrecognized the reply result {}: Leader is {}, follower 
is {}",
               reply.getResult(), getServer().getId(), getFollowerId());
           break;
+        case SNAPSHOT_EXPIRED:
+          LOG.warn("{}: Follower could not install snapshot as it is 
expired.", this);
         default:
           break;
       }
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index b2e96e283..7cf2fd87c 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -155,6 +155,7 @@ enum InstallSnapshotResult {
   CONF_MISMATCH = 4;
   SNAPSHOT_INSTALLED = 5;
   SNAPSHOT_UNAVAILABLE = 6;
+  SNAPSHOT_EXPIRED = 7;
 }
 
 message RequestVoteRequestProto {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 8de9a3756..70027e6dd 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -71,6 +71,8 @@ class SnapshotInstallationHandler {
   private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
   private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
   private final AtomicInteger nextChunkIndex = new AtomicInteger(-1);
+  /** The callId of the chunk with index 0. */
+  private final AtomicLong chunk0CallId = new AtomicLong(-1);
 
   SnapshotInstallationHandler(RaftServerImpl server, RaftProperties 
properties) {
     this.server = server;
@@ -176,8 +178,22 @@ class SnapshotInstallationHandler {
       state.setLeader(leaderId, "installSnapshot");
 
       
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
+      long callId = chunk0CallId.get();
+      // 1. leaderTerm < currentTerm will never come here
+      // 2. leaderTerm == currentTerm && callId > request.getServerRequest().getCallId()
+      //    means the snapshot request is stale: it comes from the same leader but carries a
+      //    lower callId than the most recently received chunk-0 request
+      // 3. leaderTerm > currentTerm means this is a new snapshot request from a new leader;
+      //    chunk0CallId will be reset when a snapshot request with requestIndex == 0 is received.
+      if (callId > request.getServerRequest().getCallId() && currentTerm == 
leaderTerm) {
+        LOG.warn("{}: Snapshot Request Staled: chunk 0 callId is {} but {}", 
getMemberId(), callId,
+            ServerStringUtils.toInstallSnapshotRequestString(request));
+        InstallSnapshotReplyProto reply =  
toInstallSnapshotReplyProto(leaderId, getMemberId(),
+            currentTerm, snapshotChunkRequest.getRequestIndex(), 
InstallSnapshotResult.SNAPSHOT_EXPIRED);
+        return future.thenApply(dummy -> reply);
+      }
       if (snapshotChunkRequest.getRequestIndex() == 0) {
         nextChunkIndex.set(0);
+        chunk0CallId.set(request.getServerRequest().getCallId());
       } else if (nextChunkIndex.get() != 
snapshotChunkRequest.getRequestIndex()) {
         throw new IOException("Snapshot request already failed at chunk index 
" + nextChunkIndex.get()
                 + "; ignoring request with chunk index " + 
snapshotChunkRequest.getRequestIndex());
@@ -205,6 +221,7 @@ class SnapshotInstallationHandler {
         // re-load the state machine if this is the last chunk
         if (snapshotChunkRequest.getDone()) {
           state.reloadStateMachine(lastIncluded);
+          chunk0CallId.set(-1);
         }
       } finally {
         
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 1b21bb7e4..9d1edd469 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -150,6 +150,7 @@ class LogAppenderDefault extends LogAppenderBase {
               case SUCCESS:
               case SNAPSHOT_UNAVAILABLE:
               case ALREADY_INSTALLED:
+              case SNAPSHOT_EXPIRED:
                 getFollower().setAttemptedToInstallSnapshot();
                 break;
               default:

Reply via email to