This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7167faf RATIS-1481. make state update in
notifyStateMachineToInstallSnapshot serialized (#573)
7167faf is described below
commit 7167fafe75f8c50798561a135405caa984edc7e3
Author: Nibiru <[email protected]>
AuthorDate: Wed Mar 30 13:45:37 2022 +0800
RATIS-1481. make state update in notifyStateMachineToInstallSnapshot
serialized (#573)
---
.../org/apache/ratis/server/impl/ServerState.java | 2 +-
.../server/impl/SnapshotInstallationHandler.java | 112 +++++++++++----------
2 files changed, 59 insertions(+), 55 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index f05776a..b66ba73 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -470,7 +470,7 @@ class ServerState implements Closeable {
long getNextIndex() {
final long logNextIndex = log.getNextIndex();
- final long snapshotNextIndex = getSnapshotIndex() + 1;
+ final long snapshotNextIndex = log.getSnapshotIndex() + 1;
return Math.max(logNextIndex, snapshotNextIndex);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index ad6f5a4..6c24d2f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.LifeCycle;
@@ -44,10 +45,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
class SnapshotInstallationHandler {
static final Logger LOG =
LoggerFactory.getLogger(SnapshotInstallationHandler.class);
@@ -57,7 +57,7 @@ class SnapshotInstallationHandler {
private final boolean installSnapshotEnabled;
private final AtomicLong inProgressInstallSnapshotIndex = new AtomicLong();
- private final AtomicLong installedSnapshotIndex = new AtomicLong();
+ private final AtomicReference<TermIndex> installedSnapshotTermIndex = new
AtomicReference<>(TermIndex.valueOf(0,0));
private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
SnapshotInstallationHandler(RaftServerImpl server, RaftProperties
properties) {
@@ -120,12 +120,13 @@ class SnapshotInstallationHandler {
if (request.hasLastRaftConfigurationLogEntryProto()) {
// Set the configuration included in the snapshot
final LogEntryProto proto =
request.getLastRaftConfigurationLogEntryProto();
- LOG.info("{}: set new configuration {} from snapshot", getMemberId(),
proto);
-
- state.setRaftConf(proto);
- state.writeRaftConfiguration(proto);
- server.getStateMachine().event().notifyConfigurationChanged(
- proto.getTerm(), proto.getIndex(), proto.getConfigurationEntry());
+ if
(!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
+ LOG.info("{}: set new configuration {} from snapshot",
getMemberId(), proto);
+ state.setRaftConf(proto);
+ state.writeRaftConfiguration(proto);
+ server.getStateMachine().event().notifyConfigurationChanged(
+ proto.getTerm(), proto.getIndex(),
proto.getConfigurationEntry());
+ }
}
return reply;
}
@@ -203,13 +204,13 @@ class SnapshotInstallationHandler {
}
server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
- long snapshotIndex = state.getSnapshotIndex();
-
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
+
if (inProgressInstallSnapshotIndex.compareAndSet(0,
firstAvailableLogIndex)) {
LOG.info("{}: Received notification to install snapshot at index {}",
getMemberId(), firstAvailableLogIndex);
// Check if snapshot index is already at par or ahead of the first
// available log index of the Leader.
+ final long snapshotIndex = state.getLog().getSnapshotIndex();
if (snapshotIndex + 1 >= firstAvailableLogIndex &&
firstAvailableLogIndex > 0) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.
@@ -241,44 +242,37 @@ class SnapshotInstallationHandler {
// index. Notify the state machine to install the snapshot.
LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's
first available index is {}.",
getMemberId(), state.getLog().getNextIndex(),
firstAvailableLogIndex);
- try {
-
server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto,
firstAvailableLogTermIndex)
- .whenComplete((reply, exception) -> {
- if (exception != null) {
- LOG.warn("{}: Failed to notify StateMachine to
InstallSnapshot. Exception: {}",
- getMemberId(), exception.getMessage());
-
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
- return;
- }
+ // If notifyInstallSnapshotFromLeader future is done asynchronously,
the main thread will go through the
+ // downside part. As the time consumed by user-defined statemachine is
uncontrollable(e.g. the RocksDB
+ // checkpoint could be constantly increasing, the transmission will
always exceed one boundary), we expect that
+ // once snapshot installed, follower could work ASAP. For the rest of
time, server can respond snapshot
+ // installation in progress.
- if (reply != null) {
- LOG.info("{}: StateMachine successfully installed snapshot
index {}. Reloading the StateMachine.",
- getMemberId(), reply.getIndex());
- server.getStateMachine().pause();
- state.updateInstalledSnapshotIndex(reply);
- state.reloadStateMachine(reply.getIndex());
- installedSnapshotIndex.set(reply.getIndex());
- } else {
- isSnapshotNull.set(true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: StateMachine could not install snapshot as
it is not available", this);
- }
+ // There is another appendLog thread appending raft entries, which
returns inconsistency entries with
+ // nextIndex and commitIndex to the leader when install snapshot in
progress. The nextIndex on follower side
+ // is updated when state.reloadStateMachine. We shall keep this index
upgraded synchronously with main thread,
+ // otherwise leader could get this follower's latest nextIndex from
appendEntries instead of after
+ // acknowledging the SNAPSHOT_INSTALLED.
+
server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto,
firstAvailableLogTermIndex)
+ .whenComplete((reply, exception) -> {
+ if (exception != null) {
+ LOG.error("{}: Failed to notify StateMachine to
InstallSnapshot. Exception: {}",
+ getMemberId(), exception.getMessage());
+
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
+ return;
+ }
+
+ if (reply != null) {
+ LOG.info("{}: StateMachine successfully installed snapshot
index {}. Reloading the StateMachine.",
+ getMemberId(), reply.getIndex());
+ installedSnapshotTermIndex.set(reply);
+ } else {
+ isSnapshotNull.set(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: StateMachine could not install snapshot as it
is not available", this);
}
- // wait for 1 seconds for statemachine to install snapshot
- }).get(1, TimeUnit.SECONDS);
- } catch (InterruptedException | TimeoutException t) {
- //nothing to do
- } catch (Exception t) {
- // there are two cases:
- //1 `get()` may throw ExecutionException if `whenComplete` throw an
exception
- //2 when generating completeFuture,
`statemachine#notifyInstallSnapshotFromLeader`
- // may throw an uncertain exception, which is determined by the
implementation of
- // user statemachine.
- inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex,
0);
- final String err = getMemberId() + ": Failed to notify StateMachine
to InstallSnapshot.";
- LOG.warn(err + " " + t);
- throw new IOException(err, t);
- }
+ }
+ });
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine is processing Snapshot Installation
Request.", getMemberId());
@@ -289,24 +283,34 @@ class SnapshotInstallationHandler {
}
}
+ final long inProgressInstallSnapshotIndexValue =
getInProgressInstallSnapshotIndex();
+ Preconditions.assertTrue(
+ inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex &&
inProgressInstallSnapshotIndexValue > 0,
+ "inProgressInstallSnapshotRequest: %s is not eligible,
firstAvailableLogIndex: %s",
+ getInProgressInstallSnapshotIndex(), firstAvailableLogIndex);
+
// If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
if (isSnapshotNull.compareAndSet(true, false)) {
LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
- inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex,
0);
+ inProgressInstallSnapshotIndex.set(0);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
}
// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the
installed snapshot index and reset
- // installedSnapshotIndex to 0.
- long latestInstalledSnapshotIndex =
this.installedSnapshotIndex.getAndSet(0);
- if (latestInstalledSnapshotIndex > 0) {
+ // installedSnapshotIndex to (0,0).
+ final TermIndex latestInstalledSnapshotTermIndex =
this.installedSnapshotTermIndex
+ .getAndSet(TermIndex.valueOf(0,0));
+ if (latestInstalledSnapshotTermIndex.getIndex() > 0) {
+ server.getStateMachine().pause();
+ state.updateInstalledSnapshotIndex(latestInstalledSnapshotTermIndex);
+ state.reloadStateMachine(latestInstalledSnapshotTermIndex.getIndex());
LOG.info("{}: InstallSnapshot notification result: {}, at index: {}",
getMemberId(),
- InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotIndex);
- inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex,
0);
+ InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotTermIndex);
+ inProgressInstallSnapshotIndex.set(0);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
- currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotIndex);
+ currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotTermIndex.getIndex());
}
// Otherwise, Snapshot installation is in progress.