OneSizeFitsQuorum commented on code in PR #1168:
URL: https://github.com/apache/ratis/pull/1168#discussion_r1802452911
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java:
##########
@@ -98,6 +99,7 @@ boolean isCurrentLeaderValid() {
CompletableFuture<Void> stopRunning() {
this.isRunning = false;
interrupt();
+ stopped.complete(null);
Review Comment:
Remove this line.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -565,16 +565,40 @@ void setFirstElection(Object reason) {
}
}
+ static class Pair<U, V> {
+ private final U first;
+ private final V second;
+
+ Pair(U metadataUpdated, V future) {
+ this.first = metadataUpdated;
+ this.second = future;
+ }
+
+ public static <U, V> Pair<U, V> makePair(U first, V second) {
+ return new Pair<>(first, second);
+ }
+
+ U first() {
+ return first;
+ }
+
+ V second() {
+ return second;
+ }
+
Review Comment:
Remove this empty line.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1582,46 +1620,51 @@ private CompletableFuture<AppendEntriesReplyProto>
appendEntriesAsync(RaftPeerId
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
final Timekeeper.Context timer =
raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
- synchronized (this) {
- // Check life cycle state again to avoid the PAUSING/PAUSED state.
- assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
- currentTerm = state.getCurrentTerm();
- final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES,
leaderId, leaderTerm);
- if (!recognized) {
- return CompletableFuture.completedFuture(toAppendEntriesReplyProto(
- leaderId, getMemberId(), currentTerm, followerCommit,
state.getNextIndex(),
- AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX,
isHeartbeat));
- }
- try {
- changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
- } catch (IOException e) {
- return JavaUtils.completeExceptionally(e);
- }
- state.setLeader(leaderId, "appendEntries");
+ Optional<CompletableFuture<Void>> future = Optional.empty();
+ try {
+ synchronized (this) {
+ // Check life cycle state again to avoid the PAUSING/PAUSED state.
+ assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
+ currentTerm = state.getCurrentTerm();
+ final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES,
leaderId, leaderTerm);
+ if (!recognized) {
+ return CompletableFuture.completedFuture(toAppendEntriesReplyProto(
+ leaderId, getMemberId(), currentTerm, followerCommit,
state.getNextIndex(),
+ AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX,
isHeartbeat));
+ }
+ try {
+ future =
Optional.ofNullable(changeToFollowerAndPersistMetadata(leaderTerm, true,
"appendEntries"));
+ } catch (IOException e) {
+ return JavaUtils.completeExceptionally(e);
+ }
+ state.setLeader(leaderId, "appendEntries");
- if (!proto.getInitializing() &&
lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
- role.startFollowerState(this, Op.APPEND_ENTRIES);
- }
- followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
-
- // Check that the append entries are not inconsistent. There are 3
- // scenarios which can result in inconsistency:
- // 1. There is a snapshot installation in progress
- // 2. There is an overlap between the snapshot index and the entries
- // 3. There is a gap between the local log and the entries
- // In any of these scenarios, we should return an INCONSISTENCY reply
- // back to leader so that the leader can update this follower's next
index.
- final long inconsistencyReplyNextIndex =
checkInconsistentAppendEntries(previous, entries);
- if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) {
- final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(
- leaderId, getMemberId(), currentTerm, followerCommit,
inconsistencyReplyNextIndex,
- AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX,
isHeartbeat);
- LOG.info("{}: appendEntries* reply {}", getMemberId(),
toAppendEntriesReplyString(reply));
- followerState.ifPresent(fs ->
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
- return CompletableFuture.completedFuture(reply);
- }
+ if (!proto.getInitializing() &&
lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
+ role.startFollowerState(this, Op.APPEND_ENTRIES);
+ }
+ followerState =
updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
+
+ // Check that the append entries are not inconsistent. There are 3
+ // scenarios which can result in inconsistency:
+ // 1. There is a snapshot installation in progress
+ // 2. There is an overlap between the snapshot index and the
entries
+ // 3. There is a gap between the local log and the entries
+ // In any of these scenarios, we should return an INCONSISTENCY reply
+ // back to leader so that the leader can update this follower's next
index.
+ final long inconsistencyReplyNextIndex =
checkInconsistentAppendEntries(previous, entries);
+ if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) {
+ final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(
+ leaderId, getMemberId(), currentTerm, followerCommit,
inconsistencyReplyNextIndex,
+ AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX,
isHeartbeat);
+ LOG.info("{}: appendEntries* reply {}", getMemberId(),
toAppendEntriesReplyString(reply));
+ followerState.ifPresent(fs ->
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
+ return CompletableFuture.completedFuture(reply);
+ }
- state.updateConfiguration(entries);
+ state.updateConfiguration(entries);
+ }
+ } finally {
+ future.ifPresent(CompletableFuture::join);
Review Comment:
I recommend using a wait mechanism that releases the lock while stopping
followers, candidates, and leaders, to avoid deadlocks. Calling `join()` in
the `finally` block here may introduce new concurrency problems.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1448,42 +1481,47 @@ private RequestVoteReplyProto requestVote(Phase phase,
getMemberId(), phase, candidateId, candidateGroupId, candidateTerm,
candidateLastEntry);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(getMemberId(), candidateId, candidateGroupId);
+ Pair<RequestVoteReplyProto, CompletableFuture<Void>> replyAndFuture =
requestVoteImpl(phase, candidateId, candidateGroupId, candidateTerm,
candidateLastEntry);
+ replyAndFuture.second.join();
+ return replyAndFuture.first;
+ }
+ synchronized Pair<RequestVoteReplyProto, CompletableFuture<Void>>
requestVoteImpl(Phase phase,
Review Comment:
Make this method `private`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]