This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch snapshot-3
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit d2e114eda3219d0e114fe1f58b81d3da28e2c37e
Author: William Song <[email protected]>
AuthorDate: Mon Mar 18 23:15:14 2024 +0800

    RATIS-2044. Fix ReadIndex loss caused by data race in AppendEntriesListeners (#1052)
---
 .../org/apache/ratis/server/impl/ReadIndexHeartbeats.java    | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
index d08a1ea40..4ff1460d7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIndex;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,8 +124,15 @@ class ReadIndexHeartbeats {
 
   class AppendEntriesListeners {
     private final NavigableMap<Long, AppendEntriesListener> sorted = new TreeMap<>();
+    private Exception exception = null;
 
     synchronized AppendEntriesListener add(long commitIndex, Function<Long, AppendEntriesListener> constructor) {
+      if (exception != null) {
+        Preconditions.assertTrue(sorted.isEmpty());
+        final AppendEntriesListener listener = constructor.apply(commitIndex);
+        listener.getFuture().completeExceptionally(exception);
+        return listener;
+      }
       return sorted.computeIfAbsent(commitIndex, constructor);
     }
 
@@ -152,6 +160,10 @@ class ReadIndexHeartbeats {
     }
 
     synchronized void failAll(Exception e) {
+      if (exception != null) {
+        return;
+      }
+      exception = e;
       sorted.forEach((index, listener) -> listener.getFuture().completeExceptionally(e));
       sorted.clear();
     }

Reply via email to