This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit e5e0a84bd62253fd6d760c33cd6730fc276c224e Author: Nibiru <[email protected]> AuthorDate: Thu Jun 9 13:51:48 2022 +0800 RATIS-1582. Add notify install snapshot finished event to inform the (#647) (cherry picked from commit 4879f1b677b5bcaf802f6349dc463c0334b197e1) --- .../apache/ratis/grpc/server/GrpcLogAppender.java | 18 +++ .../apache/ratis/statemachine/StateMachine.java | 21 ++-- .../apache/ratis/server/impl/RaftServerImpl.java | 5 + .../server/impl/SnapshotInstallationHandler.java | 11 ++ .../ratis/InstallSnapshotNotificationTests.java | 129 ++++++++++++++++++++- 5 files changed, 174 insertions(+), 10 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index bb996c25..4c7fd6e2 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.grpc.metrics.GrpcServerMetrics; +import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; @@ -382,6 +383,21 @@ public class GrpcLogAppender extends LogAppenderBase { } } + //compare follower's latest installed snapshot index with leader's start index + void onFollowerCatchup(long followerSnapshotIndex) { + final long leaderStartIndex = getRaftLog().getStartIndex(); + final long followerNextIndex = followerSnapshotIndex + 1; + if (followerNextIndex >= leaderStartIndex) { + LOG.info("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}", + this, followerNextIndex); + notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, 
followerSnapshotIndex); + } + } + + void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIndex) { + getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex); + } + boolean isDone() { return done.get(); } @@ -445,11 +461,13 @@ public class GrpcLogAppender extends LogAppenderBase { getFollower().setAttemptedToInstallSnapshot(); getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex); increaseNextIndex(followerSnapshotIndex); + onFollowerCatchup(followerSnapshotIndex); removePending(reply); break; case SNAPSHOT_UNAVAILABLE: LOG.info("{}: Follower could not install snapshot as it is not available.", this); getFollower().setAttemptedToInstallSnapshot(); + notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX); removePending(reply); break; case UNRECOGNIZED: diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 3a0ca400..6373fa9b 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -18,10 +18,7 @@ package org.apache.ratis.statemachine; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.apache.ratis.proto.RaftProtos; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto; -import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; @@ -157,9 +154,9 @@ public interface StateMachine extends Closeable { /** * Notify the {@link StateMachine} a term-index update event. 
- * This method will be invoked when a {@link RaftProtos.MetadataProto} - * or {@link RaftProtos.RaftConfigurationProto} is processed. - * For {@link RaftProtos.StateMachineLogEntryProto}, this method will not be invoked. + * This method will be invoked when a {@link MetadataProto} + * or {@link RaftConfigurationProto} is processed. + * For {@link StateMachineLogEntryProto}, this method will not be invoked. * * @param term The term of the log entry * @param index The index of the log entry @@ -168,7 +165,7 @@ public interface StateMachine extends Closeable { /** * Notify the {@link StateMachine} a configuration change. - * This method will be invoked when a {@link RaftProtos.RaftConfigurationProto} is processed. + * This method will be invoked when a {@link RaftConfigurationProto} is processed. * * @param term term of the current log entry * @param index index which is being updated @@ -189,6 +186,12 @@ public interface StateMachine extends Closeable { * @param failedEntry The failed log entry, if there is any. */ default void notifyLogFailed(Throwable cause, LogEntryProto failedEntry) {} + + /** + * Notify the {@link StateMachine} that the progress of install snapshot is + * completely done. Could trigger the cleanup of snapshots. + */ + default void notifySnapshotInstalled(InstallSnapshotResult result, long snapshotIndex) {} } /** @@ -517,7 +520,7 @@ public interface StateMachine extends Closeable { * @param proto state machine proto * @return the string representation of the proto. 
*/ - default String toStateMachineLogEntryString(RaftProtos.StateMachineLogEntryProto proto) { + default String toStateMachineLogEntryString(StateMachineLogEntryProto proto) { return JavaUtils.getClassSimpleName(proto.getClass()) + ":" + ClientInvocationId.valueOf(proto); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c26bed77..fe54dd08 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1358,6 +1358,11 @@ class RaftServerImpl implements RaftServer.Division, if (!isHeartbeat) { CodeInjectionForTesting.execute(LOG_SYNC, getId(), null); + final long installedIndex = snapshotInstallationHandler.getInstalledIndex(); + if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) { + LOG.info("{}: Follower has completed install the snapshot {}.", this, installedIndex); + stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex); + } } return JavaUtils.allOf(futures).whenCompleteAsync( (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 8e06cf78..afe6e8fa 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -64,6 +64,7 @@ class SnapshotInstallationHandler { private final AtomicReference<TermIndex> installedSnapshotTermIndex = new AtomicReference<>(INVALID_TERM_INDEX); private final AtomicBoolean isSnapshotNull = new AtomicBoolean(); + private final AtomicLong installedIndex = new 
AtomicLong(INVALID_LOG_INDEX); SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) { this.server = server; @@ -75,6 +76,10 @@ class SnapshotInstallationHandler { return state.getMemberId(); } + long getInstalledIndex() { + return installedIndex.getAndSet(INVALID_LOG_INDEX); + } + long getInProgressInstallSnapshotIndex() { return inProgressInstallSnapshotIndex.get(); } @@ -299,6 +304,8 @@ class SnapshotInstallationHandler { LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); + server.getStateMachine().event().notifySnapshotInstalled( + InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX); return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1); } @@ -314,6 +321,10 @@ class SnapshotInstallationHandler { LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(), InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex); inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); + final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex(); + server.getStateMachine().event().notifySnapshotInstalled( + InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex); + installedIndex.set(latestInstalledIndex); return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()); } diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java index e15ffb99..592a7ed1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java @@ -81,6 +81,7 
@@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC private static final AtomicReference<SnapshotInfo> leaderSnapshotInfoRef = new AtomicReference<>(); private static final AtomicInteger numSnapshotRequests = new AtomicInteger(); + private static final AtomicInteger numNotifyInstallSnapshotFinished = new AtomicInteger(); private static class StateMachine4InstallSnapshotNotificationTests extends SimpleStateMachine4Testing { @Override @@ -120,6 +121,39 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC return CompletableFuture.supplyAsync(supplier); } + + @Override + public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result, long installIndex) { + if (result != RaftProtos.InstallSnapshotResult.SUCCESS && + result != RaftProtos.InstallSnapshotResult.SNAPSHOT_UNAVAILABLE) { + return; + } + numNotifyInstallSnapshotFinished.incrementAndGet(); + final SingleFileSnapshotInfo leaderSnapshotInfo = (SingleFileSnapshotInfo) leaderSnapshotInfoRef.get(); + File leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath().toFile(); + synchronized (this) { + try { + if (getServer().get().getDivision(this.getGroupId()).getInfo().isLeader()) { + LOG.info("Receive the notification to clean up snapshot as leader for {}", result); + if (leaderSnapshotFile.exists()) { + // For test purpose, we do not delete the leader's snapshot actually, which could be + // sent to more than one peer during the test + LOG.info("leader snapshot {} existed", leaderSnapshotFile); + } + } else { + LOG.info("Receive the notification to clean up snapshot as follower for {}", result); + File followerSnapshotFile = new File(getSMdir(), leaderSnapshotFile.getName()); + if (followerSnapshotFile.exists()) { + FileUtils.deleteFile(followerSnapshotFile); + LOG.info("follower snapshot {} deleted", followerSnapshotFile); + } + } + } catch (Exception ex) { + LOG.error("Failed to notify installSnapshot Finished", ex); + } + } + } + } /** 
@@ -182,6 +216,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC } final SnapshotInfo leaderSnapshotInfo = cluster.getLeader().getStateMachine().getLatestSnapshot(); + LOG.info("LeaderSnapshotInfo: {}", leaderSnapshotInfo.getTermIndex()); final boolean set = leaderSnapshotInfoRef.compareAndSet(null, leaderSnapshotInfo); Assert.assertTrue(set); @@ -361,7 +396,99 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC cluster.shutdown(); } } - + + @Test + public void testInstallSnapshotFinishedEvent() throws Exception{ + runWithNewCluster(1, this::testInstallSnapshotFinishedEvent); + } + + private void testInstallSnapshotFinishedEvent(CLUSTER cluster) throws Exception{ + leaderSnapshotInfoRef.set(null); + numNotifyInstallSnapshotFinished.set(0); + final List<LogSegmentPath> logs; + int i = 0; + try { + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + + try(final RaftClient client = cluster.createClient(leaderId)) { + for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { + RaftClientReply + reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i)); + Assert.assertTrue(reply.isSuccess()); + } + } + + // wait for the snapshot to be done + final RaftServer.Division leader = cluster.getLeader(); + final long nextIndex = leader.getRaftLog().getNextIndex(); + LOG.info("nextIndex = {}", nextIndex); + final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster, + nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); + JavaUtils.attemptRepeatedly(() -> { + Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists)); + return null; + }, 10, ONE_SECOND, "snapshotFile.exist", LOG); + logs = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage()); + } finally { + cluster.shutdown(); + } + + // delete the log segments from the leader + LOG.info("Delete logs {}", logs); + for (LogSegmentPath path : logs) { + 
FileUtils.deleteFully(path.getPath()); // the log may already be purged + } + + // restart the peer + LOG.info("Restarting the cluster"); + cluster.restart(false); + try { + RaftSnapshotBaseTest.assertLeaderContent(cluster); + + // generate some more traffic + try(final RaftClient client = cluster.createClient(cluster.getLeader().getId())) { + Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("m" + i)).isSuccess()); + } + + final SnapshotInfo leaderSnapshotInfo = cluster.getLeader().getStateMachine().getLatestSnapshot(); + LOG.info("LeaderSnapshotInfo: {}", leaderSnapshotInfo.getTermIndex()); + final boolean set = leaderSnapshotInfoRef.compareAndSet(null, leaderSnapshotInfo); + Assert.assertTrue(set); + + // add one new peer + final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(1, true, true); + // trigger setConfiguration + cluster.setConfiguration(change.allPeersInNewConf); + + RaftServerTestUtil + .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); + + // Check the installed snapshot index on each Follower matches with the + // leader snapshot. 
+ for (RaftServer.Division follower : cluster.getFollowers()) { + Assert.assertEquals(leaderSnapshotInfo.getIndex(), + RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower)); + } + + // notification should be sent to both the leader and the follower + File leaderSnapshotFile = leaderSnapshotInfo.getFiles().get(0).getPath().toFile(); + SimpleStateMachine4Testing followerStateMachine = + (SimpleStateMachine4Testing) cluster.getFollowers().get(0).getStateMachine(); + File followerSnapshotFile = new File(followerStateMachine.getStateMachineStorage().getSmDir(), + leaderSnapshotFile.getName()); + Assert.assertEquals(numNotifyInstallSnapshotFinished.get(), 2); + Assert.assertTrue(leaderSnapshotFile.exists()); + Assert.assertFalse(followerSnapshotFile.exists()); + + // restart the peer and check if it can correctly handle conf change + cluster.restartServer(cluster.getLeader().getId(), false); + RaftSnapshotBaseTest.assertLeaderContent(cluster); + } finally { + cluster.shutdown(); + } + } + /** * Test for install snapshot during a peer bootstrap: start a one node cluster * (disable install snapshot option) and let it generate a snapshot. Add
