This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch snapshot-3 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 5bb9e32e3bbf26eeb04b8f759544e4fa100d38a9 Author: Potato <[email protected]> AuthorDate: Wed Jan 31 00:12:23 2024 +0800 RATIS-2019 Fixed abnormal exit of StateMachineUpdater (#1033) --- .../main/java/org/apache/ratis/util/JavaUtils.java | 4 ++++ .../apache/ratis/server/impl/LeaderStateImpl.java | 26 +++++++++------------- .../ratis/server/impl/ReadIndexHeartbeats.java | 9 +++----- .../apache/ratis/server/impl/RetryCacheImpl.java | 2 +- 4 files changed, 19 insertions(+), 22 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index 00725903a..5a977a45b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -283,6 +283,10 @@ public interface JavaUtils { return future; } + static boolean isCompletedNormally(CompletableFuture<?> future) { + return future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally(); + } + static Throwable unwrapCompletionException(Throwable t) { Objects.requireNonNull(t, "t == null"); return t instanceof CompletionException && t.getCause() != null? t.getCause(): t; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index c6983e331..34a28abe4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -310,19 +310,16 @@ class LeaderStateImpl implements LeaderState { return appliedIndexFuture; } - boolean isApplied(LogEntryProto logEntry) { - if (appliedIndexFuture.isDone()) { - return true; - } - final long appliedIndex = logEntry != null? logEntry.getIndex(): server.getState().getLastAppliedIndex(); - if (appliedIndex >= startIndex) { - appliedIndexFuture.complete(appliedIndex); - LOG.info("leader is ready since appliedIndex == {} >= startIndex == {}", - appliedIndex, startIndex); - return true; - } else { - return false; + boolean checkStartIndex(LogEntryProto logEntry) { + final boolean completed = logEntry.getIndex() == startIndex && appliedIndexFuture.complete(startIndex); + if (completed) { + LOG.info("Leader {} is ready since appliedIndex == startIndex == {}", LeaderStateImpl.this, startIndex); } + return completed; + } + + boolean isApplied() { + return JavaUtils.isCompletedNormally(appliedIndexFuture); } } @@ -421,12 +418,11 @@ class LeaderStateImpl implements LeaderState { } boolean isReady() { - return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied(null); + return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied(); } void checkReady(LogEntryProto entry) { - Preconditions.assertTrue(startupLogEntry.isInitialized()); - if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().isApplied(entry)) { + if (entry.getTerm() == server.getState().getCurrentTerm() && startupLogEntry.get().checkStartIndex(entry)) { server.getStateMachine().leaderEvent().notifyLeaderReady(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java index 3f31a2530..d08a1ea40 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java @@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.leader.LogAppender; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.raftlog.RaftLogIndex; +import org.apache.ratis.util.JavaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,7 +100,7 @@ class ReadIndexHeartbeats { boolean receive(LogAppender logAppender, AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) { - if (isCompletedNormally()) { + if (JavaUtils.isCompletedNormally(future)) { return true; } @@ -112,16 +113,12 @@ class ReadIndexHeartbeats { } } - return isCompletedNormally(); + return JavaUtils.isCompletedNormally(future); } boolean isAcknowledged(RaftPeerId id) { return Optional.ofNullable(replies.get(id)).filter(HeartbeatAck::isAcknowledged).isPresent(); } - - boolean isCompletedNormally() { - return future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally(); - } } class AppendEntriesListeners { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java index a8bac4e5e..50d238b07 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java @@ -67,7 +67,7 @@ class RetryCacheImpl implements RetryCache { } boolean isCompletedNormally() { - return !failed && replyFuture.isDone() && !replyFuture.isCompletedExceptionally() && !replyFuture.isCancelled(); + return !failed && JavaUtils.isCompletedNormally(replyFuture); } void updateResult(RaftClientReply reply) {
