This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new cc5f2e5 RATIS-1390. Bootstrapping Peer should always try to install a snapshot the first time. (#489)
cc5f2e5 is described below
commit cc5f2e58bcfdf2232dad5b9f89745489aadd4f80
Author: Hanisha Koneru <[email protected]>
AuthorDate: Sun Aug 22 08:03:26 2021 -0700
RATIS-1390. Bootstrapping Peer should always try to install a snapshot the first time. (#489)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 36 +++++++++--
ratis-proto/src/main/proto/Raft.proto | 2 +
.../apache/ratis/server/leader/FollowerInfo.java | 8 +++
.../apache/ratis/server/leader/LeaderState.java | 4 ++
.../apache/ratis/server/leader/LogAppender.java | 16 ++++-
.../apache/ratis/server/impl/FollowerInfoImpl.java | 12 ++++
.../apache/ratis/server/impl/LeaderStateImpl.java | 8 ++-
.../apache/ratis/server/impl/RaftServerImpl.java | 59 ++++++++++++++----
.../ratis/server/leader/LogAppenderBase.java | 2 +-
.../ratis/server/leader/LogAppenderDefault.java | 19 ++++--
.../ratis/InstallSnapshotNotificationTests.java | 72 ++++++++++++++++++++++
.../ratis/statemachine/RaftSnapshotBaseTest.java | 52 ++++++++++++++++
12 files changed, 267 insertions(+), 23 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index f947f1a..f9968e7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -407,10 +407,11 @@ public class GrpcLogAppender extends LogAppenderBase {
if (!firstResponseReceived) {
firstResponseReceived = true;
}
-
+ final long followerSnapshotIndex;
switch (reply.getResult()) {
case SUCCESS:
LOG.info("{}: Completed InstallSnapshot. Reply: {}", this, reply);
+ getFollower().setAttemptedToInstallSnapshot();
removePending(reply);
break;
case IN_PROGRESS:
@@ -418,9 +419,10 @@ public class GrpcLogAppender extends LogAppenderBase {
removePending(reply);
break;
case ALREADY_INSTALLED:
- final long followerSnapshotIndex = reply.getSnapshotIndex();
- LOG.info("{}: Already Installed Snapshot Index {}.", this,
followerSnapshotIndex);
+ followerSnapshotIndex = reply.getSnapshotIndex();
+ LOG.info("{}: Follower snapshot is already at index {}.", this,
followerSnapshotIndex);
getFollower().setSnapshotIndex(followerSnapshotIndex);
+ getFollower().setAttemptedToInstallSnapshot();
getLeaderState().onFollowerCommitIndex(getFollower(),
followerSnapshotIndex);
increaseNextIndex(followerSnapshotIndex);
removePending(reply);
@@ -433,6 +435,20 @@ public class GrpcLogAppender extends LogAppenderBase {
this,
RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
getServer().getId(), installSnapshotEnabled, getFollowerId(),
!installSnapshotEnabled);
break;
+ case SNAPSHOT_INSTALLED:
+ followerSnapshotIndex = reply.getSnapshotIndex();
+ LOG.info("{}: Follower installed snapshot at index {}", this,
followerSnapshotIndex);
+ getFollower().setSnapshotIndex(followerSnapshotIndex);
+ getFollower().setAttemptedToInstallSnapshot();
+ getLeaderState().onFollowerCommitIndex(getFollower(),
followerSnapshotIndex);
+ increaseNextIndex(followerSnapshotIndex);
+ removePending(reply);
+ break;
+ case SNAPSHOT_UNAVAILABLE:
+ LOG.info("{}: Follower could not install snapshot as it is not
available.", this);
+ getFollower().setAttemptedToInstallSnapshot();
+ removePending(reply);
+ break;
case UNRECOGNIZED:
LOG.error("Unrecongnized the reply result {}: Leader is {}, follower
is {}",
reply.getResult(), getServer().getId(), getFollowerId());
@@ -562,9 +578,21 @@ public class GrpcLogAppender extends LogAppenderBase {
* @return the first available log's start term index
*/
private TermIndex shouldNotifyToInstallSnapshot() {
- final long followerNextIndex = getFollower().getNextIndex();
+ final FollowerInfo follower = getFollower();
final long leaderNextIndex = getRaftLog().getNextIndex();
+ final boolean isFollowerBootstrapping =
getLeaderState().isFollowerBootstrapping(follower);
+
+ if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
+ // If the follower is bootstrapping and has not yet installed any
snapshot from leader, then the follower should
+ // be notified to install a snapshot. Every follower should try to
install at least one snapshot during
+ // bootstrapping, if available.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Notify follower to install snapshot as it is
bootstrapping.", this);
+ }
+ return getRaftLog().getLastEntryTermIndex();
+ }
+ final long followerNextIndex = follower.getNextIndex();
if (followerNextIndex >= leaderNextIndex) {
return null;
}
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index 837b2f5..c98f0d7 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -148,6 +148,8 @@ enum InstallSnapshotResult {
IN_PROGRESS = 2;
ALREADY_INSTALLED = 3;
CONF_MISMATCH = 4;
+ SNAPSHOT_INSTALLED = 5;
+ SNAPSHOT_UNAVAILABLE = 6;
}
message RequestVoteRequestProto {
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index 487576f..2de24b0 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -52,6 +52,14 @@ public interface FollowerInfo {
/** Set follower's snapshotIndex. */
void setSnapshotIndex(long newSnapshotIndex);
+ /** Acknowledge that Follower attempted to install a snapshot. It does not
guarantee that the installation was
+ * successful. This helps to determine whether Follower can come out of
bootstrap process. */
+ void setAttemptedToInstallSnapshot();
+
+ /** Return true if install snapshot has been attempted by the Follower at
least once. Used to verify if
+ * Follower tried to install snapshot during bootstrap process. */
+ boolean hasAttemptedToInstallSnapshot();
+
/** @return the nextIndex for this follower. */
long getNextIndex();
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index 5460392..240c9a1 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -58,4 +58,8 @@ public interface LeaderState {
/** Handle the event that the follower has replied a success append entries.
*/
void onFollowerSuccessAppendEntries(FollowerInfo follower);
+
+ /** Check if a follower is bootstrapping. */
+ boolean isFollowerBootstrapping(FollowerInfo follower);
+
}
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 01caf72..5b45ca2 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -108,10 +108,24 @@ public interface LogAppender {
// we should install snapshot if the follower needs to catch up and:
// 1. there is no local log entry but there is snapshot
// 2. or the follower's next index is smaller than the log start index
+ // 3. or the follower is bootstrapping and has not installed any snapshot
yet
+ final FollowerInfo follower = getFollower();
+ final boolean isFollowerBootstrapping =
getLeaderState().isFollowerBootstrapping(follower);
+ final SnapshotInfo snapshot =
getServer().getStateMachine().getLatestSnapshot();
+
+ if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
+ if (snapshot == null) {
+ // Leader cannot send null snapshot to follower. Hence, acknowledge
InstallSnapshot attempt (even though it
+ // was not attempted) so that follower can come out of staging state
after appending log entries.
+ follower.setAttemptedToInstallSnapshot();
+ } else {
+ return snapshot;
+ }
+ }
+
final long followerNextIndex = getFollower().getNextIndex();
if (followerNextIndex < getRaftLog().getNextIndex()) {
final long logStartIndex = getRaftLog().getStartIndex();
- final SnapshotInfo snapshot =
getServer().getStateMachine().getLatestSnapshot();
if (followerNextIndex < logStartIndex || (logStartIndex ==
RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
return snapshot;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 0f6c1ab..1499903 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -40,6 +40,7 @@ class FollowerInfoImpl implements FollowerInfo {
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex",
RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex",
0L);
private volatile boolean attendVote;
+ private volatile boolean ackInstallSnapshotAttempt = false;
FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime,
long nextIndex, boolean attendVote) {
this.name = id + "->" + peer.getId();
@@ -111,6 +112,17 @@ class FollowerInfoImpl implements FollowerInfo {
}
@Override
+ public void setAttemptedToInstallSnapshot() {
+ LOG.info("Follower {} acknowledged installing snapshot", name);
+ ackInstallSnapshotAttempt = true;
+ }
+
+ @Override
+ public boolean hasAttemptedToInstallSnapshot() {
+ return ackInstallSnapshotAttempt;
+ }
+
+ @Override
public String getName() {
return name;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 601d53e..8bbae24 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -619,7 +619,8 @@ class LeaderStateImpl implements LeaderState {
LOG.debug("{} detects a follower {} timeout ({}) for bootstrapping",
this, follower, timeoutTime);
return BootStrapProgress.NOPROGRESS;
} else if (follower.getMatchIndex() + stagingCatchupGap > committed
- && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
+ && follower.getLastRpcResponseTime().compareTo(progressTime) > 0
+ && follower.hasAttemptedToInstallSnapshot()) {
return BootStrapProgress.CAUGHTUP;
} else {
return BootStrapProgress.PROGRESSING;
@@ -643,6 +644,11 @@ class LeaderStateImpl implements LeaderState {
}
}
+ @Override
+ public boolean isFollowerBootstrapping(FollowerInfo follower) {
+ return isBootStrappingPeer(follower.getPeer().getId());
+ }
+
private void checkStaging() {
if (!inStagingState()) {
// it is possible that the bootstrapping is done. Then, fallback to
UPDATE_COMMIT
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 483d0e1..11b7602 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.impl;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
@@ -166,6 +167,8 @@ class RaftServerImpl implements RaftServer.Division,
private final RaftServerMetricsImpl raftServerMetrics;
private final AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
+ private final AtomicLong installedSnapshotIndex;
+ private final AtomicBoolean isSnapshotNull;
// To avoid append entry before complete start() method
// For example, if thread1 start(), but before thread1 startAsFollower(),
thread2 receive append entry
@@ -194,6 +197,8 @@ class RaftServerImpl implements RaftServer.Division,
this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = new RetryCacheImpl(properties);
this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
+ this.installedSnapshotIndex = new AtomicLong();
+ this.isSnapshotNull = new AtomicBoolean(false);
this.dataStreamMap = new DataStreamMapImpl(id);
this.jmxAdapter = new RaftServerJmxAdapter();
@@ -1530,7 +1535,6 @@ class RaftServerImpl implements RaftServer.Division,
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
-
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
@@ -1542,23 +1546,22 @@ class RaftServerImpl implements RaftServer.Division,
}
changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
+ long snapshotIndex = state.getSnapshotIndex();
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
-
if (inProgressInstallSnapshotRequest.compareAndSet(null,
firstAvailableLogTermIndex)) {
-
+ LOG.info("{}: Received notification to install snapshot at index {}",
getMemberId(), firstAvailableLogIndex);
// Check if snapshot index is already at par or ahead of the first
// available log index of the Leader.
- long snapshotIndex = state.getSnapshotIndex();
- if (snapshotIndex + 1 >= firstAvailableLogIndex) {
+ if (snapshotIndex + 1 >= firstAvailableLogIndex &&
firstAvailableLogIndex > 0) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
- final InstallSnapshotReplyProto reply =
ServerProtoUtils.toInstallSnapshotReplyProto(
- leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
- LOG.info("{}: StateMachine snapshotIndex is {}", getMemberId(),
snapshotIndex);
- return reply;
+ LOG.info("{}: InstallSnapshot notification result: {}, current
snapshot index: {}", getMemberId(),
+ InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
+ return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(), currentTerm,
+ InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
}
Optional<RaftPeerProto> leaderPeerInfo = null;
@@ -1596,8 +1599,13 @@ class RaftServerImpl implements RaftServer.Division,
stateMachine.pause();
state.updateInstalledSnapshotIndex(reply);
state.reloadStateMachine(reply.getIndex());
+ installedSnapshotIndex.set(reply.getIndex());
+ } else {
+ isSnapshotNull.set(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: StateMachine could not install snapshot as
it is not available", this);
+ }
}
-
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
});
} catch (Throwable t) {
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
@@ -1605,12 +1613,39 @@ class RaftServerImpl implements RaftServer.Division,
}
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot Installation Request received and is in
progress", getMemberId());
+ LOG.debug("{}: StateMachine is processing Snapshot Installation
Request.", getMemberId());
}
} else {
- LOG.info("{}: Snapshot Installation by StateMachine is in progress.",
getMemberId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: StateMachine is already installing a snapshot.",
getMemberId());
+ }
}
+ // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
+ if (isSnapshotNull.compareAndSet(true, false)) {
+ LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
+ InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
+
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
+ return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
+ currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
+ }
+
+ // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the
installed snapshot index and reset
+ // installedSnapshotIndex to 0.
+ long latestInstalledSnapshotIndex =
this.installedSnapshotIndex.getAndSet(0);
+ if (latestInstalledSnapshotIndex > 0) {
+ LOG.info("{}: InstallSnapshot notification result: {}, at index: {}",
getMemberId(),
+ InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotIndex);
+
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
+ return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
+ currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotIndex);
+ }
+
+ // Otherwise, Snapshot installation is in progress.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
+ InstallSnapshotResult.IN_PROGRESS);
+ }
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index c433e3d..b706e87 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -176,7 +176,7 @@ public abstract class LogAppenderBase implements
LogAppender {
@Override
public InstallSnapshotRequestProto
newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
- Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
+ Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() >= 0);
synchronized (server) {
return LeaderProtoUtils.toInstallSnapshotRequestProto(server,
getFollowerId(), firstAvailableLogTermIndex);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 8087395..ad05c51 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -22,7 +22,6 @@ import
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -123,9 +122,21 @@ class LogAppenderDefault extends LogAppenderBase {
this, getFollower().getNextIndex(),
getRaftLog().getStartIndex(), snapshot);
final InstallSnapshotReplyProto r = installSnapshot(snapshot);
- if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
- onFollowerTerm(r.getTerm());
- } // otherwise if r is null, retry the snapshot installation
+ if (r != null) {
+ switch (r.getResult()) {
+ case NOT_LEADER:
+ onFollowerTerm(r.getTerm());
+ break;
+ case SUCCESS:
+ case SNAPSHOT_UNAVAILABLE:
+ case ALREADY_INSTALLED:
+ getFollower().setAttemptedToInstallSnapshot();
+ break;
+ default:
+ break;
+ }
+ }
+ // otherwise if r is null, retry the snapshot installation
} else {
final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
if (r != null) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 38f2e43..5f7f4a7 100644
---
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -350,4 +350,76 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
cluster.shutdown();
}
}
+
+ /**
+ * Basic test for install snapshot notification: start a one node cluster
+ * (disable install snapshot option) and let it generate a snapshot. Then
+ * delete the log and restart the node, and add more nodes as followers.
+ * The new follower nodes should get a install snapshot notification.
+ */
+ /**
+ * Test for install snapshot during a peer bootstrap: start a one node
cluster
+ * (disable install snapshot option) and let it generate a snapshot. Add
+ * another node and verify that the new node receives a install snapshot
+ * notification.
+ */
+ @Test
+ public void testInstallSnapshotDuringBootstrap() throws Exception {
+ runWithNewCluster(1, this::testInstallSnapshotDuringBootstrap);
+ }
+
+ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws
Exception {
+ leaderSnapshotInfoRef.set(null);
+ numSnapshotRequests.set(0);
+ int i = 0;
+ try {
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ try(final RaftClient client = cluster.createClient(leaderId)) {
+ for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+ RaftClientReply
+ reply = client.io().send(new RaftTestUtil.SimpleMessage("m" +
i));
+ Assert.assertTrue(reply.isSuccess());
+ }
+ }
+
+ // wait for the snapshot to be done
+ final RaftServer.Division leader = cluster.getLeader();
+ final long nextIndex = leader.getRaftLog().getNextIndex();
+ LOG.info("nextIndex = {}", nextIndex);
+ final List<File> snapshotFiles =
RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+ nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+ JavaUtils.attemptRepeatedly(() -> {
+
Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+ return null;
+ }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+
+ RaftSnapshotBaseTest.assertLeaderContent(cluster);
+
+ final SnapshotInfo leaderSnapshotInfo =
cluster.getLeader().getStateMachine().getLatestSnapshot();
+ final boolean set = leaderSnapshotInfoRef.compareAndSet(null,
leaderSnapshotInfo);
+ Assert.assertTrue(set);
+
+ // add two more peers
+ final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+ true);
+ // trigger setConfiguration
+ cluster.setConfiguration(change.allPeersInNewConf);
+
+ RaftServerTestUtil.waitAndCheckNewConf(cluster,
change.allPeersInNewConf, 0, null);
+
+ // Check the installed snapshot index on each Follower matches with the
+ // leader snapshot.
+ for (RaftServer.Division follower : cluster.getFollowers()) {
+ Assert.assertEquals(leaderSnapshotInfo.getIndex(),
+ RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
+ }
+
+ // Make sure each new peer got one snapshot notification.
+ Assert.assertEquals(2, numSnapshotRequests.get());
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 9837fe3..9538be6 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -258,6 +258,58 @@ public abstract class RaftSnapshotBaseTest extends
BaseTest {
}
}
+ /**
+ * Test for install snapshot during a peer bootstrap: start a one node
cluster
+ * and let it generate a snapshot. Add another node and verify that the new
+ * node installs a snapshot from the old node.
+ */
+ @Test
+ public void testInstallSnapshotDuringBootstrap() throws Exception {
+ int i = 0;
+ try {
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ try(final RaftClient client = cluster.createClient(leaderId)) {
+ for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+ RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
+ Assert.assertTrue(reply.isSuccess());
+ }
+ }
+
+ // wait for the snapshot to be done
+ final long nextIndex = cluster.getLeader().getRaftLog().getNextIndex();
+ LOG.info("nextIndex = {}", nextIndex);
+ final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex -
SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+ JavaUtils.attemptRepeatedly(() -> {
+
Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+ return null;
+ }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+ verifyTakeSnapshotMetric(cluster.getLeader());
+
+ assertLeaderContent(cluster);
+
+ // add two more peers
+ String[] newPeers = new String[]{"s3", "s4"};
+ MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
+ newPeers, true, false);
+ // trigger setConfiguration
+ cluster.setConfiguration(change.allPeersInNewConf);
+
+ for (String newPeer : newPeers) {
+ final RaftServer.Division s =
cluster.getDivision(RaftPeerId.valueOf(newPeer));
+ SimpleStateMachine4Testing simpleStateMachine =
SimpleStateMachine4Testing.get(s);
+ Assert.assertSame(LifeCycle.State.RUNNING,
simpleStateMachine.getLifeCycleState());
+ }
+
+ // Verify installSnapshot counter on leader
+ verifyInstallSnapshotMetric(cluster.getLeader());
+ RaftServerTestUtil.waitAndCheckNewConf(cluster,
change.allPeersInNewConf, 0, null);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
protected void verifyInstallSnapshotMetric(RaftServer.Division leader) {
final Counter installSnapshotCounter =
((RaftServerMetricsImpl)leader.getRaftServerMetrics())
.getCounter(RATIS_SERVER_INSTALL_SNAPSHOT_COUNT);