SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956676538
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +66,82 @@ private boolean isValid(AppendEntriesReplyProto reply) {
return
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(),
minCallId) >= 0;
}
}
+
+ static class HeartbeatBroadcastListener {
+ private final int groupPeerCount;
+ private int ack;
+ private int fail;
+ private volatile boolean done;
+ private final CompletableFuture<Boolean> result;
+
+ HeartbeatBroadcastListener(int groupSize) {
+ this.groupPeerCount = groupSize;
+ this.ack = 0;
+ this.fail = 0;
Review Comment:
I've removed the `fail` field while keeping the timeout mechanism. If a
broadcast does not complete within the given time, it is simply completed
with the value `false`.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +66,82 @@ private boolean isValid(AppendEntriesReplyProto reply) {
return
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(),
minCallId) >= 0;
}
}
+
+ static class HeartbeatBroadcastListener {
+ private final int groupPeerCount;
+ private int ack;
+ private int fail;
+ private volatile boolean done;
+ private final CompletableFuture<Boolean> result;
+
+ HeartbeatBroadcastListener(int groupSize) {
+ this.groupPeerCount = groupSize;
+ this.ack = 0;
+ this.fail = 0;
+ this.done = false;
+ result = new CompletableFuture<>();
+ }
+
+ private class HeartbeatListener implements BiFunction<Boolean,
AppendEntriesReplyProto, Boolean> {
+ private final HeartbeatAck heartbeatAck;
+
+ HeartbeatListener(LogAppender logAppender) {
+ heartbeatAck = new HeartbeatAck(logAppender);
+ }
+
+ @Override
+ public Boolean apply(Boolean isTimeout, AppendEntriesReplyProto reply) {
+ if (isTimeout) {
+ onHeartbeatReply(false);
+ return true;
+ }
+
+ boolean acknowledged = heartbeatAck.receive(reply);
+ if (acknowledged) {
+ onHeartbeatReply(true);
+ }
+ return heartbeatAck.isAcknowledged();
+ }
+
+ }
+
+ HeartbeatListener newHeartbeatListener(LogAppender logAppender) {
+ return new HeartbeatListener(logAppender);
+ }
+
+ synchronized void onHeartbeatReply(boolean isSuccess) {
+ if (done) {
+ return;
+ }
+
+ if (isSuccess) {
+ ack++;
+ } else {
+ fail++;
+ }
+
+ if (isMajorityAck()) {
+ result.complete(true);
+ done = true;
+ }
+
+ if (isMajorityFail()) {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]