This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 6497f8e0b9f88f6205f2c7a688b54dc39af1a6c9
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Mar 30 16:10:09 2023 +0800

    RATIS-1827. Update installed snapshot index only when InstallSnapshot is done (#868)
---
 .../main/java/org/apache/ratis/server/impl/ServerState.java | 12 ++++--------
 .../ratis/server/impl/SnapshotInstallationHandler.java      |  8 ++++----
 .../ratis/server/raftlog/segmented/SegmentedRaftLog.java    |  1 +
 .../org/apache/ratis/InstallSnapshotFromLeaderTests.java    | 13 ++++++++-----
 .../server/raftlog/segmented/TestSegmentedRaftLog.java      |  2 +-
 5 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index fa685325e..ee0760f1e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -454,9 +454,11 @@ class ServerState implements Closeable {
     getStateMachineUpdater().notifyUpdater();
   }
 
-  void reloadStateMachine(long lastIndexInSnapshot) {
-    getLog().updateSnapshotIndex(lastIndexInSnapshot);
+  void reloadStateMachine(TermIndex snapshotTermIndex) {
     getStateMachineUpdater().reloadStateMachine();
+
+    getLog().onSnapshotInstalled(snapshotTermIndex.getIndex());
+    latestInstalledSnapshot.set(snapshotTermIndex);
   }
 
   @Override
@@ -482,12 +484,6 @@ class ServerState implements Closeable {
     StateMachine sm = server.getStateMachine();
     sm.pause(); // pause the SM to prepare for install snapshot
     snapshotManager.installSnapshot(request, sm);
-    
updateInstalledSnapshotIndex(TermIndex.valueOf(request.getSnapshotChunk().getTermIndex()));
-  }
-
-  void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
-    getLog().onSnapshotInstalled(lastTermIndexInSnapshot.getIndex());
-    latestInstalledSnapshot.set(lastTermIndexInSnapshot);
   }
 
   private SnapshotInfo getLatestSnapshot() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index e7b574cf3..abb398367 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -155,7 +155,8 @@ class SnapshotInstallationHandler {
     final long currentTerm;
     final long leaderTerm = request.getLeaderTerm();
     final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest 
= request.getSnapshotChunk();
-    final long lastIncludedIndex = 
snapshotChunkRequest.getTermIndex().getIndex();
+    final TermIndex lastIncluded = 
TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
+    final long lastIncludedIndex = lastIncluded.getIndex();
     synchronized (this) {
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
@@ -183,7 +184,7 @@ class SnapshotInstallationHandler {
         // update the committed index
         // re-load the state machine if this is the last chunk
         if (snapshotChunkRequest.getDone()) {
-          state.reloadStateMachine(lastIncludedIndex);
+          state.reloadStateMachine(lastIncluded);
         }
       } finally {
         
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
@@ -316,8 +317,7 @@ class SnapshotInstallationHandler {
           .getAndSet(INVALID_TERM_INDEX);
       if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) {
         server.getStateMachine().pause();
-        state.updateInstalledSnapshotIndex(latestInstalledSnapshotTermIndex);
-        state.reloadStateMachine(latestInstalledSnapshotTermIndex.getIndex());
+        state.reloadStateMachine(latestInstalledSnapshotTermIndex);
         LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", 
getMemberId(),
             InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotTermIndex);
         inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 5913f4adf..41fb8608f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -477,6 +477,7 @@ public class SegmentedRaftLog extends RaftLogBase {
 
   @Override
   public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) {
+    updateSnapshotIndex(lastSnapshotIndex);
     fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
     // TODO purge normal/tmp/corrupt snapshot files
     // if the last index in snapshot is larger than the index of the last
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index 4101c1256..f8486d109 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -36,6 +36,7 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.FileListSnapshotInfo;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.SizeInBytes;
 import org.junit.Assert;
@@ -116,11 +117,13 @@ public abstract class 
InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu
 
       // Check the installed snapshot file number on each Follower matches 
with the
       // leader snapshot.
-      for (RaftServer.Division follower : cluster.getFollowers()) {
-        final SnapshotInfo info = 
follower.getStateMachine().getLatestSnapshot();
-        Assert.assertNotNull(info);
-        Assert.assertEquals(3, info.getFiles().size());
-      }
+      JavaUtils.attempt(() -> {
+        for (RaftServer.Division follower : cluster.getFollowers()) {
+          final SnapshotInfo info = 
follower.getStateMachine().getLatestSnapshot();
+          Assert.assertNotNull(info);
+          Assert.assertEquals(3, info.getFiles().size());
+        }
+      }, 10, ONE_SECOND, "check snapshot", LOG);
     } finally {
       cluster.shutdown();
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 8de01a271..d91f9840f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -320,7 +320,7 @@ public class TestSegmentedRaftLog extends BaseTest {
         ex = e;
       }
       assertTrue(ex.getMessage().contains("Difference between entry index and 
RaftLog's latest snapshot " +
-          "index -1 is greater than 1"));
+          "index 999 is greater than 1"));
     }
   }
 

Reply via email to