This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7167faf  RATIS-1481. make state update in 
notifyStateMachineToInstallSnapshot serialized (#573)
7167faf is described below

commit 7167fafe75f8c50798561a135405caa984edc7e3
Author: Nibiru <[email protected]>
AuthorDate: Wed Mar 30 13:45:37 2022 +0800

    RATIS-1481. make state update in notifyStateMachineToInstallSnapshot 
serialized (#573)
---
 .../org/apache/ratis/server/impl/ServerState.java  |   2 +-
 .../server/impl/SnapshotInstallationHandler.java   | 112 +++++++++++----------
 2 files changed, 59 insertions(+), 55 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index f05776a..b66ba73 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -470,7 +470,7 @@ class ServerState implements Closeable {
 
   long getNextIndex() {
     final long logNextIndex = log.getNextIndex();
-    final long snapshotNextIndex = getSnapshotIndex() + 1;
+    final long snapshotNextIndex = log.getSnapshotIndex() + 1;
     return Math.max(logNextIndex, snapshotNextIndex);
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index ad6f5a4..6c24d2f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.LifeCycle;
@@ -44,10 +45,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 class SnapshotInstallationHandler {
   static final Logger LOG = 
LoggerFactory.getLogger(SnapshotInstallationHandler.class);
@@ -57,7 +57,7 @@ class SnapshotInstallationHandler {
 
   private final boolean installSnapshotEnabled;
   private final AtomicLong inProgressInstallSnapshotIndex = new AtomicLong();
-  private final AtomicLong installedSnapshotIndex = new AtomicLong();
+  private final AtomicReference<TermIndex> installedSnapshotTermIndex = new 
AtomicReference<>(TermIndex.valueOf(0,0));
   private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
 
   SnapshotInstallationHandler(RaftServerImpl server, RaftProperties 
properties) {
@@ -120,12 +120,13 @@ class SnapshotInstallationHandler {
       if (request.hasLastRaftConfigurationLogEntryProto()) {
         // Set the configuration included in the snapshot
         final LogEntryProto proto = 
request.getLastRaftConfigurationLogEntryProto();
-        LOG.info("{}: set new configuration {} from snapshot", getMemberId(), 
proto);
-
-        state.setRaftConf(proto);
-        state.writeRaftConfiguration(proto);
-        server.getStateMachine().event().notifyConfigurationChanged(
-            proto.getTerm(), proto.getIndex(), proto.getConfigurationEntry());
+        if 
(!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
+          LOG.info("{}: set new configuration {} from snapshot", 
getMemberId(), proto);
+          state.setRaftConf(proto);
+          state.writeRaftConfiguration(proto);
+          server.getStateMachine().event().notifyConfigurationChanged(
+              proto.getTerm(), proto.getIndex(), 
proto.getConfigurationEntry());
+        }
       }
       return reply;
     }
@@ -203,13 +204,13 @@ class SnapshotInstallationHandler {
       }
       server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
       state.setLeader(leaderId, "installSnapshot");
-      long snapshotIndex = state.getSnapshotIndex();
-
       
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
+
       if (inProgressInstallSnapshotIndex.compareAndSet(0, 
firstAvailableLogIndex)) {
         LOG.info("{}: Received notification to install snapshot at index {}", 
getMemberId(), firstAvailableLogIndex);
         // Check if snapshot index is already at par or ahead of the first
         // available log index of the Leader.
+        final long snapshotIndex = state.getLog().getSnapshotIndex();
         if (snapshotIndex + 1 >= firstAvailableLogIndex && 
firstAvailableLogIndex > 0) {
           // State Machine has already installed the snapshot. Return the
           // latest snapshot index to the Leader.
@@ -241,44 +242,37 @@ class SnapshotInstallationHandler {
         // index. Notify the state machine to install the snapshot.
         LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's 
first available index is {}.",
             getMemberId(), state.getLog().getNextIndex(), 
firstAvailableLogIndex);
-        try {
-          
server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, 
firstAvailableLogTermIndex)
-              .whenComplete((reply, exception) -> {
-                if (exception != null) {
-                  LOG.warn("{}: Failed to notify StateMachine to 
InstallSnapshot. Exception: {}",
-                      getMemberId(), exception.getMessage());
-                  
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
-                  return;
-                }
+        // If the notifyInstallSnapshotFromLeader future completes asynchronously, 
the main thread continues with the
+        // code below. Since the time taken by a user-defined state machine is 
unbounded (e.g. a RocksDB
+        // checkpoint may keep growing, so the transfer will always exceed any 
fixed bound), we expect the
+        // follower to resume work as soon as the snapshot is installed. Until 
then, the server replies that snapshot
+        // installation is in progress.
 
-                if (reply != null) {
-                  LOG.info("{}: StateMachine successfully installed snapshot 
index {}. Reloading the StateMachine.",
-                      getMemberId(), reply.getIndex());
-                  server.getStateMachine().pause();
-                  state.updateInstalledSnapshotIndex(reply);
-                  state.reloadStateMachine(reply.getIndex());
-                  installedSnapshotIndex.set(reply.getIndex());
-                } else {
-                  isSnapshotNull.set(true);
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug("{}: StateMachine could not install snapshot as 
it is not available", this);
-                  }
+        // A separate appendLog thread appends raft entries and, while snapshot 
installation is in progress, replies
+        // to the leader with an inconsistency result carrying nextIndex and 
commitIndex. The follower's nextIndex
+        // is updated in state.reloadStateMachine. This index must be updated 
synchronously on the main thread;
+        // otherwise the leader could learn this follower's latest nextIndex from 
an appendEntries reply rather than
+        // after SNAPSHOT_INSTALLED has been acknowledged.
+        
server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, 
firstAvailableLogTermIndex)
+            .whenComplete((reply, exception) -> {
+              if (exception != null) {
+                LOG.error("{}: Failed to notify StateMachine to 
InstallSnapshot. Exception: {}",
+                    getMemberId(), exception.getMessage());
+                
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
+                return;
+              }
+
+              if (reply != null) {
+                LOG.info("{}: StateMachine successfully installed snapshot 
index {}. Reloading the StateMachine.",
+                    getMemberId(), reply.getIndex());
+                installedSnapshotTermIndex.set(reply);
+              } else {
+                isSnapshotNull.set(true);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("{}: StateMachine could not install snapshot as it 
is not available", this);
                 }
-                // wait for 1 seconds for statemachine to install snapshot
-              }).get(1, TimeUnit.SECONDS);
-        } catch (InterruptedException | TimeoutException t) {
-          //nothing to do
-        } catch (Exception t) {
-          // there are two cases:
-          //1 `get()` may throw ExecutionException if `whenComplete` throw an 
exception
-          //2 when generating completeFuture, 
`statemachine#notifyInstallSnapshotFromLeader`
-          // may throw an uncertain exception, which is determined by the 
implementation of
-          // user statemachine.
-          inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 
0);
-          final String err = getMemberId() + ": Failed to notify StateMachine 
to InstallSnapshot.";
-          LOG.warn(err + " " + t);
-          throw new IOException(err, t);
-        }
+              }
+            });
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: StateMachine is processing Snapshot Installation 
Request.", getMemberId());
@@ -289,24 +283,34 @@ class SnapshotInstallationHandler {
         }
       }
 
+      final long inProgressInstallSnapshotIndexValue = 
getInProgressInstallSnapshotIndex();
+      Preconditions.assertTrue(
+          inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex && 
inProgressInstallSnapshotIndexValue > 0,
+          "inProgressInstallSnapshotRequest: %s is not eligible, 
firstAvailableLogIndex: %s",
+          getInProgressInstallSnapshotIndex(), firstAvailableLogIndex);
+
       // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
       if (isSnapshotNull.compareAndSet(true, false)) {
         LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
             InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
-        inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 
0);
+        inProgressInstallSnapshotIndex.set(0);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
       }
 
       // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the 
installed snapshot index and reset
-      // installedSnapshotIndex to 0.
-      long latestInstalledSnapshotIndex = 
this.installedSnapshotIndex.getAndSet(0);
-      if (latestInstalledSnapshotIndex > 0) {
+      // installedSnapshotTermIndex to (0,0).
+      final TermIndex latestInstalledSnapshotTermIndex = 
this.installedSnapshotTermIndex
+          .getAndSet(TermIndex.valueOf(0,0));
+      if (latestInstalledSnapshotTermIndex.getIndex() > 0) {
+        server.getStateMachine().pause();
+        state.updateInstalledSnapshotIndex(latestInstalledSnapshotTermIndex);
+        state.reloadStateMachine(latestInstalledSnapshotTermIndex.getIndex());
         LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", 
getMemberId(),
-            InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotIndex);
-        inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 
0);
+            InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotTermIndex);
+        inProgressInstallSnapshotIndex.set(0);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
-            currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotIndex);
+            currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotTermIndex.getIndex());
       }
 
       // Otherwise, Snapshot installation is in progress.

Reply via email to