This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 39cbca5ed RATIS-1927. Use double check to eliminate data race in ReadRequests. (#958)
39cbca5ed is described below

commit 39cbca5ed386859c4915a9d774e428e037a34a2f
Author: William Song <[email protected]>
AuthorDate: Tue Nov 7 00:56:31 2023 -0600

    RATIS-1927. Use double check to eliminate data race in ReadRequests. (#958)
---
 .../org/apache/ratis/server/impl/ReadRequests.java | 67 +++++++++++++---------
 1 file changed, 40 insertions(+), 27 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
index aadc45c12..e63a23a0b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -19,19 +19,16 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.exceptions.ReadException;
-import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.NavigableMap;
-import java.util.Optional;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Consumer;
 
 /** For supporting linearizable read. */
@@ -40,7 +37,7 @@ class ReadRequests {
 
   static class ReadIndexQueue {
     private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
-    private final NavigableMap<Long, CompletableFuture<Long>> sorted = new ConcurrentSkipListMap<>();
+    private final NavigableMap<Long, CompletableFuture<Long>> sorted = new TreeMap<>();
     private final TimeDuration readTimeout;
 
     ReadIndexQueue(TimeDuration readTimeout) {
@@ -48,34 +45,44 @@ class ReadRequests {
     }
 
     CompletableFuture<Long> add(long readIndex) {
-      final MemoizedSupplier<CompletableFuture<Long>> supplier = MemoizedSupplier.valueOf(CompletableFuture::new);
-      final CompletableFuture<Long> f = sorted.computeIfAbsent(readIndex, i -> supplier.get());
+      final CompletableFuture<Long> returned;
+      final boolean create;
+      synchronized (this) {
+        // The same as computeIfAbsent except that it also tells if a new value is created.
+        final CompletableFuture<Long> existing = sorted.get(readIndex);
+        create = existing == null;
+        if (create) {
+          returned = new CompletableFuture<>();
+          sorted.put(readIndex, returned);
+        } else {
+          returned = existing;
+        }
+      }
 
-      if (supplier.isInitialized()) {
+      if (create) {
         scheduler.onTimeout(readTimeout, () -> handleTimeout(readIndex),
             LOG, () -> "Failed to handle read timeout for index " + readIndex);
       }
-      return f;
+      return returned;
     }
 
     private void handleTimeout(long readIndex) {
-      Optional.ofNullable(sorted.remove(readIndex)).ifPresent(consumer -> {
-        consumer.completeExceptionally(
-          new ReadException(new TimeoutIOException("Read timeout for index " + readIndex)));
-      });
+      final CompletableFuture<Long> removed;
+      synchronized (this) {
+        removed = sorted.remove(readIndex);
+      }
+      if (removed == null) {
+        return;
+      }
+      removed.completeExceptionally(new ReadException("Read timeout " + readTimeout + " for index " + readIndex));
     }
 
-    void complete(Long appliedIndex) {
-      for(;;) {
-        if (sorted.isEmpty()) {
-          return;
-        }
-        final Long first = sorted.firstKey();
-        if (first == null || first > appliedIndex) {
-          return;
-        }
-        Optional.ofNullable(sorted.remove(first)).ifPresent(f -> f.complete(appliedIndex));
-      }
+
+    /** Complete all the entries less than or equal to the given applied index. */
+    synchronized void complete(Long appliedIndex) {
+      final NavigableMap<Long, CompletableFuture<Long>> headMap = sorted.headMap(appliedIndex, true);
+      headMap.values().forEach(f -> f.complete(appliedIndex));
+      headMap.clear();
     }
   }
 
@@ -92,9 +99,15 @@ class ReadRequests {
   }
 
   CompletableFuture<Long> waitToAdvance(long readIndex) {
-    if (stateMachine.getLastAppliedTermIndex().getIndex() >= readIndex) {
-      return CompletableFuture.completedFuture(readIndex);
+    final long lastApplied = stateMachine.getLastAppliedTermIndex().getIndex();
+    if (lastApplied >= readIndex) {
+      return CompletableFuture.completedFuture(lastApplied);
+    }
+    final CompletableFuture<Long> f = readIndexQueue.add(readIndex);
+    final long current = stateMachine.getLastAppliedTermIndex().getIndex();
+    if (current > lastApplied) {
+      readIndexQueue.complete(current);
     }
-    return readIndexQueue.add(readIndex);
+    return f;
   }
 }

Reply via email to