This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch branch-2_readIndex in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 6497f8e0b9f88f6205f2c7a688b54dc39af1a6c9 Author: Kaijie Chen <[email protected]> AuthorDate: Thu Mar 30 16:10:09 2023 +0800 RATIS-1827. Update installed snapshot index only when InstallSnapshot is done (#868) --- .../main/java/org/apache/ratis/server/impl/ServerState.java | 12 ++++-------- .../ratis/server/impl/SnapshotInstallationHandler.java | 8 ++++---- .../ratis/server/raftlog/segmented/SegmentedRaftLog.java | 1 + .../org/apache/ratis/InstallSnapshotFromLeaderTests.java | 13 ++++++++----- .../server/raftlog/segmented/TestSegmentedRaftLog.java | 2 +- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index fa685325e..ee0760f1e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -454,9 +454,11 @@ class ServerState implements Closeable { getStateMachineUpdater().notifyUpdater(); } - void reloadStateMachine(long lastIndexInSnapshot) { - getLog().updateSnapshotIndex(lastIndexInSnapshot); + void reloadStateMachine(TermIndex snapshotTermIndex) { getStateMachineUpdater().reloadStateMachine(); + + getLog().onSnapshotInstalled(snapshotTermIndex.getIndex()); + latestInstalledSnapshot.set(snapshotTermIndex); } @Override @@ -482,12 +484,6 @@ class ServerState implements Closeable { StateMachine sm = server.getStateMachine(); sm.pause(); // pause the SM to prepare for install snapshot snapshotManager.installSnapshot(request, sm); - updateInstalledSnapshotIndex(TermIndex.valueOf(request.getSnapshotChunk().getTermIndex())); - } - - void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) { - getLog().onSnapshotInstalled(lastTermIndexInSnapshot.getIndex()); - latestInstalledSnapshot.set(lastTermIndexInSnapshot); } private SnapshotInfo getLatestSnapshot() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index e7b574cf3..abb398367 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -155,7 +155,8 @@ class SnapshotInstallationHandler { final long currentTerm; final long leaderTerm = request.getLeaderTerm(); final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); - final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex(); + final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex()); + final long lastIncludedIndex = lastIncluded.getIndex(); synchronized (this) { final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); @@ -183,7 +184,7 @@ class SnapshotInstallationHandler { // update the committed index // re-load the state machine if this is the last chunk if (snapshotChunkRequest.getDone()) { - state.reloadStateMachine(lastIncludedIndex); + state.reloadStateMachine(lastIncluded); } } finally { server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE); @@ -316,8 +317,7 @@ class SnapshotInstallationHandler { .getAndSet(INVALID_TERM_INDEX); if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) { server.getStateMachine().pause(); - state.updateInstalledSnapshotIndex(latestInstalledSnapshotTermIndex); - state.reloadStateMachine(latestInstalledSnapshotTermIndex.getIndex()); + state.reloadStateMachine(latestInstalledSnapshotTermIndex); LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(), InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex); inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 5913f4adf..41fb8608f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -477,6 +477,7 @@ public class SegmentedRaftLog extends RaftLogBase { @Override public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) { + updateSnapshotIndex(lastSnapshotIndex); fileLogWorker.syncWithSnapshot(lastSnapshotIndex); // TODO purge normal/tmp/corrupt snapshot files // if the last index in snapshot is larger than the index of the last diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java index 4101c1256..f8486d109 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java @@ -36,6 +36,7 @@ import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.impl.FileListSnapshotInfo; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.SizeInBytes; import org.junit.Assert; @@ -116,11 +117,13 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu // Check the installed snapshot file number on each Follower matches with the // leader snapshot. - for (RaftServer.Division follower : cluster.getFollowers()) { - final SnapshotInfo info = follower.getStateMachine().getLatestSnapshot(); - Assert.assertNotNull(info); - Assert.assertEquals(3, info.getFiles().size()); - } + JavaUtils.attempt(() -> { + for (RaftServer.Division follower : cluster.getFollowers()) { + final SnapshotInfo info = follower.getStateMachine().getLatestSnapshot(); + Assert.assertNotNull(info); + Assert.assertEquals(3, info.getFiles().size()); + } + }, 10, ONE_SECOND, "check snapshot", LOG); } finally { cluster.shutdown(); } diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java index 8de01a271..d91f9840f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java @@ -320,7 +320,7 @@ public class TestSegmentedRaftLog extends BaseTest { ex = e; } assertTrue(ex.getMessage().contains("Difference between entry index and RaftLog's latest snapshot " + - "index -1 is greater than 1")); + "index 999 is greater than 1")); } }
