This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 29ebff2c RATIS-1577. Install snapshot failure (#643)
29ebff2c is described below
commit 29ebff2c7406c22791d80fdb372afd04e1b631e5
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue May 10 17:07:48 2022 +0200
RATIS-1577. Install snapshot failure (#643)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 13 ++++-----
.../server/impl/SnapshotInstallationHandler.java | 31 +++++++++++++---------
2 files changed, 25 insertions(+), 19 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f6c45faa..c26bed77 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -82,6 +82,7 @@ import java.util.stream.Collectors;
import static
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
import static
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
import static
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
+import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
import static org.apache.ratis.util.LifeCycle.State.NEW;
import static org.apache.ratis.util.LifeCycle.State.PAUSED;
@@ -1025,7 +1026,7 @@ class RaftServerImpl implements RaftServer.Division,
synchronized (this) {
final long installSnapshot =
snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
// check snapshot install/load
- if (installSnapshot != 0) {
+ if (installSnapshot != INVALID_LOG_INDEX) {
String msg = String.format("%s: Failed do snapshot as snapshot (%s)
installation is in progress",
getMemberId(), installSnapshot);
LOG.warn(msg);
@@ -1314,7 +1315,7 @@ class RaftServerImpl implements RaftServer.Division,
if (!recognized) {
final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit,
state.getNextIndex(), NOT_LEADER, callId,
- RaftLog.INVALID_LOG_INDEX, isHeartbeat);
+ INVALID_LOG_INDEX, isHeartbeat);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Not recognize {} (term={}) as leader, state: {}
reply: {}",
getMemberId(), leaderId, leaderTerm, state,
ServerStringUtils.toAppendEntriesReplyString(reply));
@@ -1368,7 +1369,7 @@ class RaftServerImpl implements RaftServer.Division,
updateCommitInfoCache();
final long n = isHeartbeat? state.getLog().getNextIndex():
entries[entries.length - 1].getIndex() + 1;
final long matchIndex = entries.length != 0 ? entries[entries.length -
1].getIndex() :
- RaftLog.INVALID_LOG_INDEX;
+ INVALID_LOG_INDEX;
reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId,
getMemberId(), currentTerm,
state.getLog().getLastCommittedIndex(), n, SUCCESS, callId,
matchIndex,
isHeartbeat);
@@ -1389,7 +1390,7 @@ class RaftServerImpl implements RaftServer.Division,
final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex,
INCONSISTENCY, callId,
- RaftLog.INVALID_LOG_INDEX, isHeartbeat);
+ INVALID_LOG_INDEX, isHeartbeat);
LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(),
ServerStringUtils.toAppendEntriesReplyString(reply));
return reply;
}
@@ -1397,7 +1398,7 @@ class RaftServerImpl implements RaftServer.Division,
private long checkInconsistentAppendEntries(TermIndex previous,
LogEntryProto... entries) {
// Check if a snapshot installation through state machine is in progress.
final long installSnapshot =
snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
- if (installSnapshot != 0) {
+ if (installSnapshot != INVALID_LOG_INDEX) {
LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in
progress", getMemberId(), installSnapshot);
return state.getNextIndex();
}
@@ -1409,7 +1410,7 @@ class RaftServerImpl implements RaftServer.Division,
final long snapshotIndex = state.getSnapshotIndex();
final long commitIndex = state.getLog().getLastCommittedIndex();
final long nextIndex = Math.max(snapshotIndex, commitIndex);
- if (nextIndex > 0 && nextIndex >= firstEntryIndex) {
+ if (nextIndex > INVALID_LOG_INDEX && nextIndex >= firstEntryIndex) {
LOG.info("{}: Failed appendEntries as the first entry (index {})" +
" already exists (snapshotIndex: {}, commitIndex: {})",
getMemberId(), firstEntryIndex, snapshotIndex, commitIndex);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 6c24d2f2..8e06cf78 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -49,15 +49,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
+
class SnapshotInstallationHandler {
static final Logger LOG =
LoggerFactory.getLogger(SnapshotInstallationHandler.class);
+ static final TermIndex INVALID_TERM_INDEX = TermIndex.valueOf(0,
INVALID_LOG_INDEX);
+
private final RaftServerImpl server;
private final ServerState state;
private final boolean installSnapshotEnabled;
- private final AtomicLong inProgressInstallSnapshotIndex = new AtomicLong();
- private final AtomicReference<TermIndex> installedSnapshotTermIndex = new
AtomicReference<>(TermIndex.valueOf(0,0));
+ private final AtomicLong inProgressInstallSnapshotIndex = new
AtomicLong(INVALID_LOG_INDEX);
+ private final AtomicReference<TermIndex> installedSnapshotTermIndex =
+ new AtomicReference<>(INVALID_TERM_INDEX);
private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
SnapshotInstallationHandler(RaftServerImpl server, RaftProperties
properties) {
@@ -206,16 +211,16 @@ class SnapshotInstallationHandler {
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
- if (inProgressInstallSnapshotIndex.compareAndSet(0,
firstAvailableLogIndex)) {
+ if (inProgressInstallSnapshotIndex.compareAndSet(INVALID_LOG_INDEX,
firstAvailableLogIndex)) {
LOG.info("{}: Received notification to install snapshot at index {}",
getMemberId(), firstAvailableLogIndex);
// Check if snapshot index is already at par or ahead of the first
// available log index of the Leader.
final long snapshotIndex = state.getLog().getSnapshotIndex();
- if (snapshotIndex + 1 >= firstAvailableLogIndex &&
firstAvailableLogIndex > 0) {
+ if (snapshotIndex + 1 >= firstAvailableLogIndex &&
firstAvailableLogIndex > INVALID_LOG_INDEX) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.
- inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex,
0);
+ inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex,
INVALID_LOG_INDEX);
LOG.info("{}: InstallSnapshot notification result: {}, current
snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(), currentTerm,
@@ -258,7 +263,7 @@ class SnapshotInstallationHandler {
if (exception != null) {
LOG.error("{}: Failed to notify StateMachine to
InstallSnapshot. Exception: {}",
getMemberId(), exception.getMessage());
-
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
+
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex,
INVALID_LOG_INDEX);
return;
}
@@ -284,8 +289,8 @@ class SnapshotInstallationHandler {
}
final long inProgressInstallSnapshotIndexValue =
getInProgressInstallSnapshotIndex();
- Preconditions.assertTrue(
- inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex &&
inProgressInstallSnapshotIndexValue > 0,
+ Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <=
firstAvailableLogIndex
+ && inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX,
"inProgressInstallSnapshotRequest: %s is not eligible,
firstAvailableLogIndex: %s",
getInProgressInstallSnapshotIndex(), firstAvailableLogIndex);
@@ -293,22 +298,22 @@ class SnapshotInstallationHandler {
if (isSnapshotNull.compareAndSet(true, false)) {
LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
- inProgressInstallSnapshotIndex.set(0);
+ inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
}
// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the
installed snapshot index and reset
- // installedSnapshotIndex to (0,0).
+ // installedSnapshotIndex to (0,-1).
final TermIndex latestInstalledSnapshotTermIndex =
this.installedSnapshotTermIndex
- .getAndSet(TermIndex.valueOf(0,0));
- if (latestInstalledSnapshotTermIndex.getIndex() > 0) {
+ .getAndSet(INVALID_TERM_INDEX);
+ if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) {
server.getStateMachine().pause();
state.updateInstalledSnapshotIndex(latestInstalledSnapshotTermIndex);
state.reloadStateMachine(latestInstalledSnapshotTermIndex.getIndex());
LOG.info("{}: InstallSnapshot notification result: {}, at index: {}",
getMemberId(),
InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotTermIndex);
- inProgressInstallSnapshotIndex.set(0);
+ inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotTermIndex.getIndex());
}