SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956966671


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -1028,6 +1033,43 @@ public boolean checkLeadership() {
     return false;
   }
 
+  /** confirm whether it has valid leadership by broadcasting heartbeat 
AppendEntries */
+  private CompletableFuture<Boolean> confirmLeadership(long readIndex) {
+    if (pendingBroadcasts.containsKey(readIndex)) {
+      return pendingBroadcasts.get(readIndex);
+    }
+
+    CompletableFuture<Boolean> future =
+        this.readRequests.newHeartbeatBroadcastListener(senders.getSenders());
+    pendingBroadcasts.put(readIndex, future);
+
+    senders.stream().forEach(logAppender -> {
+      try {
+        logAppender.triggerHeartbeat();
+      } catch (IOException e) {
+        LOG.warn("{}: {} trigger heartbeat failed due to {}", this, 
logAppender, e);
+      }
+    });
+    return future;
+  }
+
+  /** return leader's read index, see thesis section 6.4 */
+  CompletableFuture<Long> getReadIndex() {
+    final long readIndex = server.getRaftLog().getLastCommittedIndex();

Review Comment:
   done



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -17,11 +17,59 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 
 /** For supporting linearizable read. */
-class ReadRequests {
+class ReadRequests implements BiConsumer<RaftPeerId, AppendEntriesReplyProto> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReadRequests.class);
+
+  private final List<HeartbeatBroadcastListener> pendingBroadcasts;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+  private final TimeDuration readTimeout;
+
+  ReadRequests(RaftProperties properties) {
+    pendingBroadcasts = new ArrayList<>();
+    readTimeout = RaftServerConfigKeys.Read.readOnlyTimeout(properties);
+  }
+
+  CompletableFuture<Boolean> newHeartbeatBroadcastListener(List<LogAppender> 
senders) {
+    HeartbeatBroadcastListener listener = new 
HeartbeatBroadcastListener(senders);
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.add(listener);
+    }
+
+    // timeout and complete the broadcast listener after a given time
+    scheduler.onTimeout(readTimeout,
+        listener::handleTimeout,
+        LOG, () -> "timeout heartbeat broadcast listener: " + listener);
+
+    return listener.getFuture();
+  }
+
+  @Override
+  public void accept(RaftPeerId raftPeerId, AppendEntriesReplyProto reply) {
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.forEach(listener -> listener.handleReply(raftPeerId, 
reply));
+      pendingBroadcasts.removeIf(listener -> listener.isDone());

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to