This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fd4e5dc RATIS-1872. HeartbeatAck use in-correct callId as minCallId. 
(#905)
c1fd4e5dc is described below

commit c1fd4e5dc015fc8b3e4b6b18d41eeab2b9e81284
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Aug 18 07:49:03 2023 -0700

    RATIS-1872. HeartbeatAck use in-correct callId as minCallId. (#905)
---
 .../main/java/org/apache/ratis/server/impl/LeaderStateImpl.java  | 9 +--------
 .../java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java   | 9 ++++++---
 2 files changed, 7 insertions(+), 11 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 5dfbc009b..adf035e4f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -54,7 +54,6 @@ import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
@@ -1076,20 +1075,14 @@ class LeaderStateImpl implements LeaderState {
           new LeaderNotReadyException(server.getMemberId())));
     }
 
-    final MemoizedSupplier<AppendEntriesListener> supplier = 
MemoizedSupplier.valueOf(
-        () -> new AppendEntriesListener(readIndex));
     final AppendEntriesListener listener = 
readIndexHeartbeats.addAppendEntriesListener(
-        readIndex, key -> supplier.get());
+        readIndex, i -> new AppendEntriesListener(i, senders));
 
     // the readIndex is already acknowledged before
     if (listener == null) {
       return CompletableFuture.completedFuture(readIndex);
     }
 
-    if (supplier.isInitialized()) {
-      senders.forEach(LogAppender::triggerHeartbeat);
-    }
-
     return listener.getFuture();
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
index 92513086d..7e252f7ad 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
@@ -85,8 +85,12 @@ class ReadIndexHeartbeats {
     private final CompletableFuture<Long> future = new CompletableFuture<>();
     private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new 
ConcurrentHashMap<>();
 
-    AppendEntriesListener(long commitIndex) {
+    AppendEntriesListener(long commitIndex, Iterable<LogAppender> 
logAppenders) {
       this.commitIndex = commitIndex;
+      for (LogAppender a : logAppenders) {
+        a.triggerHeartbeat();
+        replies.put(a.getFollowerId(), new HeartbeatAck(a));
+      }
     }
 
     CompletableFuture<Long> getFuture() {
@@ -159,8 +163,7 @@ class ReadIndexHeartbeats {
   private final AppendEntriesListeners appendEntriesListeners = new 
AppendEntriesListeners();
   private final RaftLogIndex ackedCommitIndex = new 
RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
 
-  AppendEntriesListener addAppendEntriesListener(long commitIndex,
-                                                              Function<Long, 
AppendEntriesListener> constructor) {
+  AppendEntriesListener addAppendEntriesListener(long commitIndex, 
Function<Long, AppendEntriesListener> constructor) {
     if (commitIndex <= ackedCommitIndex.get()) {
       return null;
     }

Reply via email to