This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b54bfab0 RATIS-1881. In LeaderStateImpl, set placeHolderIndex from the log. (#912)
1b54bfab0 is described below

commit 1b54bfab05e4f1775ea82b58c8140c4b2b6beb8c
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Aug 31 19:04:17 2023 -0700

    RATIS-1881. In LeaderStateImpl, set placeHolderIndex from the log. (#912)
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 73 ++++++++++++++--------
 .../apache/ratis/server/impl/RaftServerImpl.java   |  3 +-
 2 files changed, 48 insertions(+), 28 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 6d57e68aa..ac8c3599f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -54,6 +54,7 @@ import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
@@ -288,6 +289,31 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
+  private class StartupLogEntry {
+    /** The log index at leader startup. */
+    private final long startIndex = 
appendConfiguration(RaftConfigurationImpl.newBuilder()
+        .setConf(server.getRaftConf().getConf())
+        .setLogEntryIndex(raftLog.getNextIndex())
+        .build());
+    /** This future will be completed after the log entry is applied. */
+    private final CompletableFuture<Long> appliedIndexFuture = new 
CompletableFuture<>();
+
+    boolean isApplied(LogEntryProto logEntry) {
+      if (appliedIndexFuture.isDone()) {
+        return true;
+      }
+      final long appliedIndex = logEntry != null? logEntry.getIndex(): 
server.getState().getLastAppliedIndex();
+      if (appliedIndex >= startIndex) {
+        appliedIndexFuture.complete(appliedIndex);
+        LOG.info("leader is ready since appliedIndex == {} >= startIndex == 
{}",
+            appliedIndex, startIndex);
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
   private final StateUpdateEvent updateCommitEvent =
       new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, 
this::updateCommit);
   private final StateUpdateEvent checkStagingEvent =
@@ -311,11 +337,11 @@ class LeaderStateImpl implements LeaderState {
   private final PendingRequests pendingRequests;
   private final WatchRequests watchRequests;
   private final MessageStreamRequests messageStreamRequests;
+
+  private final MemoizedSupplier<StartupLogEntry> startupLogEntry = 
MemoizedSupplier.valueOf(StartupLogEntry::new);
   private final AtomicBoolean isStopped = new AtomicBoolean();
 
   private final int stagingCatchupGap;
-  private final long placeHolderIndex;
-  private final AtomicBoolean isReady = new AtomicBoolean();
   private final RaftServerMetricsImpl raftServerMetrics;
   private final LogAppenderMetrics logAppenderMetrics;
   private final long followerMaxGapThreshold;
@@ -357,38 +383,36 @@ class LeaderStateImpl implements LeaderState {
 
     final RaftConfigurationImpl conf = state.getRaftConf();
     Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
-    placeHolderIndex = raftLog.getNextIndex();
 
+    final long nextIndex = raftLog.getNextIndex();
     senders = new SenderList();
-    addSenders(others, placeHolderIndex, true);
+    addSenders(others, nextIndex, true);
 
     final Collection<RaftPeer> listeners = 
conf.getAllPeers(RaftPeerRole.LISTENER);
     if (!listeners.isEmpty()) {
-      addSenders(listeners, placeHolderIndex, true);
+      addSenders(listeners, nextIndex, true);
     }
   }
 
-  LogEntryProto start() {
+  void start() {
     // In the beginning of the new term, replicate a conf entry in order
     // to finally commit entries in the previous term.
     // Also this message can help identify the last committed index and the 
conf.
-    final LogEntryProto placeHolder = LogProtoUtils.toLogEntryProto(
-        server.getRaftConf(), server.getState().getCurrentTerm(), 
raftLog.getNextIndex());
     CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
         server.getId().toString(), null);
-    raftLog.append(Collections.singletonList(placeHolder));
+    // Initialize startup log entry and append it to the RaftLog
+    startupLogEntry.get();
     processor.start();
     senders.forEach(LogAppender::start);
-    return placeHolder;
   }
 
   boolean isReady() {
-    return isReady.get();
+    return startupLogEntry.isInitialized() && 
startupLogEntry.get().isApplied(null);
   }
 
   void checkReady(LogEntryProto entry) {
-    if (entry.getTerm() == currentTerm && entry.getIndex() == 
placeHolderIndex) {
-      isReady.set(true);
+    Preconditions.assertTrue(startupLogEntry.isInitialized());
+    if (entry.getTerm() == getCurrentTerm() && 
startupLogEntry.get().isApplied(entry)) {
       server.getStateMachine().leaderEvent().notifyLeaderReady();
     }
   }
@@ -427,13 +451,10 @@ class LeaderStateImpl implements LeaderState {
   }
 
   long getCurrentTerm() {
+    Preconditions.assertSame(currentTerm, server.getState().getCurrentTerm(), 
"currentTerm");
     return currentTerm;
   }
 
-  TermIndex getLastEntry() {
-    return server.getState().getLastEntry();
-  }
-
   @Override
   public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
     if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) {
@@ -546,16 +567,17 @@ class LeaderStateImpl implements LeaderState {
     final RaftConfigurationImpl current = state.getRaftConf();
     final RaftConfigurationImpl oldNewConf= 
stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
     // apply the (old, new) configuration to log, and use it as the current 
conf
-    long index = state.getLog().append(state.getCurrentTerm(), oldNewConf);
-    updateConfiguration(index, oldNewConf);
+    appendConfiguration(oldNewConf);
 
     this.stagingState = null;
     notifySenders();
   }
 
-  private void updateConfiguration(long logIndex, RaftConfigurationImpl 
newConf) {
-    Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex());
-    server.getState().setRaftConf(newConf);
+  private long appendConfiguration(RaftConfigurationImpl conf) {
+    final long logIndex = raftLog.append(getCurrentTerm(), conf);
+    Preconditions.assertSame(conf.getLogEntryIndex(), logIndex, 
"confLogIndex");
+    server.getState().setRaftConf(conf);
+    return logIndex;
   }
 
   void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> 
protos) {
@@ -958,8 +980,7 @@ class LeaderStateImpl implements LeaderState {
         .build();
     // stop the LogAppender if the corresponding follower and listener is no 
longer in the conf
     updateSenders(newConf);
-    long index = raftLog.append(server.getState().getCurrentTerm(), newConf);
-    updateConfiguration(index, newConf);
+    appendConfiguration(newConf);
     notifySenders();
   }
 
@@ -1009,7 +1030,7 @@ class LeaderStateImpl implements LeaderState {
         highestPriorityInfos.add(logAppender);
       }
     }
-    final TermIndex leaderLastEntry = getLastEntry();
+    final TermIndex leaderLastEntry = server.getState().getLastEntry();
     final LogAppender appender = chooseUpToDateFollower(highestPriorityInfos, 
leaderLastEntry);
     if (appender != null) {
       server.getTransferLeadership().start(appender);
@@ -1077,7 +1098,7 @@ class LeaderStateImpl implements LeaderState {
     }
 
     // leader has not committed any entries in this term, reject
-    if (server.getRaftLog().getTermIndex(readIndex).getTerm() != 
server.getState().getCurrentTerm()) {
+    if (server.getRaftLog().getTermIndex(readIndex).getTerm() != 
getCurrentTerm()) {
       return JavaUtils.completeExceptionally(new ReadIndexException(
           "Failed to getReadIndex " + readIndex + " since the term is not yet 
committed.",
           new LeaderNotReadyException(server.getMemberId())));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 173bd1c70..3b225dc3c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -607,8 +607,7 @@ class RaftServerImpl implements RaftServer.Division,
     state.becomeLeader();
 
     // start sending AppendEntries RPC to followers
-    final LogEntryProto e = leader.start();
-    getState().setRaftConf(e);
+    leader.start();
   }
 
   @Override

Reply via email to