This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9bd82aa2a RATIS-1960. Follower may be incorrectly marked as having
caught up (#983)
9bd82aa2a is described below
commit 9bd82aa2aa125a749b9357f208dd66533f43374c
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Sun Dec 10 02:03:47 2023 +0100
RATIS-1960. Follower may be incorrectly marked as having caught up (#983)
---
.../apache/ratis/server/impl/LeaderStateImpl.java | 53 ++++++++++++++--------
1 file changed, 34 insertions(+), 19 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 565cb116c..c6983e331 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -500,16 +500,25 @@ class LeaderStateImpl implements LeaderState {
peersToBootStrap, listenersToBootStrap, new
PeerConfiguration(peersInNewConf, listenersInNewConf));
Collection<RaftPeer> newPeers = configurationStagingState.getNewPeers();
Collection<RaftPeer> newListeners =
configurationStagingState.getNewListeners();
- // set the staging state
- this.stagingState = configurationStagingState;
-
- if (newPeers.isEmpty() && newListeners.isEmpty()) {
- applyOldNewConf();
+ Collection<RaftPeer> allNew = newListeners.isEmpty()
+ ? newPeers
+ : newPeers.isEmpty()
+ ? newListeners
+ : Stream.concat(newPeers.stream(), newListeners.stream())
+ .collect(Collectors.toList());
+
+ if (allNew.isEmpty()) {
+ applyOldNewConf(configurationStagingState);
} else {
// update the LeaderState's sender list
- addAndStartSenders(newPeers);
- addAndStartSenders(newListeners);
+ Collection<LogAppender> newAppenders = addSenders(allNew);
+
+ // set the staging state
+ stagingState = configurationStagingState;
+
+ newAppenders.forEach(LogAppender::start);
}
+
return pending;
}
@@ -579,14 +588,14 @@ class LeaderStateImpl implements LeaderState {
notifySenders();
}
- private void applyOldNewConf() {
+ private void applyOldNewConf(ConfigurationStagingState stage) {
final ServerState state = server.getState();
final RaftConfigurationImpl current = state.getRaftConf();
- final RaftConfigurationImpl oldNewConf=
stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
+ final long nextIndex = state.getLog().getNextIndex();
+ final RaftConfigurationImpl oldNewConf = stage.generateOldNewConf(current,
nextIndex);
// apply the (old, new) configuration to log, and use it as the current
conf
appendConfiguration(oldNewConf);
- this.stagingState = null;
notifySenders();
}
@@ -607,7 +616,7 @@ class LeaderStateImpl implements LeaderState {
@Override
public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo
follower,
List<LogEntryProto> entries, TermIndex previous, long callId) {
- final boolean initializing = isCaughtUp(follower);
+ final boolean initializing = !isCaughtUp(follower);
final RaftPeerId targetId = follower.getId();
return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(),
targetId, currentTerm, entries,
ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(),
previous, entries.size()),
@@ -618,9 +627,13 @@ class LeaderStateImpl implements LeaderState {
* Update sender list for setConfiguration request
*/
private void addAndStartSenders(Collection<RaftPeer> newPeers) {
- if (!newPeers.isEmpty()) {
- addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX,
false).forEach(LogAppender::start);
- }
+ addSenders(newPeers).forEach(LogAppender::start);
+ }
+
+ private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers) {
+ return !newPeers.isEmpty()
+ ? addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false)
+ : Collections.emptyList();
}
private RaftPeer getPeer(RaftPeerId id) {
@@ -811,20 +824,22 @@ class LeaderStateImpl implements LeaderState {
} else {
final long commitIndex =
server.getState().getLog().getLastCommittedIndex();
// check progress for the new followers
- final EnumSet<BootStrapProgress> reports = getLogAppenders()
+ final List<FollowerInfoImpl> laggingFollowers = getLogAppenders()
.map(LogAppender::getFollower)
.filter(follower -> !isCaughtUp(follower))
+ .map(FollowerInfoImpl.class::cast)
+ .collect(Collectors.toList());
+ final EnumSet<BootStrapProgress> reports = laggingFollowers.stream()
.map(follower -> checkProgress(follower, commitIndex))
.collect(Collectors.toCollection(() ->
EnumSet.noneOf(BootStrapProgress.class)));
if (reports.contains(BootStrapProgress.NOPROGRESS)) {
stagingState.fail(BootStrapProgress.NOPROGRESS);
} else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
// all caught up!
- applyOldNewConf();
- getLogAppenders()
- .map(LogAppender::getFollower)
+ applyOldNewConf(stagingState);
+ this.stagingState = null;
+ laggingFollowers.stream()
.filter(f -> server.getRaftConf().containsInConf(f.getId()))
- .map(FollowerInfoImpl.class::cast)
.forEach(FollowerInfoImpl::catchUp);
}
}