SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r958558963


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return 
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), 
minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new 
ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = 
replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }
+  }
+
+  private final ConcurrentNavigableMap<Long, HeartbeatBroadcastListener>
+      heartbeatListeners = new ConcurrentSkipListMap<>();
+  private final RaftLogIndex ackedCommitIndex = new 
RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  HeartbeatBroadcastListener addHeartbeatListener(long commitIndex,
+                                                  Function<Long, 
HeartbeatBroadcastListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return heartbeatListeners.computeIfAbsent(commitIndex, constructor);
+  }
+
+  synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
+    final long callId = reply.getServerReply().getCallId();
+    for (;;) {
+      final Long first = heartbeatListeners.firstKey();
+      if (first == null || first > callId) {
+        return;
+      }
+
+      final HeartbeatBroadcastListener listener = 
heartbeatListeners.get(callId);
+      if (listener == null) {
+        continue;
+      }
+
+      listener.receive(reply, hasMajority);
+      if (listener.isCompleted()) {
+        heartbeatListeners.remove(callId);
+        ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("ack 
commit index change: {}", s));

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to