This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1b54bfab0 RATIS-1881. In LeaderStateImpl, set placeHolderIndex from the log. (#912)
1b54bfab0 is described below
commit 1b54bfab05e4f1775ea82b58c8140c4b2b6beb8c
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Aug 31 19:04:17 2023 -0700
RATIS-1881. In LeaderStateImpl, set placeHolderIndex from the log. (#912)
---
.../apache/ratis/server/impl/LeaderStateImpl.java | 73 ++++++++++++++--------
.../apache/ratis/server/impl/RaftServerImpl.java | 3 +-
2 files changed, 48 insertions(+), 28 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 6d57e68aa..ac8c3599f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -54,6 +54,7 @@ import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
@@ -288,6 +289,31 @@ class LeaderStateImpl implements LeaderState {
}
}
+ private class StartupLogEntry {
+ /** The log index at leader startup. */
+ private final long startIndex =
appendConfiguration(RaftConfigurationImpl.newBuilder()
+ .setConf(server.getRaftConf().getConf())
+ .setLogEntryIndex(raftLog.getNextIndex())
+ .build());
+ /** This future will be completed after the log entry is applied. */
+ private final CompletableFuture<Long> appliedIndexFuture = new
CompletableFuture<>();
+
+ boolean isApplied(LogEntryProto logEntry) {
+ if (appliedIndexFuture.isDone()) {
+ return true;
+ }
+ final long appliedIndex = logEntry != null? logEntry.getIndex():
server.getState().getLastAppliedIndex();
+ if (appliedIndex >= startIndex) {
+ appliedIndexFuture.complete(appliedIndex);
+ LOG.info("leader is ready since appliedIndex == {} >= startIndex ==
{}",
+ appliedIndex, startIndex);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
private final StateUpdateEvent updateCommitEvent =
new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1,
this::updateCommit);
private final StateUpdateEvent checkStagingEvent =
@@ -311,11 +337,11 @@ class LeaderStateImpl implements LeaderState {
private final PendingRequests pendingRequests;
private final WatchRequests watchRequests;
private final MessageStreamRequests messageStreamRequests;
+
+ private final MemoizedSupplier<StartupLogEntry> startupLogEntry =
MemoizedSupplier.valueOf(StartupLogEntry::new);
private final AtomicBoolean isStopped = new AtomicBoolean();
private final int stagingCatchupGap;
- private final long placeHolderIndex;
- private final AtomicBoolean isReady = new AtomicBoolean();
private final RaftServerMetricsImpl raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
private final long followerMaxGapThreshold;
@@ -357,38 +383,36 @@ class LeaderStateImpl implements LeaderState {
final RaftConfigurationImpl conf = state.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
- placeHolderIndex = raftLog.getNextIndex();
+ final long nextIndex = raftLog.getNextIndex();
senders = new SenderList();
- addSenders(others, placeHolderIndex, true);
+ addSenders(others, nextIndex, true);
final Collection<RaftPeer> listeners =
conf.getAllPeers(RaftPeerRole.LISTENER);
if (!listeners.isEmpty()) {
- addSenders(listeners, placeHolderIndex, true);
+ addSenders(listeners, nextIndex, true);
}
}
- LogEntryProto start() {
+ void start() {
// In the beginning of the new term, replicate a conf entry in order
// to finally commit entries in the previous term.
// Also this message can help identify the last committed index and the
conf.
- final LogEntryProto placeHolder = LogProtoUtils.toLogEntryProto(
- server.getRaftConf(), server.getState().getCurrentTerm(),
raftLog.getNextIndex());
CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
server.getId().toString(), null);
- raftLog.append(Collections.singletonList(placeHolder));
+ // Initialize startup log entry and append it to the RaftLog
+ startupLogEntry.get();
processor.start();
senders.forEach(LogAppender::start);
- return placeHolder;
}
boolean isReady() {
- return isReady.get();
+ return startupLogEntry.isInitialized() &&
startupLogEntry.get().isApplied(null);
}
void checkReady(LogEntryProto entry) {
- if (entry.getTerm() == currentTerm && entry.getIndex() ==
placeHolderIndex) {
- isReady.set(true);
+ Preconditions.assertTrue(startupLogEntry.isInitialized());
+ if (entry.getTerm() == getCurrentTerm() &&
startupLogEntry.get().isApplied(entry)) {
server.getStateMachine().leaderEvent().notifyLeaderReady();
}
}
@@ -427,13 +451,10 @@ class LeaderStateImpl implements LeaderState {
}
long getCurrentTerm() {
+ Preconditions.assertSame(currentTerm, server.getState().getCurrentTerm(),
"currentTerm");
return currentTerm;
}
- TermIndex getLastEntry() {
- return server.getState().getLastEntry();
- }
-
@Override
public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) {
@@ -546,16 +567,17 @@ class LeaderStateImpl implements LeaderState {
final RaftConfigurationImpl current = state.getRaftConf();
final RaftConfigurationImpl oldNewConf=
stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
// apply the (old, new) configuration to log, and use it as the current
conf
- long index = state.getLog().append(state.getCurrentTerm(), oldNewConf);
- updateConfiguration(index, oldNewConf);
+ appendConfiguration(oldNewConf);
this.stagingState = null;
notifySenders();
}
- private void updateConfiguration(long logIndex, RaftConfigurationImpl
newConf) {
- Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex());
- server.getState().setRaftConf(newConf);
+ private long appendConfiguration(RaftConfigurationImpl conf) {
+ final long logIndex = raftLog.append(getCurrentTerm(), conf);
+ Preconditions.assertSame(conf.getLogEntryIndex(), logIndex,
"confLogIndex");
+ server.getState().setRaftConf(conf);
+ return logIndex;
}
void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto>
protos) {
@@ -958,8 +980,7 @@ class LeaderStateImpl implements LeaderState {
.build();
// stop the LogAppender if the corresponding follower and listener is no
longer in the conf
updateSenders(newConf);
- long index = raftLog.append(server.getState().getCurrentTerm(), newConf);
- updateConfiguration(index, newConf);
+ appendConfiguration(newConf);
notifySenders();
}
@@ -1009,7 +1030,7 @@ class LeaderStateImpl implements LeaderState {
highestPriorityInfos.add(logAppender);
}
}
- final TermIndex leaderLastEntry = getLastEntry();
+ final TermIndex leaderLastEntry = server.getState().getLastEntry();
final LogAppender appender = chooseUpToDateFollower(highestPriorityInfos,
leaderLastEntry);
if (appender != null) {
server.getTransferLeadership().start(appender);
@@ -1077,7 +1098,7 @@ class LeaderStateImpl implements LeaderState {
}
// leader has not committed any entries in this term, reject
- if (server.getRaftLog().getTermIndex(readIndex).getTerm() !=
server.getState().getCurrentTerm()) {
+ if (server.getRaftLog().getTermIndex(readIndex).getTerm() !=
getCurrentTerm()) {
return JavaUtils.completeExceptionally(new ReadIndexException(
"Failed to getReadIndex " + readIndex + " since the term is not yet
committed.",
new LeaderNotReadyException(server.getMemberId())));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 173bd1c70..3b225dc3c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -607,8 +607,7 @@ class RaftServerImpl implements RaftServer.Division,
state.becomeLeader();
// start sending AppendEntries RPC to followers
- final LogEntryProto e = leader.start();
- getState().setRaftConf(e);
+ leader.start();
}
@Override