szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r957889294


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -189,6 +189,8 @@ public long[] getFollowerNextIndices() {
 
   private final AtomicBoolean firstElectionSinceStartup = new 
AtomicBoolean(true);
 
+  private final ReadRequests readRequests;

Review Comment:
   Let's put move `readRequests` to `ServerState since
   - `stateMachineUpdater` needs to update`readRequests` later;
   - `ServerState` is a smaller class so that `RaftServerImpl` won't get to big.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return 
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), 
minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {

Review Comment:
   Let's call it `HeartbeatListener` or `AppendEntriesListener` since it 
listens to all heartbeats (indeed, all appendEntries), not only the broadcasted 
heartbeat.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return 
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), 
minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new 
ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = 
replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }
+  }
+
+  private final ConcurrentNavigableMap<Long, HeartbeatBroadcastListener>
+      heartbeatListeners = new ConcurrentSkipListMap<>();
+  private final RaftLogIndex ackedCommitIndex = new 
RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  HeartbeatBroadcastListener addHeartbeatListener(long commitIndex,
+                                                  Function<Long, 
HeartbeatBroadcastListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return heartbeatListeners.computeIfAbsent(commitIndex, constructor);
+  }
+
+  synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, 
Predicate<Predicate<RaftPeerId>> hasMajority) {

Review Comment:
   I was thinking about an un-synchronized implementation but it may be too 
hard.  If we are going to use `synchronized`, we should also `synchronized` the 
add method and change the map to just `NavigableMap`.  Also, let's create a 
class so that `synchronized` applies only to that class but not the entire 
`ReadRequests`.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return 
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), 
minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new 
ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = 
replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }
+  }
+
+  private final ConcurrentNavigableMap<Long, HeartbeatBroadcastListener>
+      heartbeatListeners = new ConcurrentSkipListMap<>();
+  private final RaftLogIndex ackedCommitIndex = new 
RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  HeartbeatBroadcastListener addHeartbeatListener(long commitIndex,
+                                                  Function<Long, 
HeartbeatBroadcastListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return heartbeatListeners.computeIfAbsent(commitIndex, constructor);
+  }
+
+  synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, 
Predicate<Predicate<RaftPeerId>> hasMajority) {

Review Comment:
   The code will look like
   ```java
     class HeartbeatListeners {
       private final NavigableMap<Long, HeartbeatListener> sorted = new 
TreeMap<>();
   
       synchronized HeartbeatListener add(long commitIndex, Function<Long, 
HeartbeatListener> constructor) {
         return sorted.computeIfAbsent(commitIndex, constructor);
       }
   
       synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
         final long callId = reply.getServerReply().getCallId();
         for (; ; ) {
           final Map.Entry<Long, HeartbeatListener> first = sorted.firstEntry();
           if (first == null || first.getKey() > callId) {
             return;
           }
   
           final HeartbeatListener listener = first.getValue();
           if (listener == null) {
             continue;
           }
   
           if (listener.receive(reply, hasMajority)) {
             final HeartbeatListener removed = sorted.remove(callId);
             Preconditions.assertSame(listener, removed, "HeartbeatListener");
             ackedCommitIndex.updateToMax(listener.commitIndex, s -> 
LOG.debug("{}: {}", ReadRequests.this, s));
           }
         }
       }
     }
   
     private final RaftLogIndex ackedCommitIndex = new 
RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
     private final HeartbeatListeners heartbeatListeners = new 
HeartbeatListeners();
   
     HeartbeatListener addHeartbeatListener(long commitIndex, Function<Long, 
HeartbeatListener> constructor) {
       if (commitIndex <= ackedCommitIndex.get()) {
         return null;
       }
       return heartbeatListeners.add(commitIndex, constructor);
     }
   
     void onAppendEntriesReply(AppendEntriesReplyProto reply, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
       heartbeatListeners.onAppendEntriesReply(reply, hasMajority);
     }
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return 
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), 
minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new 
ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = 
replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }

Review Comment:
   It should be
   ```java
       boolean isCompletedNormally() {
         return future.isDone() && !future.isCompletedExceptionally() && 
!future.isCancelled();
       }
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return 
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), 
minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new 
ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = 
replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }
+  }
+
+  private final ConcurrentNavigableMap<Long, HeartbeatBroadcastListener>
+      heartbeatListeners = new ConcurrentSkipListMap<>();
+  private final RaftLogIndex ackedCommitIndex = new 
RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  HeartbeatBroadcastListener addHeartbeatListener(long commitIndex,
+                                                  Function<Long, 
HeartbeatBroadcastListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return heartbeatListeners.computeIfAbsent(commitIndex, constructor);
+  }
+
+  synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
+    final long callId = reply.getServerReply().getCallId();
+    for (;;) {
+      final Long first = heartbeatListeners.firstKey();
+      if (first == null || first > callId) {
+        return;
+      }
+
+      final HeartbeatBroadcastListener listener = 
heartbeatListeners.get(callId);
+      if (listener == null) {
+        continue;
+      }
+
+      listener.receive(reply, hasMajority);
+      if (listener.isCompleted()) {
+        heartbeatListeners.remove(callId);
+        ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("ack 
commit index change: {}", s));

Review Comment:
   The name `ackedCommitIndex` is already printed.  Let's use 
   ```java
   LOG.debug("{}: {}", ReadRequests.this, s))
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return 
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), 
minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new 
ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, 
Predicate<Predicate<RaftPeerId>> hasMajority) {

Review Comment:
   Let's check `isCompletedNormally()`:
   ```java
       boolean receive(AppendEntriesReplyProto proto, 
Predicate<Predicate<RaftPeerId>> hasMajority) {
         if (isCompletedNormally()) {
           return true;
         }
         final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
         final HeartbeatAck reply = 
replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
         if (reply.receive(proto)) {
           if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
             future.complete(commitIndex);
             return true;
           }
         }
         return isCompletedNormally();
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to