This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch snapshot-3
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 5bb9e32e3bbf26eeb04b8f759544e4fa100d38a9
Author: Potato <[email protected]>
AuthorDate: Wed Jan 31 00:12:23 2024 +0800

    RATIS-2019 Fixed abnormal exit of StateMachineUpdater (#1033)
---
 .../main/java/org/apache/ratis/util/JavaUtils.java |  4 ++++
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 26 +++++++++-------------
 .../ratis/server/impl/ReadIndexHeartbeats.java     |  9 +++-----
 .../apache/ratis/server/impl/RetryCacheImpl.java   |  2 +-
 4 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 00725903a..5a977a45b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -283,6 +283,10 @@ public interface JavaUtils {
     return future;
   }
 
+  static boolean isCompletedNormally(CompletableFuture<?> future) {
+    return future.isDone() && !future.isCancelled() && 
!future.isCompletedExceptionally();
+  }
+
   static Throwable unwrapCompletionException(Throwable t) {
     Objects.requireNonNull(t, "t == null");
     return t instanceof CompletionException && t.getCause() != null? 
t.getCause(): t;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index c6983e331..34a28abe4 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -310,19 +310,16 @@ class LeaderStateImpl implements LeaderState {
       return appliedIndexFuture;
     }
 
-    boolean isApplied(LogEntryProto logEntry) {
-      if (appliedIndexFuture.isDone()) {
-        return true;
-      }
-      final long appliedIndex = logEntry != null? logEntry.getIndex(): 
server.getState().getLastAppliedIndex();
-      if (appliedIndex >= startIndex) {
-        appliedIndexFuture.complete(appliedIndex);
-        LOG.info("leader is ready since appliedIndex == {} >= startIndex == 
{}",
-            appliedIndex, startIndex);
-        return true;
-      } else {
-        return false;
+    boolean checkStartIndex(LogEntryProto logEntry) {
+      final boolean completed = logEntry.getIndex() == startIndex && 
appliedIndexFuture.complete(startIndex);
+      if (completed) {
+        LOG.info("Leader {} is ready since appliedIndex == startIndex == {}", 
LeaderStateImpl.this, startIndex);
       }
+      return completed;
+    }
+
+    boolean isApplied() {
+      return JavaUtils.isCompletedNormally(appliedIndexFuture);
     }
   }
 
@@ -421,12 +418,11 @@ class LeaderStateImpl implements LeaderState {
   }
 
   boolean isReady() {
-    return startupLogEntry.isInitialized() && 
startupLogEntry.get().isApplied(null);
+    return startupLogEntry.isInitialized() && 
startupLogEntry.get().isApplied();
   }
 
   void checkReady(LogEntryProto entry) {
-    Preconditions.assertTrue(startupLogEntry.isInitialized());
-    if (entry.getTerm() == getCurrentTerm() && 
startupLogEntry.get().isApplied(entry)) {
+    if (entry.getTerm() == server.getState().getCurrentTerm() && 
startupLogEntry.get().checkStartIndex(entry)) {
       server.getStateMachine().leaderEvent().notifyLeaderReady();
     }
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
index 3f31a2530..d08a1ea40 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIndex;
+import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,7 +100,7 @@ class ReadIndexHeartbeats {
 
     boolean receive(LogAppender logAppender, AppendEntriesReplyProto proto,
                     Predicate<Predicate<RaftPeerId>> hasMajority) {
-      if (isCompletedNormally()) {
+      if (JavaUtils.isCompletedNormally(future)) {
         return true;
       }
 
@@ -112,16 +113,12 @@ class ReadIndexHeartbeats {
         }
       }
 
-      return isCompletedNormally();
+      return JavaUtils.isCompletedNormally(future);
     }
 
     boolean isAcknowledged(RaftPeerId id) {
       return 
Optional.ofNullable(replies.get(id)).filter(HeartbeatAck::isAcknowledged).isPresent();
     }
-
-    boolean isCompletedNormally() {
-      return future.isDone() && !future.isCancelled() && 
!future.isCompletedExceptionally();
-    }
   }
 
   class AppendEntriesListeners {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
index a8bac4e5e..50d238b07 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
@@ -67,7 +67,7 @@ class RetryCacheImpl implements RetryCache {
     }
 
     boolean isCompletedNormally() {
-      return !failed && replyFuture.isDone() && 
!replyFuture.isCompletedExceptionally() && !replyFuture.isCancelled();
+      return !failed && JavaUtils.isCompletedNormally(replyFuture);
     }
 
     void updateResult(RaftClientReply reply) {

Reply via email to