szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956787339
##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -225,4 +230,14 @@ public InstallSnapshotRequestProto
newInstallSnapshotNotificationRequest(TermInd
public Iterable<InstallSnapshotRequestProto>
newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot) {
return new InstallSnapshotRequests(server, getFollowerId(), requestId,
snapshot, snapshotChunkMaxSize);
}
+
+ @Override
+ public void registerAppendEntriesListener(BiConsumer<RaftPeerId,
AppendEntriesReplyProto> listener) {
+ appendEntriesListener = listener;
+ }
+
+ protected void notifyAppendEntriesListener(AppendEntriesReplyProto reply) {
+ Optional.ofNullable(appendEntriesListener).ifPresent(
+ listener -> listener.accept(this.getServer().getId(), reply));
Review Comment:
Let's add a new method to `LeaderState`
```java
//LeaderState.java
/** Received an {@link AppendEntriesReplyProto} */
void onAppendEntriesReply(FollowerInfo follower, AppendEntriesReplyProto
reply);
```
Then, this method becomes:
```java
getLeaderState().onAppendEntriesReply(getFollower(), reply);
```
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +111,69 @@ private boolean isValid(AppendEntriesReplyProto reply) {
return
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(),
minCallId) >= 0;
}
}
+
+ static class HeartbeatBroadcastListener {
+ private final Map<RaftPeerId, HeartbeatAck> pendingAcknowledgements;
+ private final int groupPeerCount;
+ private int ack;
+ private volatile boolean done;
+ private final CompletableFuture<Boolean> result;
+
+ HeartbeatBroadcastListener(List<LogAppender> senders) {
+ this.groupPeerCount = senders.size() + 1;
+ this.ack = 0;
+ this.done = false;
+ result = new CompletableFuture<>();
+
+ pendingAcknowledgements = new ConcurrentHashMap<>();
+ senders.forEach(sender ->
+ pendingAcknowledgements.put(sender.getFollowerId(), new
HeartbeatAck(sender)));
+ }
+
+ private void handleReply(RaftPeerId peerId, AppendEntriesReplyProto reply)
{
+ if (done) {
+ return;
+ }
+
+
Optional.ofNullable(pendingAcknowledgements.get(peerId)).ifPresent(heartbeatAck
-> {
+ boolean firstAcknowledged = heartbeatAck.receive(reply);
+ if (firstAcknowledged) {
+ onHeartbeatAcknowledged();
+ }
+ });
+ }
+
+ private synchronized void handleTimeout() {
+ if (!done) {
+ done = true;
+ result.complete(false);
+ }
+ }
+
+ private boolean isDone() {
+ return this.done;
+ }
+
+ synchronized void onHeartbeatAcknowledged() {
+ if (done) {
+ return;
+ }
+
+ ack++;
+
+ if (isMajorityAck()) {
+ done = true;
+ result.complete(true);
+ }
+ }
+
+ private synchronized boolean isMajorityAck() {
+ // include leader itself
+ return ack + 1 > groupPeerCount / 2;
+ }
Review Comment:
Since the configuration may change, we should calculate the majority using
the same logic as `LeaderStateImpl.getMajorityMin`. We could add the
following method:
```java
//LeaderStateImpl.java
private boolean hasMajority(Predicate<RaftPeerId> isAcked) {
final RaftPeerId selfId = server.getId();
final RaftConfigurationImpl conf = server.getRaftConf();
final List<RaftPeerId> followers = voterLists.get(0);
final boolean includeSelf = conf.containsInConf(selfId);
final boolean newConf = hasMajority(isAcked, followers, includeSelf);
if (!conf.isTransitional()) {
return newConf;
} else { // configuration is in transitional state
final List<RaftPeerId> oldFollowers = voterLists.get(1);
final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
final boolean oldConf = hasMajority(isAcked, oldFollowers,
includeSelfInOldConf);
return newConf && oldConf;
}
}
```
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -1028,6 +1033,43 @@ public boolean checkLeadership() {
return false;
}
+ /** confirm whether it has valid leadership by broadcasting heartbeat
AppendEntries */
+ private CompletableFuture<Boolean> confirmLeadership(long readIndex) {
+ if (pendingBroadcasts.containsKey(readIndex)) {
+ return pendingBroadcasts.get(readIndex);
+ }
+
+ CompletableFuture<Boolean> future =
+ this.readRequests.newHeartbeatBroadcastListener(senders.getSenders());
+ pendingBroadcasts.put(readIndex, future);
+
+ senders.stream().forEach(logAppender -> {
+ try {
+ logAppender.triggerHeartbeat();
+ } catch (IOException e) {
+ LOG.warn("{}: {} trigger heartbeat failed due to {}", this,
logAppender, e);
+ }
+ });
+ return future;
+ }
+
+ /** return leader's read index, see thesis section 6.4 */
+ CompletableFuture<Long> getReadIndex() {
+ final long readIndex = server.getRaftLog().getLastCommittedIndex();
Review Comment:
Check whether this read index has already been acknowledged before broadcasting another heartbeat.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -17,11 +17,59 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
/** For supporting linearizable read. */
-class ReadRequests {
+class ReadRequests implements BiConsumer<RaftPeerId, AppendEntriesReplyProto> {
Review Comment:
In this case, `ReadRequests` does not need to implement `BiConsumer`, since we
are going to remove `registerAppendEntriesListener`.
In general, prefer lambda expressions over implementing a functional
interface.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -247,6 +247,8 @@ boolean removeAll(Collection<LogAppender> c) {
private final EventProcessor processor;
private final PendingRequests pendingRequests;
private final WatchRequests watchRequests;
+ private final ReadRequests readRequests;
+ private final Map<Long, CompletableFuture<Boolean>> pendingBroadcasts;
Review Comment:
Move `LeaderStateImpl.pendingBroadcasts` into `ReadRequests`: it and
`ReadRequests.pendingBroadcasts` should be combined into a single data structure.
##########
ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java:
##########
@@ -169,6 +171,9 @@ default boolean hasAppendEntries() {
/** send a heartbeat AppendEntries immediately */
void triggerHeartbeat() throws IOException;
+ /** Register an AppendEntries listener */
+ void registerAppendEntriesListener(BiConsumer<RaftPeerId,
AppendEntriesReplyProto> listener);
Review Comment:
Let's change `LeaderState` as mentioned above instead, since `LeaderState` has
only one implementation while `LogAppender` has multiple implementations.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -17,11 +17,59 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
/** For supporting linearizable read. */
-class ReadRequests {
+class ReadRequests implements BiConsumer<RaftPeerId, AppendEntriesReplyProto> {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadRequests.class);
+
+ private final List<HeartbeatBroadcastListener> pendingBroadcasts;
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+ private final TimeDuration readTimeout;
+
+ ReadRequests(RaftProperties properties) {
+ pendingBroadcasts = new ArrayList<>();
+ readTimeout = RaftServerConfigKeys.Read.readOnlyTimeout(properties);
+ }
+
+ CompletableFuture<Boolean> newHeartbeatBroadcastListener(List<LogAppender>
senders) {
+ HeartbeatBroadcastListener listener = new
HeartbeatBroadcastListener(senders);
+ synchronized (pendingBroadcasts) {
+ pendingBroadcasts.add(listener);
+ }
+
+ // timeout and complete the broadcast listener after a given time
+ scheduler.onTimeout(readTimeout,
+ listener::handleTimeout,
+ LOG, () -> "timeout heartbeat broadcast listener: " + listener);
+
+ return listener.getFuture();
+ }
+
+ @Override
+ public void accept(RaftPeerId raftPeerId, AppendEntriesReplyProto reply) {
+ synchronized (pendingBroadcasts) {
+ pendingBroadcasts.forEach(listener -> listener.handleReply(raftPeerId,
reply));
+ pendingBroadcasts.removeIf(listener -> listener.isDone());
Review Comment:
Use a `ConcurrentNavigableMap` keyed by read index so that each reply handles
only the pending requests whose read index is <= the reply index. Then it does
not have to process the entire list.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]