This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c1fd4e5dc RATIS-1872. HeartbeatAck use in-correct callId as minCallId. (#905)
c1fd4e5dc is described below
commit c1fd4e5dc015fc8b3e4b6b18d41eeab2b9e81284
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Aug 18 07:49:03 2023 -0700
RATIS-1872. HeartbeatAck use in-correct callId as minCallId. (#905)
---
.../main/java/org/apache/ratis/server/impl/LeaderStateImpl.java | 9 +--------
.../java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java | 9 ++++++---
2 files changed, 7 insertions(+), 11 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 5dfbc009b..adf035e4f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -54,7 +54,6 @@ import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
@@ -1076,20 +1075,14 @@ class LeaderStateImpl implements LeaderState {
new LeaderNotReadyException(server.getMemberId())));
}
- final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf(
- () -> new AppendEntriesListener(readIndex));
final AppendEntriesListener listener =
readIndexHeartbeats.addAppendEntriesListener(
- readIndex, key -> supplier.get());
+ readIndex, i -> new AppendEntriesListener(i, senders));
// the readIndex is already acknowledged before
if (listener == null) {
return CompletableFuture.completedFuture(readIndex);
}
- if (supplier.isInitialized()) {
- senders.forEach(LogAppender::triggerHeartbeat);
- }
-
return listener.getFuture();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
index 92513086d..7e252f7ad 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
@@ -85,8 +85,12 @@ class ReadIndexHeartbeats {
private final CompletableFuture<Long> future = new CompletableFuture<>();
private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new
ConcurrentHashMap<>();
- AppendEntriesListener(long commitIndex) {
+ AppendEntriesListener(long commitIndex, Iterable<LogAppender> logAppenders) {
this.commitIndex = commitIndex;
+ for (LogAppender a : logAppenders) {
+ a.triggerHeartbeat();
+ replies.put(a.getFollowerId(), new HeartbeatAck(a));
+ }
}
CompletableFuture<Long> getFuture() {
@@ -159,8 +163,7 @@ class ReadIndexHeartbeats {
private final AppendEntriesListeners appendEntriesListeners = new
AppendEntriesListeners();
private final RaftLogIndex ackedCommitIndex = new
RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
- AppendEntriesListener addAppendEntriesListener(long commitIndex,
- Function<Long, AppendEntriesListener> constructor) {
+ AppendEntriesListener addAppendEntriesListener(long commitIndex, Function<Long, AppendEntriesListener> constructor) {
if (commitIndex <= ackedCommitIndex.get()) {
return null;
}