This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 39cbca5ed RATIS-1927. Use double check to eliminate data race in
ReadRequests. (#958)
39cbca5ed is described below
commit 39cbca5ed386859c4915a9d774e428e037a34a2f
Author: William Song <[email protected]>
AuthorDate: Tue Nov 7 00:56:31 2023 -0600
RATIS-1927. Use double check to eliminate data race in ReadRequests. (#958)
---
.../org/apache/ratis/server/impl/ReadRequests.java | 67 +++++++++++++---------
1 file changed, 40 insertions(+), 27 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
index aadc45c12..e63a23a0b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -19,19 +19,16 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.exceptions.ReadException;
-import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.NavigableMap;
-import java.util.Optional;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;
/** For supporting linearizable read. */
@@ -40,7 +37,7 @@ class ReadRequests {
static class ReadIndexQueue {
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
- private final NavigableMap<Long, CompletableFuture<Long>> sorted = new
ConcurrentSkipListMap<>();
+ private final NavigableMap<Long, CompletableFuture<Long>> sorted = new
TreeMap<>();
private final TimeDuration readTimeout;
ReadIndexQueue(TimeDuration readTimeout) {
@@ -48,34 +45,44 @@ class ReadRequests {
}
CompletableFuture<Long> add(long readIndex) {
- final MemoizedSupplier<CompletableFuture<Long>> supplier =
MemoizedSupplier.valueOf(CompletableFuture::new);
- final CompletableFuture<Long> f = sorted.computeIfAbsent(readIndex, i ->
supplier.get());
+ final CompletableFuture<Long> returned;
+ final boolean create;
+ synchronized (this) {
+ // The same as computeIfAbsent except that it also tells if a new
value is created.
+ final CompletableFuture<Long> existing = sorted.get(readIndex);
+ create = existing == null;
+ if (create) {
+ returned = new CompletableFuture<>();
+ sorted.put(readIndex, returned);
+ } else {
+ returned = existing;
+ }
+ }
- if (supplier.isInitialized()) {
+ if (create) {
scheduler.onTimeout(readTimeout, () -> handleTimeout(readIndex),
LOG, () -> "Failed to handle read timeout for index " + readIndex);
}
- return f;
+ return returned;
}
private void handleTimeout(long readIndex) {
- Optional.ofNullable(sorted.remove(readIndex)).ifPresent(consumer -> {
- consumer.completeExceptionally(
- new ReadException(new TimeoutIOException("Read timeout for index " +
readIndex)));
- });
+ final CompletableFuture<Long> removed;
+ synchronized (this) {
+ removed = sorted.remove(readIndex);
+ }
+ if (removed == null) {
+ return;
+ }
+ removed.completeExceptionally(new ReadException("Read timeout " +
readTimeout + " for index " + readIndex));
}
- void complete(Long appliedIndex) {
- for(;;) {
- if (sorted.isEmpty()) {
- return;
- }
- final Long first = sorted.firstKey();
- if (first == null || first > appliedIndex) {
- return;
- }
- Optional.ofNullable(sorted.remove(first)).ifPresent(f ->
f.complete(appliedIndex));
- }
+
+ /** Complete all the entries less than or equal to the given applied
index. */
+ synchronized void complete(Long appliedIndex) {
+ final NavigableMap<Long, CompletableFuture<Long>> headMap =
sorted.headMap(appliedIndex, true);
+ headMap.values().forEach(f -> f.complete(appliedIndex));
+ headMap.clear();
}
}
@@ -92,9 +99,15 @@ class ReadRequests {
}
CompletableFuture<Long> waitToAdvance(long readIndex) {
- if (stateMachine.getLastAppliedTermIndex().getIndex() >= readIndex) {
- return CompletableFuture.completedFuture(readIndex);
+ final long lastApplied = stateMachine.getLastAppliedTermIndex().getIndex();
+ if (lastApplied >= readIndex) {
+ return CompletableFuture.completedFuture(lastApplied);
+ }
+ final CompletableFuture<Long> f = readIndexQueue.add(readIndex);
+ final long current = stateMachine.getLastAppliedTermIndex().getIndex();
+ if (current > lastApplied) {
+ readIndexQueue.complete(current);
}
- return readIndexQueue.add(readIndex);
+ return f;
}
}