This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c16a811 RATIS-1128. Update Configuration on InstallSnapshot (#253)
c16a811 is described below
commit c16a811cc3472767d923f52aa40bddca707ef49e
Author: Hanisha Koneru <[email protected]>
AuthorDate: Tue Nov 3 22:50:26 2020 -0800
RATIS-1128. Update Configuration on InstallSnapshot (#253)
---
ratis-proto/src/main/proto/Raft.proto | 2 ++
.../ratis/server/impl/RaftServerConstants.java | 1 +
.../apache/ratis/server/impl/RaftServerImpl.java | 31 +++++++++++++++++-----
.../apache/ratis/server/impl/ServerProtoUtils.java | 16 ++++++++---
4 files changed, 40 insertions(+), 10 deletions(-)
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index 3a19f70..e99019b 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -202,6 +202,8 @@ message InstallSnapshotRequestProto {
SnapshotChunkProto snapshotChunk = 3;
NotificationProto notification = 4;
}
+
+ LogEntryProto lastRaftConfigurationLogEntryProto = 5;
}
message InstallSnapshotReplyProto {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index 0061ef4..15d3d3e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -24,6 +24,7 @@ public final class RaftServerConstants {
@Deprecated
public static final long INVALID_LOG_INDEX = RaftLog.INVALID_LOG_INDEX;
public static final long DEFAULT_CALLID = 0;
+ public static final long DEFAULT_TERM = 0;
private RaftServerConstants() {
//Never constructed
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5143f31..c30f017 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1255,25 +1255,44 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
assertGroup(leaderId, leaderGroupId);
+ InstallSnapshotReplyProto reply = null;
// Check if install snapshot from Leader is enabled
if (installSnapshotEnabled) {
// Leader has sent InstallSnapshot request with SnapshotInfo. Install
the snapshot.
if (request.hasSnapshotChunk()) {
- return checkAndInstallSnapshot(request, leaderId);
+ reply = checkAndInstallSnapshot(request, leaderId);
}
} else {
// Leader has only sent a notification to install snapshot. Inform State
Machine to install snapshot.
if (request.hasNotification()) {
- return notifyStateMachineToInstallSnapshot(request, leaderId);
+ reply = notifyStateMachineToInstallSnapshot(request, leaderId);
}
}
+
+ if (reply != null) {
+ if (request.hasLastRaftConfigurationLogEntryProto()) {
+ // Set the configuration included in the snapshot
+ LogEntryProto newConfLogEntryProto =
+ request.getLastRaftConfigurationLogEntryProto();
+ LOG.info("{}: set new configuration {} from snapshot", getMemberId(),
+ newConfLogEntryProto);
+
+ state.setRaftConf(newConfLogEntryProto);
+ state.writeRaftConfiguration(newConfLogEntryProto);
+ stateMachine.notifyConfigurationChange(newConfLogEntryProto.getTerm(),
+ newConfLogEntryProto.getIndex(),
+ newConfLogEntryProto.getConfigurationEntry());
+ }
+ return reply;
+ }
+
// There is a mismatch between configurations on leader and follower.
- final InstallSnapshotReplyProto reply =
ServerProtoUtils.toInstallSnapshotReplyProto(
+ final InstallSnapshotReplyProto failedReply =
ServerProtoUtils.toInstallSnapshotReplyProto(
leaderId, getMemberId(), InstallSnapshotResult.CONF_MISMATCH);
LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but
follower {} has it set to {}",
getMemberId(),
RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
leaderId, request.hasSnapshotChunk(), getId(), installSnapshotEnabled);
- return reply;
+ return failedReply;
}
private InstallSnapshotReplyProto checkAndInstallSnapshot(
@@ -1411,14 +1430,14 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
assert totalSize.isPresent();
return ServerProtoUtils.toInstallSnapshotRequestProto(getMemberId(),
targetId,
requestId, requestIndex, state.getCurrentTerm(),
snapshot.getTermIndex(),
- chunks, totalSize.getAsLong(), done);
+ chunks, totalSize.getAsLong(), done, getRaftConf());
}
synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
RaftPeerId targetId, TermIndex firstAvailableLogTermIndex) {
assert (firstAvailableLogTermIndex.getIndex() > 0);
return ServerProtoUtils.toInstallSnapshotRequestProto(getMemberId(),
targetId,
- state.getCurrentTerm(), firstAvailableLogTermIndex);
+ state.getCurrentTerm(), firstAvailableLogTermIndex, getRaftConf());
}
synchronized RequestVoteRequestProto createRequestVoteRequest(
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 143655c..a3c29c2 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -38,6 +38,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_TERM;
/** Server proto utilities for internal use. */
public interface ServerProtoUtils {
@@ -375,7 +376,7 @@ public interface ServerProtoUtils {
static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
RaftGroupMemberId requestorId, RaftPeerId replyId, String requestId, int
requestIndex,
long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
- long totalSize, boolean done) {
+ long totalSize, boolean done, RaftConfiguration raftConfiguration) {
final InstallSnapshotRequestProto.SnapshotChunkProto.Builder
snapshotChunkProto =
InstallSnapshotRequestProto.SnapshotChunkProto.newBuilder()
.setRequestId(requestId)
@@ -384,22 +385,29 @@ public interface ServerProtoUtils {
.addAllFileChunks(chunks)
.setTotalSize(totalSize)
.setDone(done);
+ // Set term to DEFAULT_TERM as this term is not going to be used by
installSnapshot to update the RaftConfiguration
+ final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration,
DEFAULT_TERM,
+ raftConfiguration.getLogEntryIndex());
return InstallSnapshotRequestProto.newBuilder()
.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
- // .setRaftConfiguration() TODO: save and pass RaftConfiguration
+ .setLastRaftConfigurationLogEntryProto(confLogEntryProto)
.setLeaderTerm(term)
.setSnapshotChunk(snapshotChunkProto)
.build();
}
static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
- RaftGroupMemberId requestorId, RaftPeerId replyId, long leaderTerm,
TermIndex firstAvailable) {
+ RaftGroupMemberId requestorId, RaftPeerId replyId, long leaderTerm,
+ TermIndex firstAvailable, RaftConfiguration raftConfiguration) {
final InstallSnapshotRequestProto.NotificationProto.Builder
notificationProto =
InstallSnapshotRequestProto.NotificationProto.newBuilder()
.setFirstAvailableTermIndex(toTermIndexProto(firstAvailable));
+ // Set term to DEFAULT_TERM as this term is not going to be used by
installSnapshot to update the RaftConfiguration
+ final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration,
DEFAULT_TERM,
+ raftConfiguration.getLogEntryIndex());
return InstallSnapshotRequestProto.newBuilder()
.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
- // .setRaftConfiguration() TODO: save and pass RaftConfiguration
+ .setLastRaftConfigurationLogEntryProto(confLogEntryProto)
.setLeaderTerm(leaderTerm)
.setNotification(notificationProto)
.build();