szetszwo commented on code in PR #1311:
URL: https://github.com/apache/ratis/pull/1311#discussion_r2518897416


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -79,35 +90,36 @@ private void handleTimeout(long readIndex) {
 
 
     /** Complete all the entries less than or equal to the given applied 
index. */
-    synchronized void complete(Long appliedIndex) {
+    synchronized void complete(long appliedIndex) {
+      if (appliedIndex > lastAppliedIndex) {
+        lastAppliedIndex = appliedIndex;
+      } else {
+        // appliedIndex <= lastAppliedIndex: nothing to do
+        if (!sorted.isEmpty()) {
+          // Assert: all keys > lastAppliedIndex.
+          final long first = sorted.firstKey();
+          Preconditions.assertTrue(first > lastAppliedIndex,
+              () -> "first = " + first + " <= lastAppliedIndex = " + 
lastAppliedIndex);
+        }
+        return;
+      }
       final NavigableMap<Long, CompletableFuture<Long>> headMap = 
sorted.headMap(appliedIndex, true);
       headMap.values().forEach(f -> f.complete(appliedIndex));
       headMap.clear();
     }
   }
 
   private final ReadIndexQueue readIndexQueue;
-  private final StateMachine stateMachine;
 
-  ReadRequests(RaftProperties properties, StateMachine stateMachine) {
-    this.readIndexQueue = new 
ReadIndexQueue(RaftServerConfigKeys.Read.timeout(properties));
-    this.stateMachine = stateMachine;
+  ReadRequests(long appliedIndex, RaftProperties properties) {
+    this.readIndexQueue = new ReadIndexQueue(appliedIndex, 
RaftServerConfigKeys.Read.timeout(properties));
   }
 
-  Consumer<Long> getAppliedIndexConsumer() {
+  LongConsumer getAppliedIndexConsumer() {
     return readIndexQueue::complete;
   }
 
   CompletableFuture<Long> waitToAdvance(long readIndex) {
-    final long lastApplied = stateMachine.getLastAppliedTermIndex().getIndex();
-    if (lastApplied >= readIndex) {
-      return CompletableFuture.completedFuture(lastApplied);
-    }
-    final CompletableFuture<Long> f = readIndexQueue.add(readIndex);
-    final long current = stateMachine.getLastAppliedTermIndex().getIndex();
-    if (current > lastApplied) {
-      readIndexQueue.complete(current);
-    }

Review Comment:
   @SzyWilliam , thanks for reminding me about RATIS-1927!
   
   In this pr, the check is moved to a synchronized block in 
ReadIndexQueue.add(..) and `lastAppliedIndex` is also updated synchronously.  
So, it won't have a race condition anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to