This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 29ebff2c RATIS-1577. Install snapshot failure (#643)
29ebff2c is described below

commit 29ebff2c7406c22791d80fdb372afd04e1b631e5
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue May 10 17:07:48 2022 +0200

    RATIS-1577. Install snapshot failure (#643)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 13 ++++-----
 .../server/impl/SnapshotInstallationHandler.java   | 31 +++++++++++++---------
 2 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f6c45faa..c26bed77 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -82,6 +82,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
 import static 
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
 import static 
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
+import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
 import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
 import static org.apache.ratis.util.LifeCycle.State.NEW;
 import static org.apache.ratis.util.LifeCycle.State.PAUSED;
@@ -1025,7 +1026,7 @@ class RaftServerImpl implements RaftServer.Division,
     synchronized (this) {
       final long installSnapshot = 
snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
       // check snapshot install/load
-      if (installSnapshot != 0) {
+      if (installSnapshot != INVALID_LOG_INDEX) {
         String msg = String.format("%s: Failed do snapshot as snapshot (%s) 
installation is in progress",
             getMemberId(), installSnapshot);
         LOG.warn(msg);
@@ -1314,7 +1315,7 @@ class RaftServerImpl implements RaftServer.Division,
       if (!recognized) {
         final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
             leaderId, getMemberId(), currentTerm, followerCommit, 
state.getNextIndex(), NOT_LEADER, callId,
-            RaftLog.INVALID_LOG_INDEX, isHeartbeat);
+            INVALID_LOG_INDEX, isHeartbeat);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} 
reply: {}",
               getMemberId(), leaderId, leaderTerm, state, 
ServerStringUtils.toAppendEntriesReplyString(reply));
@@ -1368,7 +1369,7 @@ class RaftServerImpl implements RaftServer.Division,
         updateCommitInfoCache();
         final long n = isHeartbeat? state.getLog().getNextIndex(): 
entries[entries.length - 1].getIndex() + 1;
         final long matchIndex = entries.length != 0 ? entries[entries.length - 
1].getIndex() :
-            RaftLog.INVALID_LOG_INDEX;
+            INVALID_LOG_INDEX;
         reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, 
getMemberId(), currentTerm,
             state.getLog().getLastCommittedIndex(), n, SUCCESS, callId, 
matchIndex,
             isHeartbeat);
@@ -1389,7 +1390,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
         leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex, 
INCONSISTENCY, callId,
-        RaftLog.INVALID_LOG_INDEX, isHeartbeat);
+        INVALID_LOG_INDEX, isHeartbeat);
     LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(), 
ServerStringUtils.toAppendEntriesReplyString(reply));
     return reply;
   }
@@ -1397,7 +1398,7 @@ class RaftServerImpl implements RaftServer.Division,
   private long checkInconsistentAppendEntries(TermIndex previous, 
LogEntryProto... entries) {
     // Check if a snapshot installation through state machine is in progress.
     final long installSnapshot = 
snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
-    if (installSnapshot != 0) {
+    if (installSnapshot != INVALID_LOG_INDEX) {
       LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in 
progress", getMemberId(), installSnapshot);
       return state.getNextIndex();
     }
@@ -1409,7 +1410,7 @@ class RaftServerImpl implements RaftServer.Division,
       final long snapshotIndex = state.getSnapshotIndex();
       final long commitIndex =  state.getLog().getLastCommittedIndex();
       final long nextIndex = Math.max(snapshotIndex, commitIndex);
-      if (nextIndex > 0 && nextIndex >= firstEntryIndex) {
+      if (nextIndex > INVALID_LOG_INDEX && nextIndex >= firstEntryIndex) {
         LOG.info("{}: Failed appendEntries as the first entry (index {})" +
                 " already exists (snapshotIndex: {}, commitIndex: {})",
             getMemberId(), firstEntryIndex, snapshotIndex, commitIndex);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 6c24d2f2..8e06cf78 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -49,15 +49,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
+
 class SnapshotInstallationHandler {
   static final Logger LOG = 
LoggerFactory.getLogger(SnapshotInstallationHandler.class);
 
+  static final TermIndex INVALID_TERM_INDEX = TermIndex.valueOf(0, 
INVALID_LOG_INDEX);
+
   private final RaftServerImpl server;
   private final ServerState state;
 
   private final boolean installSnapshotEnabled;
-  private final AtomicLong inProgressInstallSnapshotIndex = new AtomicLong();
-  private final AtomicReference<TermIndex> installedSnapshotTermIndex = new 
AtomicReference<>(TermIndex.valueOf(0,0));
+  private final AtomicLong inProgressInstallSnapshotIndex = new 
AtomicLong(INVALID_LOG_INDEX);
+  private final AtomicReference<TermIndex> installedSnapshotTermIndex =
+    new AtomicReference<>(INVALID_TERM_INDEX);
   private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
 
   SnapshotInstallationHandler(RaftServerImpl server, RaftProperties 
properties) {
@@ -206,16 +211,16 @@ class SnapshotInstallationHandler {
       state.setLeader(leaderId, "installSnapshot");
       
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
 
-      if (inProgressInstallSnapshotIndex.compareAndSet(0, 
firstAvailableLogIndex)) {
+      if (inProgressInstallSnapshotIndex.compareAndSet(INVALID_LOG_INDEX, 
firstAvailableLogIndex)) {
         LOG.info("{}: Received notification to install snapshot at index {}", 
getMemberId(), firstAvailableLogIndex);
         // Check if snapshot index is already at par or ahead of the first
         // available log index of the Leader.
         final long snapshotIndex = state.getLog().getSnapshotIndex();
-        if (snapshotIndex + 1 >= firstAvailableLogIndex && 
firstAvailableLogIndex > 0) {
+        if (snapshotIndex + 1 >= firstAvailableLogIndex && 
firstAvailableLogIndex > INVALID_LOG_INDEX) {
           // State Machine has already installed the snapshot. Return the
           // latest snapshot index to the Leader.
 
-          inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 
0);
+          inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 
INVALID_LOG_INDEX);
           LOG.info("{}: InstallSnapshot notification result: {}, current 
snapshot index: {}", getMemberId(),
               InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
           return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(), currentTerm,
@@ -258,7 +263,7 @@ class SnapshotInstallationHandler {
               if (exception != null) {
                 LOG.error("{}: Failed to notify StateMachine to 
InstallSnapshot. Exception: {}",
                     getMemberId(), exception.getMessage());
-                
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
+                
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 
INVALID_LOG_INDEX);
                 return;
               }
 
@@ -284,8 +289,8 @@ class SnapshotInstallationHandler {
       }
 
       final long inProgressInstallSnapshotIndexValue = 
getInProgressInstallSnapshotIndex();
-      Preconditions.assertTrue(
-          inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex && 
inProgressInstallSnapshotIndexValue > 0,
+      Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <= 
firstAvailableLogIndex
+              && inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX,
           "inProgressInstallSnapshotRequest: %s is not eligible, 
firstAvailableLogIndex: %s",
           getInProgressInstallSnapshotIndex(), firstAvailableLogIndex);
 
@@ -293,22 +298,22 @@ class SnapshotInstallationHandler {
       if (isSnapshotNull.compareAndSet(true, false)) {
         LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
             InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
-        inProgressInstallSnapshotIndex.set(0);
+        inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
       }
 
       // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the 
installed snapshot index and reset
-      // installedSnapshotIndex to (0,0).
+      // installedSnapshotIndex to (0,-1).
       final TermIndex latestInstalledSnapshotTermIndex = 
this.installedSnapshotTermIndex
-          .getAndSet(TermIndex.valueOf(0,0));
-      if (latestInstalledSnapshotTermIndex.getIndex() > 0) {
+          .getAndSet(INVALID_TERM_INDEX);
+      if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) {
         server.getStateMachine().pause();
         state.updateInstalledSnapshotIndex(latestInstalledSnapshotTermIndex);
         state.reloadStateMachine(latestInstalledSnapshotTermIndex.getIndex());
         LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", 
getMemberId(),
             InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotTermIndex);
-        inProgressInstallSnapshotIndex.set(0);
+        inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotTermIndex.getIndex());
       }

Reply via email to