This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new cc5f2e5  RATIS-1390. Bootstrapping Peer should always try to install a snapshot the first time. (#489)
cc5f2e5 is described below

commit cc5f2e58bcfdf2232dad5b9f89745489aadd4f80
Author: Hanisha Koneru <[email protected]>
AuthorDate: Sun Aug 22 08:03:26 2021 -0700

    RATIS-1390. Bootstrapping Peer should always try to install a snapshot the first time. (#489)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 36 +++++++++--
 ratis-proto/src/main/proto/Raft.proto              |  2 +
 .../apache/ratis/server/leader/FollowerInfo.java   |  8 +++
 .../apache/ratis/server/leader/LeaderState.java    |  4 ++
 .../apache/ratis/server/leader/LogAppender.java    | 16 ++++-
 .../apache/ratis/server/impl/FollowerInfoImpl.java | 12 ++++
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  8 ++-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 59 ++++++++++++++----
 .../ratis/server/leader/LogAppenderBase.java       |  2 +-
 .../ratis/server/leader/LogAppenderDefault.java    | 19 ++++--
 .../ratis/InstallSnapshotNotificationTests.java    | 72 ++++++++++++++++++++++
 .../ratis/statemachine/RaftSnapshotBaseTest.java   | 52 ++++++++++++++++
 12 files changed, 267 insertions(+), 23 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index f947f1a..f9968e7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -407,10 +407,11 @@ public class GrpcLogAppender extends LogAppenderBase {
       if (!firstResponseReceived) {
         firstResponseReceived = true;
       }
-
+      final long followerSnapshotIndex;
       switch (reply.getResult()) {
         case SUCCESS:
           LOG.info("{}: Completed InstallSnapshot. Reply: {}", this, reply);
+          getFollower().setAttemptedToInstallSnapshot();
           removePending(reply);
           break;
         case IN_PROGRESS:
@@ -418,9 +419,10 @@ public class GrpcLogAppender extends LogAppenderBase {
           removePending(reply);
           break;
         case ALREADY_INSTALLED:
-          final long followerSnapshotIndex = reply.getSnapshotIndex();
-          LOG.info("{}: Already Installed Snapshot Index {}.", this, 
followerSnapshotIndex);
+          followerSnapshotIndex = reply.getSnapshotIndex();
+          LOG.info("{}: Follower snapshot is already at index {}.", this, 
followerSnapshotIndex);
           getFollower().setSnapshotIndex(followerSnapshotIndex);
+          getFollower().setAttemptedToInstallSnapshot();
           getLeaderState().onFollowerCommitIndex(getFollower(), 
followerSnapshotIndex);
           increaseNextIndex(followerSnapshotIndex);
           removePending(reply);
@@ -433,6 +435,20 @@ public class GrpcLogAppender extends LogAppenderBase {
               this, 
RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
               getServer().getId(), installSnapshotEnabled, getFollowerId(), 
!installSnapshotEnabled);
           break;
+        case SNAPSHOT_INSTALLED:
+          followerSnapshotIndex = reply.getSnapshotIndex();
+          LOG.info("{}: Follower installed snapshot at index {}", this, 
followerSnapshotIndex);
+          getFollower().setSnapshotIndex(followerSnapshotIndex);
+          getFollower().setAttemptedToInstallSnapshot();
+          getLeaderState().onFollowerCommitIndex(getFollower(), 
followerSnapshotIndex);
+          increaseNextIndex(followerSnapshotIndex);
+          removePending(reply);
+          break;
+        case SNAPSHOT_UNAVAILABLE:
+          LOG.info("{}: Follower could not install snapshot as it is not 
available.", this);
+          getFollower().setAttemptedToInstallSnapshot();
+          removePending(reply);
+          break;
         case UNRECOGNIZED:
           LOG.error("Unrecongnized the reply result {}: Leader is {}, follower 
is {}",
               reply.getResult(), getServer().getId(), getFollowerId());
@@ -562,9 +578,21 @@ public class GrpcLogAppender extends LogAppenderBase {
    * @return the first available log's start term index
    */
   private TermIndex shouldNotifyToInstallSnapshot() {
-    final long followerNextIndex = getFollower().getNextIndex();
+    final FollowerInfo follower = getFollower();
     final long leaderNextIndex = getRaftLog().getNextIndex();
+    final boolean isFollowerBootstrapping = 
getLeaderState().isFollowerBootstrapping(follower);
+
+    if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
+      // If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should
+      // be notified to install a snapshot. Every follower should try to install at least one snapshot during
+      // bootstrapping, if available.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: Notify follower to install snapshot as it is 
bootstrapping.", this);
+      }
+      return getRaftLog().getLastEntryTermIndex();
+    }
 
+    final long followerNextIndex = follower.getNextIndex();
     if (followerNextIndex >= leaderNextIndex) {
       return null;
     }
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index 837b2f5..c98f0d7 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -148,6 +148,8 @@ enum InstallSnapshotResult {
   IN_PROGRESS = 2;
   ALREADY_INSTALLED = 3;
   CONF_MISMATCH = 4;
+  SNAPSHOT_INSTALLED = 5;
+  SNAPSHOT_UNAVAILABLE = 6;
 }
 
 message RequestVoteRequestProto {
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index 487576f..2de24b0 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -52,6 +52,14 @@ public interface FollowerInfo {
   /** Set follower's snapshotIndex. */
   void setSnapshotIndex(long newSnapshotIndex);
 
+  /** Acknowledge that Follower attempted to install a snapshot. It does not guarantee that the installation was
+   * successful. This helps to determine whether Follower can come out of bootstrap process. */
+  void setAttemptedToInstallSnapshot();
+
+  /** Return true if install snapshot has been attempted by the Follower at least once. Used to verify if
+   * Follower tried to install snapshot during bootstrap process. */
+  boolean hasAttemptedToInstallSnapshot();
+
   /** @return the nextIndex for this follower. */
   long getNextIndex();
 
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index 5460392..240c9a1 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -58,4 +58,8 @@ public interface LeaderState {
 
   /** Handle the event that the follower has replied a success append entries. 
*/
   void onFollowerSuccessAppendEntries(FollowerInfo follower);
+
+  /** Check if a follower is bootstrapping. */
+  boolean isFollowerBootstrapping(FollowerInfo follower);
+
 }
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 01caf72..5b45ca2 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -108,10 +108,24 @@ public interface LogAppender {
     // we should install snapshot if the follower needs to catch up and:
     // 1. there is no local log entry but there is snapshot
     // 2. or the follower's next index is smaller than the log start index
+    // 3. or the follower is bootstrapping and has not installed any snapshot yet
+    final FollowerInfo follower = getFollower();
+    final boolean isFollowerBootstrapping = 
getLeaderState().isFollowerBootstrapping(follower);
+    final SnapshotInfo snapshot = 
getServer().getStateMachine().getLatestSnapshot();
+
+    if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
+      if (snapshot == null) {
+        // Leader cannot send null snapshot to follower. Hence, acknowledge InstallSnapshot attempt (even though it
+        // was not attempted) so that follower can come out of staging state after appending log entries.
+        follower.setAttemptedToInstallSnapshot();
+      } else {
+        return snapshot;
+      }
+    }
+
     final long followerNextIndex = getFollower().getNextIndex();
     if (followerNextIndex < getRaftLog().getNextIndex()) {
       final long logStartIndex = getRaftLog().getStartIndex();
-      final SnapshotInfo snapshot = 
getServer().getStateMachine().getLatestSnapshot();
       if (followerNextIndex < logStartIndex || (logStartIndex == 
RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
         return snapshot;
       }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 0f6c1ab..1499903 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -40,6 +40,7 @@ class FollowerInfoImpl implements FollowerInfo {
   private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", 
RaftLog.INVALID_LOG_INDEX);
   private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 
0L);
   private volatile boolean attendVote;
+  private volatile boolean ackInstallSnapshotAttempt = false;
 
   FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, 
long nextIndex, boolean attendVote) {
     this.name = id + "->" + peer.getId();
@@ -111,6 +112,17 @@ class FollowerInfoImpl implements FollowerInfo {
   }
 
   @Override
+  public void setAttemptedToInstallSnapshot() {
+    LOG.info("Follower {} acknowledged installing snapshot", name);
+    ackInstallSnapshotAttempt = true;
+  }
+
+  @Override
+  public boolean hasAttemptedToInstallSnapshot() {
+    return ackInstallSnapshotAttempt;
+  }
+
+  @Override
   public String getName() {
     return name;
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 601d53e..8bbae24 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -619,7 +619,8 @@ class LeaderStateImpl implements LeaderState {
       LOG.debug("{} detects a follower {} timeout ({}) for bootstrapping", 
this, follower, timeoutTime);
       return BootStrapProgress.NOPROGRESS;
     } else if (follower.getMatchIndex() + stagingCatchupGap > committed
-        && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
+        && follower.getLastRpcResponseTime().compareTo(progressTime) > 0
+        && follower.hasAttemptedToInstallSnapshot()) {
       return BootStrapProgress.CAUGHTUP;
     } else {
       return BootStrapProgress.PROGRESSING;
@@ -643,6 +644,11 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
+  @Override
+  public boolean isFollowerBootstrapping(FollowerInfo follower) {
+    return isBootStrappingPeer(follower.getPeer().getId());
+  }
+
   private void checkStaging() {
     if (!inStagingState()) {
       // it is possible that the bootstrapping is done. Then, fallback to 
UPDATE_COMMIT
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 483d0e1..11b7602 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
@@ -166,6 +167,8 @@ class RaftServerImpl implements RaftServer.Division,
   private final RaftServerMetricsImpl raftServerMetrics;
 
   private final AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
+  private final AtomicLong installedSnapshotIndex;
+  private final AtomicBoolean isSnapshotNull;
 
   // To avoid append entry before complete start() method
   // For example, if thread1 start(), but before thread1 startAsFollower(), 
thread2 receive append entry
@@ -194,6 +197,8 @@ class RaftServerImpl implements RaftServer.Division,
     this.state = new ServerState(id, group, properties, this, stateMachine);
     this.retryCache = new RetryCacheImpl(properties);
     this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
+    this.installedSnapshotIndex = new AtomicLong();
+    this.isSnapshotNull = new AtomicBoolean(false);
     this.dataStreamMap = new DataStreamMapImpl(id);
 
     this.jmxAdapter = new RaftServerJmxAdapter();
@@ -1530,7 +1535,6 @@ class RaftServerImpl implements RaftServer.Division,
     final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
         request.getNotification().getFirstAvailableTermIndex());
     final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
-
     synchronized (this) {
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
@@ -1542,23 +1546,22 @@ class RaftServerImpl implements RaftServer.Division,
       }
       changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
       state.setLeader(leaderId, "installSnapshot");
+      long snapshotIndex = state.getSnapshotIndex();
 
       
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
-
       if (inProgressInstallSnapshotRequest.compareAndSet(null, 
firstAvailableLogTermIndex)) {
-
+        LOG.info("{}: Received notification to install snapshot at index {}", 
getMemberId(), firstAvailableLogIndex);
         // Check if snapshot index is already at par or ahead of the first
         // available log index of the Leader.
-        long snapshotIndex = state.getSnapshotIndex();
-        if (snapshotIndex + 1 >= firstAvailableLogIndex) {
+        if (snapshotIndex + 1 >= firstAvailableLogIndex && 
firstAvailableLogIndex > 0) {
           // State Machine has already installed the snapshot. Return the
           // latest snapshot index to the Leader.
 
           
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
-          final InstallSnapshotReplyProto reply = 
ServerProtoUtils.toInstallSnapshotReplyProto(
-              leaderId, getMemberId(), currentTerm, 
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
-          LOG.info("{}: StateMachine snapshotIndex is {}", getMemberId(), 
snapshotIndex);
-          return reply;
+          LOG.info("{}: InstallSnapshot notification result: {}, current 
snapshot index: {}", getMemberId(),
+              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
+          return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(), currentTerm,
+              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
         }
 
         Optional<RaftPeerProto> leaderPeerInfo = null;
@@ -1596,8 +1599,13 @@ class RaftServerImpl implements RaftServer.Division,
                   stateMachine.pause();
                   state.updateInstalledSnapshotIndex(reply);
                   state.reloadStateMachine(reply.getIndex());
+                  installedSnapshotIndex.set(reply.getIndex());
+                } else {
+                  isSnapshotNull.set(true);
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("{}: StateMachine could not install snapshot as 
it is not available", this);
+                  }
                 }
-                
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
               });
         } catch (Throwable t) {
           
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
@@ -1605,12 +1613,39 @@ class RaftServerImpl implements RaftServer.Division,
         }
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: Snapshot Installation Request received and is in 
progress", getMemberId());
+          LOG.debug("{}: StateMachine is processing Snapshot Installation 
Request.", getMemberId());
         }
       } else {
-        LOG.info("{}: Snapshot Installation by StateMachine is in progress.", 
getMemberId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: StateMachine is already installing a snapshot.", 
getMemberId());
+        }
       }
 
+      // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
+      if (isSnapshotNull.compareAndSet(true, false)) {
+        LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
+            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
+        
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
+        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
+            currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
+      }
+
+      // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
+      // installedSnapshotIndex to 0.
+      long latestInstalledSnapshotIndex = 
this.installedSnapshotIndex.getAndSet(0);
+      if (latestInstalledSnapshotIndex > 0) {
+        LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", 
getMemberId(),
+            InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotIndex);
+        
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
+        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
+            currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotIndex);
+      }
+
+      // Otherwise, Snapshot installation is in progress.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
+            InstallSnapshotResult.IN_PROGRESS);
+      }
       return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
           currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
     }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index c433e3d..b706e87 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -176,7 +176,7 @@ public abstract class LogAppenderBase implements 
LogAppender {
 
   @Override
   public InstallSnapshotRequestProto 
newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
-    Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
+    Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() >= 0);
     synchronized (server) {
       return LeaderProtoUtils.toInstallSnapshotRequestProto(server, 
getFollowerId(), firstAvailableLogTermIndex);
     }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 8087395..ad05c51 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -22,7 +22,6 @@ import 
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
 import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -123,9 +122,21 @@ class LogAppenderDefault extends LogAppenderBase {
               this, getFollower().getNextIndex(), 
getRaftLog().getStartIndex(), snapshot);
 
           final InstallSnapshotReplyProto r = installSnapshot(snapshot);
-          if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
-            onFollowerTerm(r.getTerm());
-          } // otherwise if r is null, retry the snapshot installation
+          if (r != null) {
+            switch (r.getResult()) {
+              case NOT_LEADER:
+                onFollowerTerm(r.getTerm());
+                break;
+              case SUCCESS:
+              case SNAPSHOT_UNAVAILABLE:
+              case ALREADY_INSTALLED:
+                getFollower().setAttemptedToInstallSnapshot();
+                break;
+              default:
+                break;
+            }
+          }
+          // otherwise if r is null, retry the snapshot installation
         } else {
           final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
           if (r != null) {
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 38f2e43..5f7f4a7 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -350,4 +350,76 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
       cluster.shutdown();
     }
   }
+
+  /**
+   * Basic test for install snapshot notification: start a one node cluster
+   * (disable install snapshot option) and let it generate a snapshot. Then
+   * delete the log and restart the node, and add more nodes as followers.
+   * The new follower nodes should get an install snapshot notification.
+   */
+  /**
+   * Test for install snapshot during a peer bootstrap: start a one node cluster
+   * (disable install snapshot option) and let it generate a snapshot. Add
+   * another node and verify that the new node receives an install snapshot
+   * notification.
+   */
+  @Test
+  public void testInstallSnapshotDuringBootstrap() throws Exception {
+    runWithNewCluster(1, this::testInstallSnapshotDuringBootstrap);
+  }
+
+  private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws 
Exception {
+    leaderSnapshotInfoRef.set(null);
+    numSnapshotRequests.set(0);
+    int i = 0;
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try(final RaftClient client = cluster.createClient(leaderId)) {
+        for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+          RaftClientReply
+              reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + 
i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+      }
+
+      // wait for the snapshot to be done
+      final RaftServer.Division leader = cluster.getLeader();
+      final long nextIndex = leader.getRaftLog().getNextIndex();
+      LOG.info("nextIndex = {}", nextIndex);
+      final List<File> snapshotFiles = 
RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+          nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+      JavaUtils.attemptRepeatedly(() -> {
+        
Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+        return null;
+      }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+
+      RaftSnapshotBaseTest.assertLeaderContent(cluster);
+
+      final SnapshotInfo leaderSnapshotInfo = 
cluster.getLeader().getStateMachine().getLatestSnapshot();
+      final boolean set = leaderSnapshotInfoRef.compareAndSet(null, 
leaderSnapshotInfo);
+      Assert.assertTrue(set);
+
+      // add two more peers
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+          true);
+      // trigger setConfiguration
+      cluster.setConfiguration(change.allPeersInNewConf);
+
+      RaftServerTestUtil.waitAndCheckNewConf(cluster, 
change.allPeersInNewConf, 0, null);
+
+      // Check the installed snapshot index on each Follower matches with the
+      // leader snapshot.
+      for (RaftServer.Division follower : cluster.getFollowers()) {
+        Assert.assertEquals(leaderSnapshotInfo.getIndex(),
+            RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
+      }
+
+      // Make sure each new peer got one snapshot notification.
+      Assert.assertEquals(2, numSnapshotRequests.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 9837fe3..9538be6 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -258,6 +258,58 @@ public abstract class RaftSnapshotBaseTest extends 
BaseTest {
     }
   }
 
+  /**
+   * Test for install snapshot during a peer bootstrap: start a one node cluster
+   * and let it generate a snapshot. Add another node and verify that the new
+   * node installs a snapshot from the old node.
+   */
+  @Test
+  public void testInstallSnapshotDuringBootstrap() throws Exception {
+    int i = 0;
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try(final RaftClient client = cluster.createClient(leaderId)) {
+        for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+          RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+      }
+
+      // wait for the snapshot to be done
+      final long nextIndex = cluster.getLeader().getRaftLog().getNextIndex();
+      LOG.info("nextIndex = {}", nextIndex);
+      final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - 
SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+      JavaUtils.attemptRepeatedly(() -> {
+        
Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+        return null;
+      }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+      verifyTakeSnapshotMetric(cluster.getLeader());
+
+      assertLeaderContent(cluster);
+
+      // add two more peers
+      String[] newPeers = new String[]{"s3", "s4"};
+      MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
+          newPeers, true, false);
+      // trigger setConfiguration
+      cluster.setConfiguration(change.allPeersInNewConf);
+
+      for (String newPeer : newPeers) {
+        final RaftServer.Division s = 
cluster.getDivision(RaftPeerId.valueOf(newPeer));
+        SimpleStateMachine4Testing simpleStateMachine = 
SimpleStateMachine4Testing.get(s);
+        Assert.assertSame(LifeCycle.State.RUNNING, 
simpleStateMachine.getLifeCycleState());
+      }
+
+      // Verify installSnapshot counter on leader
+      verifyInstallSnapshotMetric(cluster.getLeader());
+      RaftServerTestUtil.waitAndCheckNewConf(cluster, 
change.allPeersInNewConf, 0, null);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   protected void verifyInstallSnapshotMetric(RaftServer.Division leader) {
     final Counter installSnapshotCounter = 
((RaftServerMetricsImpl)leader.getRaftServerMetrics())
         .getCounter(RATIS_SERVER_INSTALL_SNAPSHOT_COUNT);

Reply via email to