This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9230867d1 RATIS-1706. Move heartbeat listeners to LeaderState (#743)
9230867d1 is described below
commit 9230867d19953583b817c7b56ccd4a933fb3835a
Author: William Song <[email protected]>
AuthorDate: Tue Sep 20 17:53:06 2022 +0800
RATIS-1706. Move heartbeat listeners to LeaderState (#743)
---
.../apache/ratis/server/RaftServerConfigKeys.java | 2 +-
.../apache/ratis/server/impl/LeaderStateImpl.java | 11 +-
...{ReadRequests.java => ReadIndexHeartbeats.java} | 80 ++----------
.../org/apache/ratis/server/impl/ReadRequests.java | 135 ---------------------
.../org/apache/ratis/ReadOnlyRequestTests.java | 31 ++---
5 files changed, 27 insertions(+), 232 deletions(-)
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 5ae1020c5..47e50a048 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -179,7 +179,7 @@ public interface RaftServerConfigKeys {
LINEARIZABLE
}
- String OPTION_KEY = ".option";
+ String OPTION_KEY = PREFIX + ".option";
Option OPTION_DEFAULT = Option.DEFAULT;
static Option option(RaftProperties properties) {
Option option = get(properties::getEnum, OPTION_KEY, OPTION_DEFAULT,
getDefaultLog());
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index d51a3bcfc..74649c566 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -40,7 +40,7 @@ import
org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.ReadRequests.AppendEntriesListener;
+import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.leader.LogAppender;
@@ -260,6 +260,8 @@ class LeaderStateImpl implements LeaderState {
private final long followerMaxGapThreshold;
private final PendingStepDown pendingStepDown;
+ private final ReadIndexHeartbeats readIndexHeartbeats;
+
LeaderStateImpl(RaftServerImpl server) {
this.name = server.getMemberId() + "-" +
JavaUtils.getClassSimpleName(getClass());
this.server = server;
@@ -279,6 +281,7 @@ class LeaderStateImpl implements LeaderState {
this.watchRequests = new WatchRequests(server.getMemberId(), properties);
this.messageStreamRequests = new
MessageStreamRequests(server.getMemberId());
this.pendingStepDown = new PendingStepDown(this);
+ this.readIndexHeartbeats = new ReadIndexHeartbeats();
long maxPendingRequests =
RaftServerConfigKeys.Write.elementLimit(properties);
double followerGapRatioMax =
RaftServerConfigKeys.Write.followerGapRatioMax(properties);
@@ -337,6 +340,8 @@ class LeaderStateImpl implements LeaderState {
LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e);
}
messageStreamRequests.clear();
+ // TODO client should retry on NotLeaderException
+ readIndexHeartbeats.failListeners(nle);
server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
@@ -1087,7 +1092,7 @@ class LeaderStateImpl implements LeaderState {
final MemoizedSupplier<AppendEntriesListener> supplier =
MemoizedSupplier.valueOf(
() -> new AppendEntriesListener(readIndex));
- final AppendEntriesListener listener =
server.getReadRequests().addAppendEntriesListener(
+ final AppendEntriesListener listener =
readIndexHeartbeats.addAppendEntriesListener(
readIndex, key -> supplier.get());
// the readIndex is already acknowledged before
@@ -1110,7 +1115,7 @@ class LeaderStateImpl implements LeaderState {
@Override
public void onAppendEntriesReply(LogAppender appender,
RaftProtos.AppendEntriesReplyProto reply) {
- server.getReadRequests().onAppendEntriesReply(appender, reply,
this::hasMajority);
+ readIndexHeartbeats.onAppendEntriesReply(appender, reply,
this::hasMajority);
}
void replyPendingRequest(long logIndex, RaftClientReply reply) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
similarity index 67%
copy from
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
copy to
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
index bbd2a92ba..40c559b2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
@@ -17,37 +17,25 @@
*/
package org.apache.ratis.server.impl;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.exceptions.ReadException;
-import org.apache.ratis.protocol.exceptions.TimeoutIOException;
-import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIndex;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.MemoizedSupplier;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
-/** For supporting linearizable read. */
-class ReadRequests {
- private static final Logger LOG =
LoggerFactory.getLogger(ReadRequests.class);
+class ReadIndexHeartbeats {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadIndexHeartbeats.class);
/** The acknowledgement from a {@link LogAppender} of a heartbeat for a
particular call id. */
static class HeartbeatAck {
@@ -151,66 +139,23 @@ class ReadRequests {
}
if (listener.receive(appender, reply, hasMajority)) {
- ackedCommitIndex.updateToMax(listener.commitIndex, s ->
LOG.debug("{}: {}", ReadRequests.this, s));
+ ackedCommitIndex.updateToMax(listener.commitIndex, s ->
LOG.debug("{}: {}", this, s));
iterator.remove();
}
}
}
- }
-
- static class ReadIndexQueue {
- private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
- private final NavigableMap<Long, CompletableFuture<Long>> sorted = new
ConcurrentSkipListMap<>();
- private final TimeDuration readTimeout;
-
- ReadIndexQueue(TimeDuration readTimeout) {
- this.readTimeout = readTimeout;
- }
-
- CompletableFuture<Long> add(long readIndex) {
- final MemoizedSupplier<CompletableFuture<Long>> supplier =
MemoizedSupplier.valueOf(CompletableFuture::new);
- final CompletableFuture<Long> f = sorted.computeIfAbsent(readIndex, i ->
supplier.get());
-
- if (supplier.isInitialized()) {
- scheduler.onTimeout(readTimeout, () -> handleTimeout(readIndex),
- LOG, () -> "Failed to handle read timeout for index " + readIndex);
- }
- return f;
- }
-
- private void handleTimeout(long readIndex) {
- Optional.ofNullable(sorted.remove(readIndex)).ifPresent(consumer -> {
- consumer.completeExceptionally(
- new ReadException(new TimeoutIOException("Read timeout for index " +
readIndex)));
- });
- }
- void complete(Long appliedIndex) {
- for(;;) {
- if (sorted.isEmpty()) {
- return;
- }
- final Long first = sorted.firstKey();
- if (first == null || first > appliedIndex) {
- return;
- }
- Optional.ofNullable(sorted.remove(first)).ifPresent(f ->
f.complete(appliedIndex));
- }
+ synchronized void failAll(Exception e) {
+ sorted.forEach((index, listener) ->
listener.getFuture().completeExceptionally(e));
+ sorted.clear();
}
}
private final AppendEntriesListeners appendEntriesListeners = new
AppendEntriesListeners();
private final RaftLogIndex ackedCommitIndex = new
RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
- private final ReadIndexQueue readIndexQueue;
- private final StateMachine stateMachine;
-
- ReadRequests(RaftProperties properties, StateMachine stateMachine) {
- this.readIndexQueue = new
ReadIndexQueue(RaftServerConfigKeys.Read.timeout(properties));
- this.stateMachine = stateMachine;
- }
AppendEntriesListener addAppendEntriesListener(long commitIndex,
- Function<Long,
AppendEntriesListener> constructor) {
+ Function<Long,
AppendEntriesListener> constructor) {
if (commitIndex <= ackedCommitIndex.get()) {
return null;
}
@@ -222,14 +167,7 @@ class ReadRequests {
appendEntriesListeners.onAppendEntriesReply(appender, reply, hasMajority);
}
- Consumer<Long> getAppliedIndexConsumer() {
- return readIndexQueue::complete;
- }
-
- CompletableFuture<Long> waitToAdvance(long readIndex) {
- if (stateMachine.getLastAppliedTermIndex().getIndex() >= readIndex) {
- return CompletableFuture.completedFuture(readIndex);
- }
- return readIndexQueue.add(readIndex);
+ void failListeners(Exception e) {
+ appendEntriesListeners.failAll(e);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
index bbd2a92ba..aadc45c12 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -18,14 +18,9 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.leader.LogAppender;
-import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
@@ -33,131 +28,16 @@ import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
-import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
-import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
/** For supporting linearizable read. */
class ReadRequests {
private static final Logger LOG =
LoggerFactory.getLogger(ReadRequests.class);
- /** The acknowledgement from a {@link LogAppender} of a heartbeat for a
particular call id. */
- static class HeartbeatAck {
- private final LogAppender appender;
- private final long minCallId;
- private volatile boolean acknowledged = false;
-
- HeartbeatAck(LogAppender appender) {
- this.appender = appender;
- this.minCallId = appender.getCallId();
- }
-
- /** Is the heartbeat (for a particular call id) acknowledged? */
- boolean isAcknowledged() {
- return acknowledged;
- }
-
- /**
- * @return true if the acknowledged state is changed from false to true;
- * otherwise, the acknowledged state remains unchanged, return
false.
- */
- boolean receive(AppendEntriesReplyProto reply) {
- if (acknowledged) {
- return false;
- }
- synchronized (this) {
- if (!acknowledged && isValid(reply)) {
- acknowledged = true;
- return true;
- }
- return false;
- }
- }
-
- private boolean isValid(AppendEntriesReplyProto reply) {
- if (reply == null || !reply.getServerReply().getSuccess()) {
- return false;
- }
- // valid only if the reply has a later call id than the min.
- return
appender.getCallIdComparator().compare(reply.getServerReply().getCallId(),
minCallId) >= 0;
- }
- }
-
- static class AppendEntriesListener {
- private final long commitIndex;
- private final CompletableFuture<Long> future = new CompletableFuture<>();
- private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new
ConcurrentHashMap<>();
-
- AppendEntriesListener(long commitIndex) {
- this.commitIndex = commitIndex;
- }
-
- CompletableFuture<Long> getFuture() {
- return future;
- }
-
- boolean receive(LogAppender logAppender, AppendEntriesReplyProto proto,
- Predicate<Predicate<RaftPeerId>> hasMajority) {
- if (isCompletedNormally()) {
- return true;
- }
-
- final HeartbeatAck reply = replies.computeIfAbsent(
- logAppender.getFollowerId(), key -> new HeartbeatAck(logAppender));
- if (reply.receive(proto)) {
- if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
- future.complete(commitIndex);
- return true;
- }
- }
-
- return isCompletedNormally();
- }
-
- boolean isCompletedNormally() {
- return future.isDone() && !future.isCancelled() &&
!future.isCompletedExceptionally();
- }
- }
-
- class AppendEntriesListeners {
- private final NavigableMap<Long, AppendEntriesListener> sorted = new
TreeMap<>();
-
- synchronized AppendEntriesListener add(long commitIndex, Function<Long,
AppendEntriesListener> constructor) {
- return sorted.computeIfAbsent(commitIndex, constructor);
- }
-
- synchronized void onAppendEntriesReply(LogAppender appender,
AppendEntriesReplyProto reply,
- Predicate<Predicate<RaftPeerId>>
hasMajority) {
- final long callId = reply.getServerReply().getCallId();
-
- Iterator<Map.Entry<Long, AppendEntriesListener>> iterator =
sorted.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, AppendEntriesListener> entry = iterator.next();
- if (entry.getKey() > callId) {
- return;
- }
-
- final AppendEntriesListener listener = entry.getValue();
- if (listener == null) {
- continue;
- }
-
- if (listener.receive(appender, reply, hasMajority)) {
- ackedCommitIndex.updateToMax(listener.commitIndex, s ->
LOG.debug("{}: {}", ReadRequests.this, s));
- iterator.remove();
- }
- }
- }
- }
-
static class ReadIndexQueue {
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private final NavigableMap<Long, CompletableFuture<Long>> sorted = new
ConcurrentSkipListMap<>();
@@ -199,8 +79,6 @@ class ReadRequests {
}
}
- private final AppendEntriesListeners appendEntriesListeners = new
AppendEntriesListeners();
- private final RaftLogIndex ackedCommitIndex = new
RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
private final ReadIndexQueue readIndexQueue;
private final StateMachine stateMachine;
@@ -209,19 +87,6 @@ class ReadRequests {
this.stateMachine = stateMachine;
}
- AppendEntriesListener addAppendEntriesListener(long commitIndex,
- Function<Long,
AppendEntriesListener> constructor) {
- if (commitIndex <= ackedCommitIndex.get()) {
- return null;
- }
- return appendEntriesListeners.add(commitIndex, constructor);
- }
-
- void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto
reply,
- Predicate<Predicate<RaftPeerId>> hasMajority) {
- appendEntriesListeners.onAppendEntriesReply(appender, reply, hasMajority);
- }
-
Consumer<Long> getAppliedIndexConsumer() {
return readIndexQueue::complete;
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index f611ac4d1..61d43d85b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -124,32 +124,18 @@ public abstract class ReadOnlyRequestTests<CLUSTER
extends MiniRaftCluster>
RaftClientReply reply = client.io().send(incrementMessage);
Assert.assertTrue(reply.isSuccess());
- Semaphore canRead = new Semaphore(0);
- // this future will complete after 500 ms
- Thread thread = new Thread(() -> {
- try {
- RaftClientReply staleValueBefore = client.io()
+ CompletableFuture<RaftClientReply> result =
client.async().send(waitAndIncrementMessage);
+ Thread.sleep(100);
+
+ RaftClientReply staleValueBefore = client.io()
.sendStaleRead(queryMessage, 0, leaderId);
- Assert.assertEquals(retrieve(staleValueBefore), 1);
-
- canRead.acquire();
- // we still have to sleep for a while to guarantee that the async
write arrives at RaftServer
- Thread.sleep(100);
- // send a linearizable read request
- // linearizable read will wait the statemachine to advance
- RaftClientReply linearizableReadValue = client.io()
- .sendReadOnly(queryMessage);
- Assert.assertEquals(retrieve(linearizableReadValue), 2);
- }
- catch (Exception ignored) {}
- });
+ Assert.assertEquals(retrieve(staleValueBefore), 1);
- thread.start();
- CompletableFuture<RaftClientReply> result =
client.async().send(waitAndIncrementMessage);
- canRead.release();
- thread.join();
+ RaftClientReply linearizableReadValue = client.io()
+ .sendReadOnly(queryMessage);
+ Assert.assertEquals(retrieve(linearizableReadValue), 2);
}
} finally {
@@ -231,6 +217,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
leaderClient.io().send(incrementMessage);
leaderClient.async().send(waitAndIncrementMessage);
+ Thread.sleep(100);
RaftClientReply clientReply =
followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
Assert.assertEquals(2, retrieve(clientReply));