This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e75a0d51d RATIS-2183. Detect staled snapshot request. (#1173)
e75a0d51d is described below
commit e75a0d51deab734bdaff4a9a743995c9feb242bc
Author: 133tosakarin <[email protected]>
AuthorDate: Fri Nov 1 23:29:18 2024 +0800
RATIS-2183. Detect staled snapshot request. (#1173)
---
.../org/apache/ratis/grpc/server/GrpcLogAppender.java | 2 ++
ratis-proto/src/main/proto/Raft.proto | 1 +
.../ratis/server/impl/SnapshotInstallationHandler.java | 17 +++++++++++++++++
.../apache/ratis/server/leader/LogAppenderDefault.java | 1 +
4 files changed, 21 insertions(+)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 45bc4c888..0784eaf04 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -717,6 +717,8 @@ public class GrpcLogAppender extends LogAppenderBase {
LOG.error("Unrecognized the reply result {}: Leader is {}, follower
is {}",
reply.getResult(), getServer().getId(), getFollowerId());
break;
+ case SNAPSHOT_EXPIRED:
+ LOG.warn("{}: Follower could not install snapshot as it is
expired.", this);
default:
break;
}
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index b2e96e283..7cf2fd87c 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -155,6 +155,7 @@ enum InstallSnapshotResult {
CONF_MISMATCH = 4;
SNAPSHOT_INSTALLED = 5;
SNAPSHOT_UNAVAILABLE = 6;
+ SNAPSHOT_EXPIRED = 7;
}
message RequestVoteRequestProto {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 8de9a3756..70027e6dd 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -71,6 +71,8 @@ class SnapshotInstallationHandler {
private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
private final AtomicInteger nextChunkIndex = new AtomicInteger(-1);
+ /** The callId of the chunk with index 0. */
+ private final AtomicLong chunk0CallId = new AtomicLong(-1);
SnapshotInstallationHandler(RaftServerImpl server, RaftProperties
properties) {
this.server = server;
@@ -176,8 +178,22 @@ class SnapshotInstallationHandler {
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
+ long callId = chunk0CallId.get();
+ // 1. leaderTerm < currentTerm will never come here
+ // 2. leaderTerm == currentTerm && callId > request.getServerRequest().getCallId()
+ // means the snapshot request is stale: it carries a smaller callId than the current chunk 0 from the same leader
+ // 3. leaderTerm > currentTerm means this is a new snapshot request from a new leader;
+ // chunk0CallId will be reset when a snapshot request with requestIndex == 0 is received.
+ if (callId > request.getServerRequest().getCallId() && currentTerm ==
leaderTerm) {
+ LOG.warn("{}: Snapshot Request Staled: chunk 0 callId is {} but {}",
getMemberId(), callId,
+ ServerStringUtils.toInstallSnapshotRequestString(request));
+ InstallSnapshotReplyProto reply =
toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SNAPSHOT_EXPIRED);
+ return future.thenApply(dummy -> reply);
+ }
if (snapshotChunkRequest.getRequestIndex() == 0) {
nextChunkIndex.set(0);
+ chunk0CallId.set(request.getServerRequest().getCallId());
} else if (nextChunkIndex.get() !=
snapshotChunkRequest.getRequestIndex()) {
throw new IOException("Snapshot request already failed at chunk index
" + nextChunkIndex.get()
+ "; ignoring request with chunk index " +
snapshotChunkRequest.getRequestIndex());
@@ -205,6 +221,7 @@ class SnapshotInstallationHandler {
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {
state.reloadStateMachine(lastIncluded);
+ chunk0CallId.set(-1);
}
} finally {
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 8ec6c19db..8c1675c7c 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -155,6 +155,7 @@ class LogAppenderDefault extends LogAppenderBase {
case SUCCESS:
case SNAPSHOT_UNAVAILABLE:
case ALREADY_INSTALLED:
+ case SNAPSHOT_EXPIRED:
getFollower().setAttemptedToInstallSnapshot();
break;
default: