szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956789679


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -17,11 +17,59 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 
 /** For supporting linearizable read. */
-class ReadRequests {
+class ReadRequests implements BiConsumer<RaftPeerId, AppendEntriesReplyProto> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReadRequests.class);
+
+  private final List<HeartbeatBroadcastListener> pendingBroadcasts;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+  private final TimeDuration readTimeout;
+
+  ReadRequests(RaftProperties properties) {
+    pendingBroadcasts = new ArrayList<>();
+    readTimeout = RaftServerConfigKeys.Read.readOnlyTimeout(properties);
+  }
+
+  CompletableFuture<Boolean> newHeartbeatBroadcastListener(List<LogAppender> 
senders) {
+    HeartbeatBroadcastListener listener = new 
HeartbeatBroadcastListener(senders);
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.add(listener);
+    }
+
+    // timeout and complete the broadcast listener after a given time
+    scheduler.onTimeout(readTimeout,
+        listener::handleTimeout,
+        LOG, () -> "timeout heartbeat broadcast listener: " + listener);
+
+    return listener.getFuture();
+  }
+
+  @Override
+  public void accept(RaftPeerId raftPeerId, AppendEntriesReplyProto reply) {
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.forEach(listener -> listener.handleReply(raftPeerId, 
reply));
+      pendingBroadcasts.removeIf(listener -> listener.isDone());

Review Comment:
   Use a `ConcurrentNavigableMap` so that it handles only the pending requests 
with read index <= the reply index.  Then, it do not have to process the entire 
list; see `ReadIndexQueue`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to