This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6bdfb58 RATIS-498. Notify Follower to Install Snapshot through state
machine. Contributed by Hanisha Koneru
6bdfb58 is described below
commit 6bdfb587763d9e6faeeaf8918b8b6316c4cb7fe0
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Apr 3 21:15:48 2019 +0800
RATIS-498. Notify Follower to Install Snapshot through state machine.
Contributed by Hanisha Koneru
---
.../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 19 --
.../apache/ratis/grpc/server/GrpcLogAppender.java | 93 +++++++++-
ratis-proto/src/main/proto/Raft.proto | 7 +-
.../apache/ratis/server/RaftServerConfigKeys.java | 10 ++
.../org/apache/ratis/server/impl/FollowerInfo.java | 6 +-
.../apache/ratis/server/impl/FollowerState.java | 1 +
.../org/apache/ratis/server/impl/LogAppender.java | 7 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 192 +++++++++++++++++++--
.../apache/ratis/server/impl/ServerProtoUtils.java | 24 +++
.../org/apache/ratis/server/impl/ServerState.java | 24 +++
.../apache/ratis/statemachine/StateMachine.java | 14 +-
.../impl/SimpleStateMachineStorage.java | 6 +
.../ratis/statemachine/RaftSnapshotBaseTest.java | 6 +-
.../statemachine/SimpleStateMachine4Testing.java | 4 +
.../ratis/grpc/TestInstallSnapshotWithGrpc.java | 188 ++++++++++++++++++++
15 files changed, 550 insertions(+), 51 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 08ca49e..1f18810 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -159,25 +159,6 @@ public interface GrpcConfigKeys {
}
}
- interface LogAppender {
- Logger LOG = LoggerFactory.getLogger(Server.class);
- static Consumer<String> getDefaultLog() {
- return LOG::info;
- }
-
- String PREFIX = GrpcConfigKeys.PREFIX + ".log.appender";
-
- String INSTALL_SNAPSHOT_ENABLED_KEY = PREFIX + ".install.snapshot.enabled";
- boolean INSTALL_SNAPSHOT_ENABLED_DEFAULT = true;
- static boolean installSnapshotEnabled(RaftProperties properties) {
- return getBoolean(properties::getBoolean,
- INSTALL_SNAPSHOT_ENABLED_KEY, INSTALL_SNAPSHOT_ENABLED_DEFAULT,
getDefaultLog());
- }
- static void setInstallSnapshotEnabled(RaftProperties properties, boolean
shouldInstallSnapshot) {
- setBoolean(properties::setBoolean, INSTALL_SNAPSHOT_ENABLED_KEY,
shouldInstallSnapshot);
- }
- }
-
String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB");
static SizeInBytes messageSizeMax(RaftProperties properties,
Consumer<String> logger) {
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index da803c4..007284e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -25,6 +25,7 @@ import org.apache.ratis.server.impl.LeaderState;
import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
@@ -68,7 +69,7 @@ public class GrpcLogAppender extends LogAppender {
server.getProxy().getProperties());
requestTimeoutDuration =
RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
pendingRequests = new ConcurrentHashMap<>();
- installSnapshotEnabled = GrpcConfigKeys.LogAppender.installSnapshotEnabled(
+ installSnapshotEnabled =
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(
server.getProxy().getProperties());
}
@@ -99,6 +100,12 @@ public class GrpcLogAppender extends LogAppender {
installSnapshot(snapshot);
shouldAppendLog = false;
}
+ } else {
+ TermIndex installSnapshotNotificationTermIndex =
shouldNotifyToInstallSnapshot();
+ if (installSnapshotNotificationTermIndex != null) {
+ installSnapshot(installSnapshotNotificationTermIndex);
+ shouldAppendLog = false;
+ }
}
if (shouldAppendLog && !shouldWait()) {
// keep appending log entries or sending heartbeats
@@ -163,7 +170,7 @@ public class GrpcLogAppender extends LogAppender {
return;
}
pendingRequests.put(pending.getServerRequest().getCallId(), pending);
- updateNextIndex(pending);
+ increaseNextIndex(pending);
if (appendLogRequestObserver == null) {
appendLogRequestObserver = getClient().appendEntries(new
AppendLogResponseHandler());
}
@@ -193,10 +200,10 @@ public class GrpcLogAppender extends LogAppender {
}
}
- private void updateNextIndex(AppendEntriesRequestProto request) {
+ private void increaseNextIndex(AppendEntriesRequestProto request) {
final int count = request.getEntriesCount();
if (count > 0) {
- follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1);
+ follower.increaseNextIndex(request.getEntries(count - 1).getIndex() + 1);
}
}
@@ -210,8 +217,8 @@ public class GrpcLogAppender extends LogAppender {
* 1. If the reply is success, update the follower's match index and submit
* an event to leaderState
* 2. If the reply is NOT_LEADER, step down
- * 3. If the reply is INCONSISTENCY, decrease the follower's next index
- * based on the response
+ * 3. If the reply is INCONSISTENCY, increase or decrease the follower's next
+ * index based on the response
*/
@Override
public void onNext(AppendEntriesReplyProto reply) {
@@ -325,7 +332,8 @@ public class GrpcLogAppender extends LogAppender {
}
Preconditions.assertTrue(request.hasPreviousLog());
if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
- clearPendingRequests(reply.getNextIndex());
+ pendingRequests.clear();
+ follower.updateNextIndex(reply.getNextIndex());
}
}
@@ -375,6 +383,14 @@ public class GrpcLogAppender extends LogAppender {
switch (reply.getResult()) {
case SUCCESS:
+ case IN_PROGRESS:
+ removePending(reply);
+ break;
+ case ALREADY_INSTALLED:
+ long followerLatestSnapshotIndex = reply.getSnapshotIndex();
+ LOG.info("{}: Latest snapshot index on follower {} is {}.",
+ server.getId(), follower.getPeer(), followerLatestSnapshotIndex);
+ follower.setSnapshotIndex(followerLatestSnapshotIndex);
removePending(reply);
break;
case NOT_LEADER:
@@ -399,12 +415,15 @@ public class GrpcLogAppender extends LogAppender {
@Override
public void onCompleted() {
- LOG.info("{} stops sending snapshots to follower {}", server.getId(),
- follower);
+ LOG.info("Snapshot(s) sent from {} to follower {}", server.getId(),
follower);
close();
}
}
+ /**
+ * Send installSnapshot request to Follower with a snapshot.
+ * @param snapshot the snapshot to be sent to Follower
+ */
private void installSnapshot(SnapshotInfo snapshot) {
LOG.info("{}: follower {}'s next index is {}," +
" log's start index is {}, need to install snapshot",
@@ -451,4 +470,60 @@ public class GrpcLogAppender extends LogAppender {
server.getId(), snapshot.getTermIndex().getIndex(),
follower.getPeer());
}
}
+
+  /**
+   * Send an installSnapshot request to the Follower that carries only a
+   * notification that a snapshot needs to be installed (no snapshot data).
+   * After the request is sent, waits (at most once) on this appender's monitor
+   * while the appender is running and the response handler is not done.
+   * @param firstAvailableLogTermIndex the first available log's term-index on
+   *                                   the Leader
+   */
+  private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
+    LOG.info("{}: follower {}'s next index is {}, log's start index is {}, " +
+        "need to notify follower to install snapshot",
+        server.getId(), follower.getPeer(), follower.getNextIndex(),
+        raftLog.getStartIndex());
+
+    final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler();
+    StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
+    // prepare and enqueue the notify install snapshot request.
+    final InstallSnapshotRequestProto request =
+        createInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
+    try {
+      snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+      snapshotRequestObserver.onNext(request);
+      follower.updateLastRpcSendTime();
+      responseHandler.addPending(request);
+      snapshotRequestObserver.onCompleted();
+    } catch (Exception e) {
+      LOG.warn("{} failed to notify follower {} to install snapshot. " +
+          "Exception: {}", this, follower, e);
+      if (snapshotRequestObserver != null) {
+        snapshotRequestObserver.onError(e);
+      }
+      return;
+    }
+
+    synchronized (this) {
+      if (isAppenderRunning() && !responseHandler.isDone()) {
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          // Do not swallow the interruption: restore the interrupt status so
+          // code further up the call stack can observe it.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Decide whether the Leader should notify the Follower to install a snapshot
+   * through the Follower's own State Machine. This is the case when the
+   * Follower's next index is below the start index of the Leader's log.
+   * @return the TermIndex at the Leader's log start index when the Follower
+   *         should be notified; null otherwise
+   */
+  protected TermIndex shouldNotifyToInstallSnapshot() {
+    if (follower.getNextIndex() >= raftLog.getStartIndex()) {
+      return null;
+    }
+    // The Leader no longer has the entries from the Follower's next index
+    // onwards, so the Follower has to catch up by installing a snapshot
+    // through its State Machine.
+    return raftLog.getTermIndex(raftLog.getStartIndex());
+  }
}
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index b081352..96b91ec 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -124,6 +124,8 @@ message FileChunkProto {
enum InstallSnapshotResult {
SUCCESS = 0;
NOT_LEADER = 1;
+ IN_PROGRESS = 2;
+ ALREADY_INSTALLED = 3;
}
message RequestVoteRequestProto {
@@ -158,7 +160,8 @@ message AppendEntriesReplyProto {
enum AppendResult {
SUCCESS = 0;
NOT_LEADER = 1; // the requester's term is not large enough
- INCONSISTENCY = 2; // gap between the local log and the entries
+ INCONSISTENCY = 2; // gap between the local log and the entries or
snapshot installation in progress or
+ // overlap between local snapshot and the entries
}
RaftRpcReplyProto serverReply = 1;
@@ -178,6 +181,7 @@ message InstallSnapshotRequestProto {
repeated FileChunkProto fileChunks = 7;
uint64 totalSize = 8;
bool done = 9; // whether this is the final chunk for the same req.
+ TermIndexProto firstAvailableLogIndex = 11; // first available log index to
notify Follower to install snapshot
}
message InstallSnapshotReplyProto {
@@ -185,6 +189,7 @@ message InstallSnapshotReplyProto {
uint32 requestIndex = 2;
uint64 term = 3;
InstallSnapshotResult result = 4;
+ uint64 snapshotIndex = 5;
}
message ClientMessageEntryProto {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9cbdf2f..b4f7faf 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -267,6 +267,16 @@ public interface RaftServerConfigKeys {
static void setSnapshotChunkSizeMax(RaftProperties properties,
SizeInBytes maxChunkSize) {
setSizeInBytes(properties::set, SNAPSHOT_CHUNK_SIZE_MAX_KEY,
maxChunkSize);
}
+
+      String INSTALL_SNAPSHOT_ENABLED_KEY = PREFIX + ".install.snapshot.enabled";
+      boolean INSTALL_SNAPSHOT_ENABLED_DEFAULT = true;
+
+      /**
+       * Read {@link #INSTALL_SNAPSHOT_ENABLED_KEY}.
+       * @param properties the properties to read from
+       * @return the configured value, or {@link #INSTALL_SNAPSHOT_ENABLED_DEFAULT}
+       */
+      static boolean installSnapshotEnabled(RaftProperties properties) {
+        return getBoolean(properties::getBoolean, INSTALL_SNAPSHOT_ENABLED_KEY,
+            INSTALL_SNAPSHOT_ENABLED_DEFAULT, getDefaultLog());
+      }
+
+      /**
+       * Set {@link #INSTALL_SNAPSHOT_ENABLED_KEY}.
+       * @param properties the properties to update
+       * @param shouldInstallSnapshot the new value
+       */
+      static void setInstallSnapshotEnabled(RaftProperties properties, boolean shouldInstallSnapshot) {
+        setBoolean(properties::setBoolean, INSTALL_SNAPSHOT_ENABLED_KEY, shouldInstallSnapshot);
+      }
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index cad9620..5206528 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -79,7 +79,7 @@ public class FollowerInfo {
return nextIndex.get();
}
- public void updateNextIndex(long newNextIndex) {
+ public void increaseNextIndex(long newNextIndex) {
nextIndex.updateIncreasingly(newNextIndex, debugIndexChange);
}
@@ -87,6 +87,10 @@ public class FollowerInfo {
nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1,
newNextIndex), infoIndexChange);
}
+  /**
+   * Set the next index unconditionally, so it may move backwards as well as
+   * forwards. A negative value is ignored and the current next index is kept.
+   * @param newNextIndex the new next index
+   */
+  public void updateNextIndex(long newNextIndex) {
+    nextIndex.updateUnconditionally(old -> newNextIndex < 0 ? old : newNextIndex, infoIndexChange);
+  }
+
public void setSnapshotIndex(long snapshotIndex) {
matchIndex.setUnconditionally(snapshotIndex, infoIndexChange);
nextIndex.setUnconditionally(snapshotIndex + 1, infoIndexChange);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index f5d55dd..1d2afda 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -34,6 +34,7 @@ class FollowerState extends Daemon {
APPEND_COMPLETE(AtomicInteger::decrementAndGet),
INSTALL_SNAPSHOT_START(AtomicInteger::incrementAndGet),
INSTALL_SNAPSHOT_COMPLETE(AtomicInteger::decrementAndGet),
+ INSTALL_SNAPSHOT_NOTIFICATION(AtomicInteger::get),
REQUEST_VOTE(AtomicInteger::get);
private final ToIntFunction<AtomicInteger> updateFunction;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 17a4073..136b4d9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -316,6 +316,11 @@ public class LogAppender {
}
}
+  /**
+   * Create an installSnapshot request for this appender's Follower. The request
+   * only notifies the Follower to install a snapshot; no snapshot data is attached.
+   * @param firstLogStartTermIndex the term-index to report as the first available
+   *                               log entry (presumably the Leader's log start — confirm at callers)
+   * @return the notification request
+   */
+  protected InstallSnapshotRequestProto createInstallSnapshotNotificationRequest(
+      TermIndex firstLogStartTermIndex) {
+    return server.createInstallSnapshotRequest(
+        getFollowerId(), firstLogStartTermIndex);
+  }
+
private FileChunkProto readFileChunk(FileInfo fileInfo,
FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
throws IOException {
@@ -425,7 +430,7 @@ public class LogAppender {
if (nextIndex > oldNextIndex) {
follower.updateMatchIndex(nextIndex - 1);
- follower.updateNextIndex(nextIndex);
+ follower.increaseNextIndex(nextIndex);
submitEventOnSuccessAppend();
}
break;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fb57b6c..6788e96 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -70,6 +71,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
private final int minTimeoutMs;
private final int maxTimeoutMs;
private final int rpcSlownessTimeoutMs;
+ private final boolean installSnapshotEnabled;
private final LifeCycle lifeCycle;
private final ServerState state;
@@ -82,6 +84,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
private final RaftServerJmxAdapter jmxAdapter;
+ private AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
+
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy
proxy) throws IOException {
final RaftPeerId id = proxy.getId();
LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
@@ -94,12 +98,14 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
minTimeoutMs =
RaftServerConfigKeys.Rpc.timeoutMin(properties).toIntExact(TimeUnit.MILLISECONDS);
maxTimeoutMs =
RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
rpcSlownessTimeoutMs =
RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
+ installSnapshotEnabled =
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
"max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
this.proxy = proxy;
this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = initRetryCache(properties);
+ this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
this.jmxAdapter = new RaftServerJmxAdapter();
}
@@ -872,8 +878,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
final List<CompletableFuture<Long>> futures;
final long currentTerm;
- final long nextIndex = state.getLog().getNextIndex();
final long followerCommit = state.getLog().getLastCommittedIndex();
+ final long nextIndex = state.getNextIndex();
final Optional<FollowerState> followerState;
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
@@ -899,22 +905,20 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
}
followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
- // We need to check if "previous" is in the local peer. Note that it is
- // possible that "previous" is covered by the latest snapshot: e.g.,
- // it's possible there's no log entries outside of the latest snapshot.
- // However, it is not possible that "previous" index is smaller than the
- // last index included in snapshot. This is because indices <= snapshot's
- // last index should have been committed.
- if (previous != null && !containPrevious(previous)) {
- final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getId(), groupId, currentTerm, followerCommit,
Math.min(nextIndex, previous.getIndex()),
- INCONSISTENCY, callId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
- getId(), previous, ServerProtoUtils.toString(reply));
- }
+ // Check that the append entries are not inconsistent. There are 3
+ // scenarios which can result in inconsistency:
+ // 1. There is a snapshot installation in progress
+ // 2. There is an overlap between the snapshot index and the entries
+ // 3. There is a gap between the local log and the entries
+ // In any of these scenarios, we should return an INCONSISTENCY reply
+ // back to leader so that the leader can update this follower's next
+ // index.
+
+ AppendEntriesReplyProto inconsistencyReply =
checkInconsistentAppendEntries(
+ leaderId, currentTerm, followerCommit, previous, nextIndex, callId,
entries);
+ if (inconsistencyReply != null) {
followerState.ifPresent(fs ->
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
- return CompletableFuture.completedFuture(reply);
+ return CompletableFuture.completedFuture(inconsistencyReply);
}
state.updateConfiguration(entries);
@@ -942,6 +946,64 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
});
}
+  /**
+   * Check whether an appendEntries request must be rejected with an
+   * INCONSISTENCY reply so that the Leader can update this Follower's next index.
+   *
+   * <p>Three conditions are checked in order. When several hold, the next index
+   * computed by the last matching check is the one sent back:
+   * <ol>
+   *   <li>a snapshot installation through the State Machine is in progress;</li>
+   *   <li>the first entry is already covered by the latest snapshot;</li>
+   *   <li>"previous" is not present locally (a gap between log and entries).</li>
+   * </ol>
+   *
+   * @return the INCONSISTENCY reply, or null if the entries can be appended
+   */
+  private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId leaderId, long currentTerm,
+      long followerCommit, TermIndex previous, long nextIndex, long callId, LogEntryProto... entries) {
+    long replyNextIndex = -1;
+
+    // Check if a snapshot installation through state machine is in progress.
+    if (inProgressInstallSnapshotRequest.get() != null) {
+      // "previous" may be null (the gap check below guards against that too),
+      // so it must not be dereferenced unconditionally here.
+      replyNextIndex = previous == null ? nextIndex : Math.min(nextIndex, previous.getIndex());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: Cannot append entries as snapshot installation is in " +
+            "progress. Follower next index: {}", getId(), replyNextIndex);
+      }
+    }
+
+    // If a snapshot installation has happened, the new snapshot might
+    // include the log entry indices sent as part of the
+    // AppendEntriesRequestProto. Check that the first log entry proto is
+    // greater than the last index included in the latest snapshot. If not,
+    // the leader should be informed about the new snapshot index so that
+    // it can send log entries only from the next log index
+    final long snapshotIndex = state.getSnapshotIndex();
+    if (snapshotIndex > 0 && entries != null && entries.length > 0
+        && entries[0].getIndex() <= snapshotIndex) {
+      replyNextIndex = snapshotIndex + 1;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: Cannot append entries as latest snapshot already has " +
+            "the append entries. Snapshot index: {}, first append entry " +
+            "index: {}.", getId(), snapshotIndex, entries[0].getIndex());
+      }
+    }
+
+    // We need to check if "previous" is in the local peer. Note that it is
+    // possible that "previous" is covered by the latest snapshot: e.g.,
+    // it's possible there's no log entries outside of the latest snapshot.
+    // However, it is not possible that "previous" index is smaller than the
+    // last index included in snapshot. This is because indices <= snapshot's
+    // last index should have been committed.
+    if (previous != null && !containPrevious(previous)) {
+      replyNextIndex = Math.min(nextIndex, previous.getIndex());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: Cannot append entries as there is a gap between " +
+            "local log and append entries. Previous is not present. " +
+            "Previous: {}, follower next index: {}", getId(), previous, replyNextIndex);
+      }
+    }
+
+    if (replyNextIndex == -1) {
+      // No inconsistency detected; the entries can be appended.
+      return null;
+    }
+
+    final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
+        leaderId, getId(), groupId, currentTerm, followerCommit, replyNextIndex,
+        INCONSISTENCY, callId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: inconsistency entries. Reply:{}", getId(), ServerProtoUtils.toString(reply));
+    }
+    return reply;
+  }
+
private boolean containPrevious(TermIndex previous) {
if (LOG.isTraceEnabled()) {
LOG.trace("{}: prev:{}, latestSnapshot:{}, latestInstalledSnapshot:{}",
@@ -967,6 +1029,18 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
assertLifeCycleState(STARTING, RUNNING);
assertGroup(leaderId, leaderGroupId);
+ // Check if install snapshot from Leader is enabled
+ if (installSnapshotEnabled) {
+ // Leader has sent InstallSnapshot request with SnapshotInfo. Install
the snapshot.
+ return checkAndInstallSnapshot(request, leaderId);
+ } else {
+ // Leader has only sent a notification to install snapshot. Inform State
Machine to install snapshot.
+ return notifyStateMachineToInstallSnapshot(request, leaderId);
+ }
+ }
+
+ private InstallSnapshotReplyProto checkAndInstallSnapshot(
+ InstallSnapshotRequestProto request, RaftPeerId leaderId) throws
IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
@@ -1017,6 +1091,84 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
}
+  /**
+   * Handle an installSnapshot request from the Leader that only notifies this
+   * Follower to install a snapshot through its own State Machine.
+   *
+   * <p>Replies NOT_LEADER if the sender is not recognized as the leader;
+   * ALREADY_INSTALLED (with the local snapshot index) if the local snapshot
+   * already reaches the Leader's first available log index; SUCCESS after
+   * calling {@code notifyInstallSnapshotFromLeader} on the State Machine
+   * (without waiting for the returned future); and IN_PROGRESS if an earlier
+   * notification is still being handled.
+   *
+   * @param request the notification request; its firstAvailableLogIndex is the
+   *                first log entry still available on the Leader
+   * @param leaderId the id of the requesting Leader
+   * @return the reply to send back to the Leader
+   * @throws IOException as declared; presumably from persisting metadata during
+   *                     the follower transition — confirm
+   */
+  private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
+      InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
+    final long currentTerm;
+    final long leaderTerm = request.getLeaderTerm();
+    final TermIndex firstAvailableLogTermIndex = ServerProtoUtils.toTermIndex(
+        request.getFirstAvailableLogIndex());
+    final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
+
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final InstallSnapshotReplyProto reply = ServerProtoUtils
+            .toInstallSnapshotReplyProto(leaderId, getId(), groupId, currentTerm,
+                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
+        LOG.debug("{}: do not recognize leader for installing snapshot." +
+            " Reply: {}", getId(), reply);
+        return reply;
+      }
+      changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
+      state.setLeader(leaderId, "installSnapshot");
+
+      updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
+
+      if (inProgressInstallSnapshotRequest.compareAndSet(null, firstAvailableLogTermIndex)) {
+
+        // Check if snapshot index is already at par or ahead of the first
+        // available log index of the Leader.
+        final long snapshotIndex = state.getSnapshotIndex();
+        if (snapshotIndex + 1 >= firstAvailableLogIndex) {
+          // State Machine has already installed the snapshot. Return the
+          // latest snapshot index to the Leader.
+          inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
+          final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(
+              leaderId, getId(), groupId, currentTerm,
+              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
+          LOG.info("{}: StateMachine latest installed snapshot index: {}. Reply: {}",
+              getId(), snapshotIndex, reply);
+          return reply;
+        }
+
+        // This is the first installSnapshot notify request for this term and
+        // index. Notify the state machine to install the snapshot.
+        LOG.debug("{}: notifying state machine to install snapshot. Next log " +
+            "index is {} but the leader's first available log index is {}.",
+            getId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
+
+        stateMachine.notifyInstallSnapshotFromLeader(firstAvailableLogTermIndex)
+            .whenComplete((installedTermIndex, exception) -> {
+              if (exception != null) {
+                LOG.error(getId() + ": State Machine failed to install snapshot", exception);
+                inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
+                return;
+              }
+
+              // Reload only when the State Machine reports the TermIndex of
+              // the installed snapshot.
+              if (installedTermIndex != null) {
+                stateMachine.pause();
+                state.reloadStateMachine(installedTermIndex.getIndex(), leaderTerm);
+                state.updateInstalledSnapshotIndex(installedTermIndex);
+              }
+              inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
+            });
+
+        // Supply arguments for both placeholders.
+        LOG.info("{}: StateMachine notified to install snapshot, Request: {}",
+            getId(), firstAvailableLogTermIndex);
+        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), groupId,
+            currentTerm, InstallSnapshotResult.SUCCESS, -1);
+      }
+
+      LOG.debug("{}: StateMachine snapshot installation is in progress. " +
+          "InProgress Request: {}", getId(), inProgressInstallSnapshotRequest.get());
+      return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), groupId,
+          currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
+    }
+  }
+
synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
RaftPeerId targetId, String requestId, int requestIndex,
SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) {
@@ -1028,6 +1180,14 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
chunks, totalSize.getAsLong(), done);
}
+  /**
+   * Create an installSnapshot request that only notifies the target Follower
+   * to install a snapshot through its State Machine.
+   * @param targetId the Follower to notify
+   * @param firstAvailableLogTermIndex the first log entry still available on
+   *                                   this Leader; its index must be positive
+   * @return the notification request stamped with the current term
+   */
+  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
+      RaftPeerId targetId, TermIndex firstAvailableLogTermIndex) {
+    // Use Preconditions (always checked) instead of a Java assert, which is
+    // disabled unless the JVM runs with -ea.
+    Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0,
+        "firstAvailableLogTermIndex must have a positive index: %s", firstAvailableLogTermIndex);
+    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(),
+        targetId, groupId, state.getCurrentTerm(), firstAvailableLogTermIndex);
+  }
+
synchronized RequestVoteRequestProto createRequestVoteRequest(
RaftPeerId targetId, long term, TermIndex lastEntry) {
return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId,
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 62c21e9..bb5a1fa 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -285,6 +285,19 @@ public interface ServerProtoUtils {
return builder.build();
}
+  /**
+   * Build an InstallSnapshotReplyProto carrying the result and, when positive,
+   * the snapshot index installed on the replying server.
+   * The server reply is marked successful only for {@code SUCCESS}.
+   */
+  static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
+      long term, InstallSnapshotResult result, long installedSnapshotIndex) {
+    final boolean success = result == InstallSnapshotResult.SUCCESS;
+    final RaftRpcReplyProto.Builder serverReply =
+        toRaftRpcReplyProtoBuilder(requestorId, replyId, groupId, success);
+    final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto.newBuilder()
+        .setServerReply(serverReply)
+        .setTerm(term)
+        .setResult(result);
+    // A non-positive index (e.g. -1) means "no snapshot index to report".
+    if (installedSnapshotIndex > 0) {
+      builder.setSnapshotIndex(installedSnapshotIndex);
+    }
+    return builder.build();
+  }
+
static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, String
requestId, int requestIndex,
long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
@@ -301,6 +314,17 @@ public interface ServerProtoUtils {
.setDone(done).build();
}
+  /**
+   * Build an installSnapshot notification request. It carries no snapshot data,
+   * only the first log entry still available on the Leader.
+   */
+  static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
+      long leaderTerm, TermIndex firstAvailable) {
+    final InstallSnapshotRequestProto.Builder builder = InstallSnapshotRequestProto.newBuilder();
+    builder.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId));
+    // TODO: save and pass RaftConfiguration via builder.setRaftConfiguration(...)
+    builder.setLeaderTerm(leaderTerm);
+    builder.setFirstAvailableLogIndex(toTermIndexProto(firstAvailable));
+    return builder.build();
+  }
+
static AppendEntriesReplyProto toAppendEntriesReplyProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long
term,
long followerCommit, long nextIndex, AppendResult result, long callId) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 0a343b2..8fb55f2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -412,6 +412,11 @@ public class ServerState implements Closeable {
request.getTermIndex());
}
+  /**
+   * Record an installed snapshot: first sync the log with the snapshot's last
+   * index, then remember the snapshot's TermIndex as the latest installed one.
+   * @param lastTermIndexInSnapshot the last TermIndex included in the snapshot
+   */
+  void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
+    final long lastIndexInSnapshot = lastTermIndexInSnapshot.getIndex();
+    log.syncWithSnapshot(lastIndexInSnapshot);
+    latestInstalledSnapshot = lastTermIndexInSnapshot;
+  }
+
SnapshotInfo getLatestSnapshot() {
return
server.getStateMachine().getStateMachineStorage().getLatestSnapshot();
}
@@ -420,6 +425,25 @@ public class ServerState implements Closeable {
return latestInstalledSnapshot;
}
+  /**
+   * The last index included in either the latestSnapshot or the
+   * latestInstalledSnapshot.
+   * @return the larger of the two snapshot indices; 0 if neither exists
+   */
+  public long getSnapshotIndex() {
+    // Read each reference once so the null check and getIndex() see the same object.
+    final SnapshotInfo latestSnapshot = getLatestSnapshot();
+    final long latestSnapshotIndex = latestSnapshot != null ? latestSnapshot.getIndex() : 0;
+    final TermIndex installedSnapshot = latestInstalledSnapshot;
+    final long latestInstalledSnapshotIndex = installedSnapshot != null ? installedSnapshot.getIndex() : 0;
+    return Math.max(latestSnapshotIndex, latestInstalledSnapshotIndex);
+  }
+
+  /**
+   * @return the larger of the log's next index and the index immediately
+   *         after the latest snapshot index
+   */
+  public long getNextIndex() {
+    final long logNext = log.getNextIndex();
+    return Math.max(logNext, getSnapshotIndex() + 1);
+  }
+
public long getLastAppliedIndex() {
return stateMachineUpdater.getLastAppliedIndex();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 2c51afb..ab7897c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -23,7 +23,6 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
@@ -254,4 +253,17 @@ public interface StateMachine extends Closeable {
default CompletableFuture<Void> truncateStateMachineData(long index) {
return CompletableFuture.completedFuture(null);
}
+
+  /**
+   * Notify the Follower's state machine that the leader has purged entries
+   * from its log and hence, to catch up, the Follower state machine would have
+   * to install the latest snapshot.
+   *
+   * @param firstTermIndexInLog TermIndex of the first append entry available
+   *                            in the Leader's log.
+   * @return a future which, after the snapshot installation is complete,
+   *         completes with the last included term index in the snapshot.
+   *         The default implementation installs nothing and returns a future
+   *         already completed with {@code null}.
+   */
+  default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
+      TermIndex firstTermIndexInLog) {
+    return CompletableFuture.completedFuture(null);
+  }
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
index 78605b4..18501c7 100644
---
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
+++
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
@@ -22,6 +22,7 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachineStorage;
+import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.AtomicFileOutputStream;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
@@ -130,4 +131,9 @@ public class SimpleStateMachineStorage implements
StateMachineStorage {
public SingleFileSnapshotInfo getLatestSnapshot() {
return currentSnapshot;
}
+
+  /**
+   * Exposes the {@code smDir} directory held by this storage.
+   *
+   * @return the storage's {@code smDir} directory (intended for tests only)
+   */
+  @VisibleForTesting
+  public File getSmDir() {
+    return this.smDir;
+  }
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 8a8401c..795c7b2 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -60,7 +60,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
static final Logger LOG =
LoggerFactory.getLogger(RaftSnapshotBaseTest.class);
private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
- static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex,
long endIndex) {
+ public static List<File> getSnapshotFiles(MiniRaftCluster cluster, long
startIndex, long endIndex) {
final RaftServerImpl leader = cluster.getLeader();
final SimpleStateMachineStorage storage =
SimpleStateMachine4Testing.get(leader).getStateMachineStorage();
final long term = leader.getState().getCurrentTerm();
@@ -70,7 +70,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
}
- static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
+ public static void assertLeaderContent(MiniRaftCluster cluster) throws
Exception {
final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
final RaftLog leaderLog = leader.getState().getLog();
final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
@@ -147,7 +147,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest
{
}
}
- static boolean exists(File f) {
+ public static boolean exists(File f) {
if (f.exists()) {
LOG.info("File exists: " + f);
return true;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 6306ce2..6a8c532 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -375,4 +375,8 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
LOG.info("{}: notifyExtendedNoLeader {}, {}", this, group, roleInfoProto);
leaderElectionTimeoutInfo = roleInfoProto;
}
+
+  /**
+   * Lets test subclasses locate the directory of this state machine's storage.
+   *
+   * @return the result of {@code storage.getSmDir()}
+   */
+  protected File getSMdir() {
+    return this.storage.getSmDir();
+  }
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
new file mode 100644
index 0000000..e512262
--- /dev/null
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
+import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LogUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.ratis.BaseTest.ONE_SECOND;
+
+public class TestInstallSnapshotWithGrpc {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  static final Logger LOG = LoggerFactory.getLogger(TestInstallSnapshotWithGrpc.class);
+  private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
+
+  /**
+   * The leader's latest snapshot, set by the test before followers are added
+   * and read by the followers' state machines when they are notified to
+   * install a snapshot. Those notifications are presumably delivered on
+   * server threads rather than the test thread, hence volatile.
+   */
+  private static volatile SingleFileSnapshotInfo leaderSnapshotInfo;
+
+  private MiniRaftCluster cluster;
+
+  private MiniRaftCluster.Factory<?> getFactory() {
+    return MiniRaftClusterWithGrpc.FACTORY;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    final RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        TestInstallSnapshotWithGrpc.StateMachineForGRpcTest.class, StateMachine.class);
+    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, false);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+        prop, SNAPSHOT_TRIGGER_THRESHOLD);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
+    this.cluster = getFactory().newCluster(1, prop);
+    cluster.start();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test state machine that "installs" a snapshot by copying the leader's
+   * snapshot file into its own state machine directory.
+   */
+  private static class StateMachineForGRpcTest extends
+      SimpleStateMachine4Testing {
+    @Override
+    public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(TermIndex termIndex) {
+      // Read the shared snapshot once so path and term-index are consistent.
+      final SingleFileSnapshotInfo snapshot = leaderSnapshotInfo;
+      final Path leaderSnapshotFile = snapshot.getFile().getPath();
+      final File followerSnapshotFilePath = new File(getSMdir(),
+          leaderSnapshotFile.getFileName().toString());
+      try {
+        Files.copy(leaderSnapshotFile, followerSnapshotFilePath.toPath());
+      } catch (IOException e) {
+        // Do not report a successful install when the copy failed.
+        LOG.error("Failed to copy leader snapshot {} to {}",
+            leaderSnapshotFile, followerSnapshotFilePath, e);
+        final CompletableFuture<TermIndex> failed = new CompletableFuture<>();
+        failed.completeExceptionally(e);
+        return failed;
+      }
+      return CompletableFuture.completedFuture(snapshot.getTermIndex());
+    }
+  }
+
+  /**
+   * Basic test for install snapshot notification: start a one node cluster
+   * (disable install snapshot option) and let it generate a snapshot. Then
+   * delete the log and restart the node, and add more nodes as followers.
+   * The new follower nodes should get an install snapshot notification.
+   */
+  @Test
+  public void testInstallSnapshotNotification() throws Exception {
+    final List<RaftStorageDirectory.LogPathAndIndex> logs;
+    int i = 0;
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try (final RaftClient client = cluster.createClient(leaderId)) {
+        for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+          RaftClientReply
+              reply = client.send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+      }
+
+      // wait for the snapshot to be done
+      RaftStorageDirectory storageDirectory = cluster.getLeader().getState()
+          .getStorage().getStorageDir();
+
+      final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+      LOG.info("nextIndex = {}", nextIndex);
+      final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+          nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+      JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+          10, ONE_SECOND, "snapshotFile.exist", LOG);
+      logs = storageDirectory.getLogSegmentFiles();
+    } finally {
+      cluster.shutdown();
+    }
+
+    // delete the log segments from the leader
+    for (RaftStorageDirectory.LogPathAndIndex path : logs) {
+      FileUtils.delete(path.getPath());
+    }
+
+    // restart the peer
+    LOG.info("Restarting the cluster");
+    cluster.restart(false);
+    try {
+      RaftSnapshotBaseTest.assertLeaderContent(cluster);
+
+      // generate some more traffic
+      try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
+        Assert.assertTrue(client.send(new RaftTestUtil.SimpleMessage("m" + i)).isSuccess());
+      }
+
+      leaderSnapshotInfo = (SingleFileSnapshotInfo) cluster.getLeader().getStateMachine().getLatestSnapshot();
+
+      // add two more peers
+      MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
+          new String[]{"s3", "s4"}, true);
+      // trigger setConfiguration
+      cluster.setConfiguration(change.allPeersInNewConf);
+
+      RaftServerTestUtil
+          .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+      // Check the installed snapshot index on each Follower matches with the
+      // leader snapshot.
+      for (RaftServerImpl follower : cluster.getFollowers()) {
+        Assert.assertEquals(leaderSnapshotInfo.getIndex(),
+            follower.getState().getLatestInstalledSnapshot().getIndex());
+      }
+
+      // restart the peer and check if it can correctly handle conf change
+      cluster.restartServer(cluster.getLeader().getId(), false);
+      RaftSnapshotBaseTest.assertLeaderContent(cluster);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}