SzyWilliam commented on code in PR #1311:
URL: https://github.com/apache/ratis/pull/1311#discussion_r2518442047
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -79,35 +90,36 @@ private void handleTimeout(long readIndex) {
/** Complete all the entries less than or equal to the given applied index. */
- synchronized void complete(Long appliedIndex) {
+ synchronized void complete(long appliedIndex) {
+ if (appliedIndex > lastAppliedIndex) {
+ lastAppliedIndex = appliedIndex;
+ } else {
+ // appliedIndex <= lastAppliedIndex: nothing to do
+ if (!sorted.isEmpty()) {
+ // Assert: all keys > lastAppliedIndex.
+ final long first = sorted.firstKey();
+ Preconditions.assertTrue(first > lastAppliedIndex,
+ () -> "first = " + first + " <= lastAppliedIndex = " + lastAppliedIndex);
+ }
+ return;
+ }
final NavigableMap<Long, CompletableFuture<Long>> headMap =
sorted.headMap(appliedIndex, true);
headMap.values().forEach(f -> f.complete(appliedIndex));
headMap.clear();
}
}
private final ReadIndexQueue readIndexQueue;
- private final StateMachine stateMachine;
- ReadRequests(RaftProperties properties, StateMachine stateMachine) {
- this.readIndexQueue = new ReadIndexQueue(RaftServerConfigKeys.Read.timeout(properties));
- this.stateMachine = stateMachine;
+ ReadRequests(long appliedIndex, RaftProperties properties) {
+ this.readIndexQueue = new ReadIndexQueue(appliedIndex, RaftServerConfigKeys.Read.timeout(properties));
}
- Consumer<Long> getAppliedIndexConsumer() {
+ LongConsumer getAppliedIndexConsumer() {
return readIndexQueue::complete;
}
CompletableFuture<Long> waitToAdvance(long readIndex) {
- final long lastApplied = stateMachine.getLastAppliedTermIndex().getIndex();
- if (lastApplied >= readIndex) {
- return CompletableFuture.completedFuture(lastApplied);
- }
- final CompletableFuture<Long> f = readIndexQueue.add(readIndex);
- final long current = stateMachine.getLastAppliedTermIndex().getIndex();
- if (current > lastApplied) {
- readIndexQueue.complete(current);
- }
Review Comment:
I vaguely remember that this double check was added to prevent a race
condition; see https://github.com/apache/ratis/pull/958. We may still face a
race condition here because `ReadIndexQueue.add` is not fully synchronized:
`complete` could run between the applied-index check and the insertion into the
queue. How about making the entire `ReadIndexQueue.add` method synchronized?
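
To illustrate the suggestion, here is a minimal sketch and not the actual patch. `ReadIndexQueueSketch` is a hypothetical, simplified stand-in for `ReadIndexQueue`: timeout handling is omitted, and the early-return check inside `add` is my assumption about how the queue would use `lastAppliedIndex`. The point is only that `add` and `complete` hold the same monitor, so no `complete` call can slip in between the applied-index check and the insertion.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical simplified stand-in for ReadIndexQueue; timeouts are omitted. */
class ReadIndexQueueSketch {
  private final NavigableMap<Long, CompletableFuture<Long>> sorted = new TreeMap<>();
  private long lastAppliedIndex;

  ReadIndexQueueSketch(long lastAppliedIndex) {
    this.lastAppliedIndex = lastAppliedIndex;
  }

  /** The check and the insertion happen atomically under the same lock as complete(..). */
  synchronized CompletableFuture<Long> add(long readIndex) {
    if (readIndex <= lastAppliedIndex) {
      // Already applied: no need to enqueue (assumed behavior, not from the PR).
      return CompletableFuture.completedFuture(lastAppliedIndex);
    }
    return sorted.computeIfAbsent(readIndex, k -> new CompletableFuture<>());
  }

  /** Complete all the entries less than or equal to the given applied index. */
  synchronized void complete(long appliedIndex) {
    if (appliedIndex <= lastAppliedIndex) {
      return; // stale or duplicate notification: nothing to do
    }
    lastAppliedIndex = appliedIndex;
    final NavigableMap<Long, CompletableFuture<Long>> headMap = sorted.headMap(appliedIndex, true);
    headMap.values().forEach(f -> f.complete(appliedIndex));
    headMap.clear();
  }
}
```

With both methods synchronized, the double check in `waitToAdvance` would no longer be needed, since every key left in `sorted` is guaranteed to be greater than `lastAppliedIndex`.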