This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d2f1c57fe RATIS-2019 Fixed abnormal exit of StateMachineUpdater (#1033)
d2f1c57fe is described below
commit d2f1c57fe60e3abe1bbfa1820297f9d60736bb42
Author: Potato <[email protected]>
AuthorDate: Wed Jan 31 00:12:23 2024 +0800
RATIS-2019 Fixed abnormal exit of StateMachineUpdater (#1033)
---
.../main/java/org/apache/ratis/util/JavaUtils.java | 4 ++++
.../apache/ratis/server/impl/LeaderStateImpl.java | 26 +++++++++-------------
.../ratis/server/impl/ReadIndexHeartbeats.java | 9 +++-----
.../apache/ratis/server/impl/RetryCacheImpl.java | 2 +-
4 files changed, 19 insertions(+), 22 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index d3f899a7f..f689006db 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -269,6 +269,10 @@ public interface JavaUtils {
return future;
}
+ static boolean isCompletedNormally(CompletableFuture<?> future) {
+ return future.isDone() && !future.isCancelled() &&
!future.isCompletedExceptionally();
+ }
+
static Throwable unwrapCompletionException(Throwable t) {
Objects.requireNonNull(t, "t == null");
return t instanceof CompletionException && t.getCause() != null?
t.getCause(): t;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 043c7319b..4f313a437 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -311,19 +311,16 @@ class LeaderStateImpl implements LeaderState {
return appliedIndexFuture;
}
- boolean isApplied(LogEntryProto logEntry) {
- if (appliedIndexFuture.isDone()) {
- return true;
- }
- final long appliedIndex = logEntry != null? logEntry.getIndex():
server.getState().getLastAppliedIndex();
- if (appliedIndex >= startIndex) {
- appliedIndexFuture.complete(appliedIndex);
- LOG.info("leader is ready since appliedIndex == {} >= startIndex ==
{}",
- appliedIndex, startIndex);
- return true;
- } else {
- return false;
+ boolean checkStartIndex(LogEntryProto logEntry) {
+ final boolean completed = logEntry.getIndex() == startIndex &&
appliedIndexFuture.complete(startIndex);
+ if (completed) {
+ LOG.info("Leader {} is ready since appliedIndex == startIndex == {}",
LeaderStateImpl.this, startIndex);
}
+ return completed;
+ }
+
+ boolean isApplied() {
+ return JavaUtils.isCompletedNormally(appliedIndexFuture);
}
}
@@ -422,12 +419,11 @@ class LeaderStateImpl implements LeaderState {
}
boolean isReady() {
- return startupLogEntry.isInitialized() &&
startupLogEntry.get().isApplied(null);
+ return startupLogEntry.isInitialized() &&
startupLogEntry.get().isApplied();
}
void checkReady(LogEntryProto entry) {
- Preconditions.assertTrue(startupLogEntry.isInitialized());
- if (entry.getTerm() == getCurrentTerm() &&
startupLogEntry.get().isApplied(entry)) {
+ if (entry.getTerm() == server.getState().getCurrentTerm() &&
startupLogEntry.get().checkStartIndex(entry)) {
server.getStateMachine().leaderEvent().notifyLeaderReady();
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
index 3f31a2530..d08a1ea40 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIndex;
+import org.apache.ratis.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,7 +100,7 @@ class ReadIndexHeartbeats {
boolean receive(LogAppender logAppender, AppendEntriesReplyProto proto,
Predicate<Predicate<RaftPeerId>> hasMajority) {
- if (isCompletedNormally()) {
+ if (JavaUtils.isCompletedNormally(future)) {
return true;
}
@@ -112,16 +113,12 @@ class ReadIndexHeartbeats {
}
}
- return isCompletedNormally();
+ return JavaUtils.isCompletedNormally(future);
}
boolean isAcknowledged(RaftPeerId id) {
return
Optional.ofNullable(replies.get(id)).filter(HeartbeatAck::isAcknowledged).isPresent();
}
-
- boolean isCompletedNormally() {
- return future.isDone() && !future.isCancelled() &&
!future.isCompletedExceptionally();
- }
}
class AppendEntriesListeners {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
index a8bac4e5e..50d238b07 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
@@ -67,7 +67,7 @@ class RetryCacheImpl implements RetryCache {
}
boolean isCompletedNormally() {
- return !failed && replyFuture.isDone() &&
!replyFuture.isCompletedExceptionally() && !replyFuture.isCancelled();
+ return !failed && JavaUtils.isCompletedNormally(replyFuture);
}
void updateResult(RaftClientReply reply) {