This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch snapshot-branch2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 4b7305ac87551b1461248d4852a19237f01f2af7 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Fri Aug 18 07:49:03 2023 -0700 RATIS-1872. HeartbeatAck use in-correct callId as minCallId. (#905) --- .../main/java/org/apache/ratis/server/impl/LeaderStateImpl.java | 9 +-------- .../java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java | 9 ++++++--- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 5dfbc009b..adf035e4f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -54,7 +54,6 @@ import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.Timestamp; @@ -1076,20 +1075,14 @@ class LeaderStateImpl implements LeaderState { new LeaderNotReadyException(server.getMemberId()))); } - final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf( - () -> new AppendEntriesListener(readIndex)); final AppendEntriesListener listener = readIndexHeartbeats.addAppendEntriesListener( - readIndex, key -> supplier.get()); + readIndex, i -> new AppendEntriesListener(i, senders)); // the readIndex is already acknowledged before if (listener == null) { return CompletableFuture.completedFuture(readIndex); } - if (supplier.isInitialized()) { - senders.forEach(LogAppender::triggerHeartbeat); - } - return listener.getFuture(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java index 92513086d..7e252f7ad 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java @@ -85,8 +85,12 @@ class ReadIndexHeartbeats { private final CompletableFuture<Long> future = new CompletableFuture<>(); private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>(); - AppendEntriesListener(long commitIndex) { + AppendEntriesListener(long commitIndex, Iterable<LogAppender> logAppenders) { this.commitIndex = commitIndex; + for (LogAppender a : logAppenders) { + a.triggerHeartbeat(); + replies.put(a.getFollowerId(), new HeartbeatAck(a)); + } } CompletableFuture<Long> getFuture() { @@ -159,8 +163,7 @@ class ReadIndexHeartbeats { private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners(); private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX); - AppendEntriesListener addAppendEntriesListener(long commitIndex, - Function<Long, AppendEntriesListener> constructor) { + AppendEntriesListener addAppendEntriesListener(long commitIndex, Function<Long, AppendEntriesListener> constructor) { if (commitIndex <= ackedCommitIndex.get()) { return null; }
