This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd82aa2a RATIS-1960. Follower may be incorrectly marked as having 
caught up (#983)
9bd82aa2a is described below

commit 9bd82aa2aa125a749b9357f208dd66533f43374c
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Sun Dec 10 02:03:47 2023 +0100

    RATIS-1960. Follower may be incorrectly marked as having caught up (#983)
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 53 ++++++++++++++--------
 1 file changed, 34 insertions(+), 19 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 565cb116c..c6983e331 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -500,16 +500,25 @@ class LeaderStateImpl implements LeaderState {
         peersToBootStrap, listenersToBootStrap, new 
PeerConfiguration(peersInNewConf, listenersInNewConf));
     Collection<RaftPeer> newPeers = configurationStagingState.getNewPeers();
     Collection<RaftPeer> newListeners = 
configurationStagingState.getNewListeners();
-    // set the staging state
-    this.stagingState = configurationStagingState;
-
-    if (newPeers.isEmpty() && newListeners.isEmpty()) {
-      applyOldNewConf();
+    Collection<RaftPeer> allNew = newListeners.isEmpty()
+        ? newPeers
+        : newPeers.isEmpty()
+            ? newListeners
+            : Stream.concat(newPeers.stream(), newListeners.stream())
+                .collect(Collectors.toList());
+
+    if (allNew.isEmpty()) {
+      applyOldNewConf(configurationStagingState);
     } else {
       // update the LeaderState's sender list
-      addAndStartSenders(newPeers);
-      addAndStartSenders(newListeners);
+      Collection<LogAppender> newAppenders = addSenders(allNew);
+
+      // set the staging state
+      stagingState = configurationStagingState;
+
+      newAppenders.forEach(LogAppender::start);
     }
+
     return pending;
   }
 
@@ -579,14 +588,14 @@ class LeaderStateImpl implements LeaderState {
     notifySenders();
   }
 
-  private void applyOldNewConf() {
+  private void applyOldNewConf(ConfigurationStagingState stage) {
     final ServerState state = server.getState();
     final RaftConfigurationImpl current = state.getRaftConf();
-    final RaftConfigurationImpl oldNewConf= 
stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
+    final long nextIndex = state.getLog().getNextIndex();
+    final RaftConfigurationImpl oldNewConf = stage.generateOldNewConf(current, 
nextIndex);
     // apply the (old, new) configuration to log, and use it as the current 
conf
     appendConfiguration(oldNewConf);
 
-    this.stagingState = null;
     notifySenders();
   }
 
@@ -607,7 +616,7 @@ class LeaderStateImpl implements LeaderState {
   @Override
   public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo 
follower,
       List<LogEntryProto> entries, TermIndex previous, long callId) {
-    final boolean initializing = isCaughtUp(follower);
+    final boolean initializing = !isCaughtUp(follower);
     final RaftPeerId targetId = follower.getId();
     return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), 
targetId, currentTerm, entries,
         ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), 
previous, entries.size()),
@@ -618,9 +627,13 @@ class LeaderStateImpl implements LeaderState {
    * Update sender list for setConfiguration request
    */
   private void addAndStartSenders(Collection<RaftPeer> newPeers) {
-    if (!newPeers.isEmpty()) {
-      addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, 
false).forEach(LogAppender::start);
-    }
+    addSenders(newPeers).forEach(LogAppender::start);
+  }
+
+  private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers) {
+    return !newPeers.isEmpty()
+        ? addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false)
+        : Collections.emptyList();
   }
 
   private RaftPeer getPeer(RaftPeerId id) {
@@ -811,20 +824,22 @@ class LeaderStateImpl implements LeaderState {
     } else {
       final long commitIndex = 
server.getState().getLog().getLastCommittedIndex();
       // check progress for the new followers
-      final EnumSet<BootStrapProgress> reports = getLogAppenders()
+      final List<FollowerInfoImpl> laggingFollowers = getLogAppenders()
           .map(LogAppender::getFollower)
           .filter(follower -> !isCaughtUp(follower))
+          .map(FollowerInfoImpl.class::cast)
+          .collect(Collectors.toList());
+      final EnumSet<BootStrapProgress> reports = laggingFollowers.stream()
           .map(follower -> checkProgress(follower, commitIndex))
           .collect(Collectors.toCollection(() -> 
EnumSet.noneOf(BootStrapProgress.class)));
       if (reports.contains(BootStrapProgress.NOPROGRESS)) {
         stagingState.fail(BootStrapProgress.NOPROGRESS);
       } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
         // all caught up!
-        applyOldNewConf();
-        getLogAppenders()
-            .map(LogAppender::getFollower)
+        applyOldNewConf(stagingState);
+        this.stagingState = null;
+        laggingFollowers.stream()
             .filter(f -> server.getRaftConf().containsInConf(f.getId()))
-            .map(FollowerInfoImpl.class::cast)
             .forEach(FollowerInfoImpl::catchUp);
       }
     }

Reply via email to