This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit e5e0a84bd62253fd6d760c33cd6730fc276c224e
Author: Nibiru <[email protected]>
AuthorDate: Thu Jun 9 13:51:48 2022 +0800

    RATIS-1582. Add notify install snapshot finished event to inform the (#647)
    
    (cherry picked from commit 4879f1b677b5bcaf802f6349dc463c0334b197e1)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  18 +++
 .../apache/ratis/statemachine/StateMachine.java    |  21 ++--
 .../apache/ratis/server/impl/RaftServerImpl.java   |   5 +
 .../server/impl/SnapshotInstallationHandler.java   |  11 ++
 .../ratis/InstallSnapshotNotificationTests.java    | 129 ++++++++++++++++++++-
 5 files changed, 174 insertions(+), 10 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index bb996c25..4c7fd6e2 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -382,6 +383,29 @@ public class GrpcLogAppender extends LogAppenderBase {
       }
     }
 
+    /**
+     * Compare the follower's latest installed snapshot index with the leader's log start index.
+     * When the follower's next index is at or beyond the leader's start index,
+     * the follower can catch up from the leader's log,
+     * so the installation is reported to the state machine as {@link InstallSnapshotResult#SUCCESS}.
+     *
+     * @param followerSnapshotIndex the index of the snapshot installed on the follower.
+     */
+    void onFollowerCatchup(long followerSnapshotIndex) {
+      final long leaderStartIndex = getRaftLog().getStartIndex();
+      final long followerNextIndex = followerSnapshotIndex + 1;
+      if (followerNextIndex >= leaderStartIndex) {
+        LOG.info("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}",
+            this, leaderStartIndex);
+        notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex);
+      }
+    }
+
+    /** Notify this server's state machine that an install snapshot has finished with the given result. */
+    void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIndex) {
+      getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex);
+    }
+
     boolean isDone() {
       return done.get();
     }
@@ -445,11 +461,13 @@ public class GrpcLogAppender extends LogAppenderBase {
           getFollower().setAttemptedToInstallSnapshot();
           getLeaderState().onFollowerCommitIndex(getFollower(), 
followerSnapshotIndex);
           increaseNextIndex(followerSnapshotIndex);
+          onFollowerCatchup(followerSnapshotIndex);
           removePending(reply);
           break;
         case SNAPSHOT_UNAVAILABLE:
           LOG.info("{}: Follower could not install snapshot as it is not 
available.", this);
           getFollower().setAttemptedToInstallSnapshot();
+          
notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, 
RaftLog.INVALID_LOG_INDEX);
           removePending(reply);
           break;
         case UNRECOGNIZED:
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 3a0ca400..6373fa9b 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -18,10 +18,7 @@
 package org.apache.ratis.statemachine;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
-import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -157,9 +154,9 @@ public interface StateMachine extends Closeable {
 
     /**
      * Notify the {@link StateMachine} a term-index update event.
-     * This method will be invoked when a {@link RaftProtos.MetadataProto}
-     * or {@link RaftProtos.RaftConfigurationProto} is processed.
-     * For {@link RaftProtos.StateMachineLogEntryProto}, this method will not 
be invoked.
+     * This method will be invoked when a {@link MetadataProto}
+     * or {@link RaftConfigurationProto} is processed.
+     * For {@link StateMachineLogEntryProto}, this method will not be invoked.
      *
      * @param term The term of the log entry
      * @param index The index of the log entry
@@ -168,7 +165,7 @@ public interface StateMachine extends Closeable {
 
     /**
      * Notify the {@link StateMachine} a configuration change.
-     * This method will be invoked when a {@link 
RaftProtos.RaftConfigurationProto} is processed.
+     * This method will be invoked when a {@link RaftConfigurationProto} is 
processed.
      *
      * @param term term of the current log entry
      * @param index index which is being updated
@@ -189,6 +186,18 @@ public interface StateMachine extends Closeable {
      * @param failedEntry The failed log entry, if there is any.
      */
     default void notifyLogFailed(Throwable cause, LogEntryProto failedEntry) {}
+
+    /**
+     * Notify the {@link StateMachine} that the progress of install snapshot is
+     * completely done, e.g. to trigger the cleanup of snapshots.
+     * This may be invoked on both the leader and the followers.
+     *
+     * @param result The result of the install snapshot.
+     * @param snapshotIndex The index of the installed snapshot,
+     *                      or an invalid log index if there is none,
+     *                      e.g. when the result is {@link InstallSnapshotResult#SNAPSHOT_UNAVAILABLE}.
+     */
+    default void notifySnapshotInstalled(InstallSnapshotResult result, long snapshotIndex) {}
   }
 
   /**
@@ -517,7 +520,7 @@ public interface StateMachine extends Closeable {
    * @param proto state machine proto
    * @return the string representation of the proto.
    */
-  default String 
toStateMachineLogEntryString(RaftProtos.StateMachineLogEntryProto proto) {
+  default String toStateMachineLogEntryString(StateMachineLogEntryProto proto) 
{
     return JavaUtils.getClassSimpleName(proto.getClass()) +  ":" + 
ClientInvocationId.valueOf(proto);
   }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c26bed77..fe54dd08 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1358,6 +1358,11 @@ class RaftServerImpl implements RaftServer.Division,
 
     if (!isHeartbeat) {
       CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
+      final long installedIndex = 
snapshotInstallationHandler.getInstalledIndex();
+      if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) {
+        LOG.info("{}: Follower has completed install the snapshot {}.", this, 
installedIndex);
+        
stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, 
installedIndex);
+      }
     }
     return JavaUtils.allOf(futures).whenCompleteAsync(
         (r, t) -> followerState.ifPresent(fs -> 
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 8e06cf78..afe6e8fa 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -64,6 +64,7 @@ class SnapshotInstallationHandler {
   private final AtomicReference<TermIndex> installedSnapshotTermIndex =
     new AtomicReference<>(INVALID_TERM_INDEX);
   private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
+  private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
 
   SnapshotInstallationHandler(RaftServerImpl server, RaftProperties 
properties) {
     this.server = server;
@@ -75,6 +76,18 @@ class SnapshotInstallationHandler {
     return state.getMemberId();
   }
 
+  /**
+   * Get and reset the index of the snapshot installed by the latest InstallSnapshot notification.
+   * <p>
+   * This is a read-and-reset operation: the stored value is atomically replaced with
+   * {@code INVALID_LOG_INDEX}, so that each installed snapshot index is returned at most once.
+   *
+   * @return the installed snapshot index, or {@code INVALID_LOG_INDEX} if there is none to report.
+   */
+  long getInstalledIndex() {
+    return installedIndex.getAndSet(INVALID_LOG_INDEX);
+  }
+
   long getInProgressInstallSnapshotIndex() {
     return inProgressInstallSnapshotIndex.get();
   }
@@ -299,6 +304,8 @@ class SnapshotInstallationHandler {
         LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
             InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
         inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
+        server.getStateMachine().event().notifySnapshotInstalled(
+            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
       }
@@ -314,6 +321,10 @@ class SnapshotInstallationHandler {
         LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", 
getMemberId(),
             InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotTermIndex);
         inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
+        final long latestInstalledIndex = 
latestInstalledSnapshotTermIndex.getIndex();
+        server.getStateMachine().event().notifySnapshotInstalled(
+            InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex);
+        installedIndex.set(latestInstalledIndex);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotTermIndex.getIndex());
       }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index e15ffb99..592a7ed1 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -81,6 +81,7 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
   private static final AtomicReference<SnapshotInfo> leaderSnapshotInfoRef = 
new AtomicReference<>();
 
   private static final AtomicInteger numSnapshotRequests = new AtomicInteger();
+  private static final AtomicInteger numNotifyInstallSnapshotFinished = new 
AtomicInteger();
 
   private static class StateMachine4InstallSnapshotNotificationTests extends 
SimpleStateMachine4Testing {
     @Override
@@ -120,6 +121,44 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
 
       return CompletableFuture.supplyAsync(supplier);
     }
+
+    @Override
+    public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result, long installIndex) {
+      if (result != RaftProtos.InstallSnapshotResult.SUCCESS &&
+          result != RaftProtos.InstallSnapshotResult.SNAPSHOT_UNAVAILABLE) {
+        return;
+      }
+      numNotifyInstallSnapshotFinished.incrementAndGet();
+      final SingleFileSnapshotInfo leaderSnapshotInfo = (SingleFileSnapshotInfo) leaderSnapshotInfoRef.get();
+      if (leaderSnapshotInfo == null) {
+        // leaderSnapshotInfoRef may still be null (e.g. it is reset at the start of a test): nothing to clean up.
+        LOG.info("No leader snapshot info; skip the snapshot cleanup for {}", result);
+        return;
+      }
+      final File leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath().toFile();
+      synchronized (this) {
+        try {
+          if (getServer().get().getDivision(this.getGroupId()).getInfo().isLeader()) {
+            LOG.info("Receive the notification to clean up snapshot as leader for {}", result);
+            if (leaderSnapshotFile.exists()) {
+              // For test purpose, we do not delete the leader's snapshot actually, which could be
+              // sent to more than one peer during the test
+              LOG.info("leader snapshot {} existed", leaderSnapshotFile);
+            }
+          } else {
+            LOG.info("Receive the notification to clean up snapshot as follower for {}", result);
+            File followerSnapshotFile = new File(getSMdir(), leaderSnapshotFile.getName());
+            if (followerSnapshotFile.exists()) {
+              FileUtils.deleteFile(followerSnapshotFile);
+              LOG.info("follower snapshot {} deleted", followerSnapshotFile);
+            }
+          }
+        } catch (Exception ex) {
+          LOG.error("Failed to notify installSnapshot Finished", ex);
+        }
+      }
+    }
+
   }
 
   /**
@@ -182,6 +216,7 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
       }
 
       final SnapshotInfo leaderSnapshotInfo = 
cluster.getLeader().getStateMachine().getLatestSnapshot();
+      LOG.info("LeaderSnapshotInfo: {}", leaderSnapshotInfo.getTermIndex());
       final boolean set = leaderSnapshotInfoRef.compareAndSet(null, 
leaderSnapshotInfo);
       Assert.assertTrue(set);
 
@@ -361,7 +396,107 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
       cluster.shutdown();
     }
   }
-  
+
+  @Test
+  public void testInstallSnapshotFinishedEvent() throws Exception {
+    runWithNewCluster(1, this::testInstallSnapshotFinishedEvent);
+  }
+
+  /**
+   * Install a snapshot to a newly added peer and verify that {@code notifySnapshotInstalled}
+   * is triggered on both the leader and the follower, and that the test state machine
+   * removes the follower's copy of the snapshot while keeping the leader's.
+   */
+  private void testInstallSnapshotFinishedEvent(CLUSTER cluster) throws Exception {
+    leaderSnapshotInfoRef.set(null);
+    numNotifyInstallSnapshotFinished.set(0);
+    final List<LogSegmentPath> logs;
+    int i = 0;
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try (final RaftClient client = cluster.createClient(leaderId)) {
+        for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+          final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+      }
+
+      // wait for the snapshot to be done
+      final RaftServer.Division leader = cluster.getLeader();
+      final long nextIndex = leader.getRaftLog().getNextIndex();
+      LOG.info("nextIndex = {}", nextIndex);
+      final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+          nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+      JavaUtils.attemptRepeatedly(() -> {
+        Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+        return null;
+      }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+      logs = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage());
+    } finally {
+      cluster.shutdown();
+    }
+
+    // delete the log segments from the leader
+    LOG.info("Delete logs {}", logs);
+    for (LogSegmentPath path : logs) {
+      FileUtils.deleteFully(path.getPath()); // the log may be already purged
+    }
+
+    // restart the peer
+    LOG.info("Restarting the cluster");
+    cluster.restart(false);
+    try {
+      RaftSnapshotBaseTest.assertLeaderContent(cluster);
+
+      // generate some more traffic
+      try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
+        Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("m" + i)).isSuccess());
+      }
+
+      final SnapshotInfo leaderSnapshotInfo = cluster.getLeader().getStateMachine().getLatestSnapshot();
+      LOG.info("LeaderSnapshotInfo: {}", leaderSnapshotInfo.getTermIndex());
+      final boolean set = leaderSnapshotInfoRef.compareAndSet(null, leaderSnapshotInfo);
+      Assert.assertTrue(set);
+
+      // add one new peer
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(1, true, true);
+      // trigger setConfiguration
+      cluster.setConfiguration(change.allPeersInNewConf);
+
+      RaftServerTestUtil
+          .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+      // Check the installed snapshot index on each Follower matches with the
+      // leader snapshot.
+      for (RaftServer.Division follower : cluster.getFollowers()) {
+        Assert.assertEquals(leaderSnapshotInfo.getIndex(),
+            RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
+      }
+
+      // notification should be sent to both the leader and the follower
+      final File leaderSnapshotFile = leaderSnapshotInfo.getFiles().get(0).getPath().toFile();
+      final SimpleStateMachine4Testing followerStateMachine =
+          (SimpleStateMachine4Testing) cluster.getFollowers().get(0).getStateMachine();
+      final File followerSnapshotFile = new File(followerStateMachine.getStateMachineStorage().getSmDir(),
+          leaderSnapshotFile.getName());
+      // The notifications are handled on server threads and may lag behind the conf change, so retry.
+      JavaUtils.attemptRepeatedly(() -> {
+        Assert.assertEquals(2, numNotifyInstallSnapshotFinished.get());
+        Assert.assertTrue(leaderSnapshotFile.exists());
+        Assert.assertFalse(followerSnapshotFile.exists());
+        return null;
+      }, 10, ONE_SECOND, "notifySnapshotInstalled", LOG);
+
+      // restart the peer and check if it can correctly handle conf change
+      cluster.restartServer(cluster.getLeader().getId(), false);
+      RaftSnapshotBaseTest.assertLeaderContent(cluster);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Test for install snapshot during a peer bootstrap: start a one node cluster
    * (disable install snapshot option) and let it generate a snapshot. Add

Reply via email to