szetszwo commented on code in PR #735:
URL: https://github.com/apache/ratis/pull/735#discussion_r962880905


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -129,28 +142,75 @@ synchronized AppendEntriesListener add(long commitIndex, Function<Long, AppendEn
     synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply,
                                            Predicate<Predicate<RaftPeerId>> hasMajority) {
       final long callId = reply.getServerReply().getCallId();
-      for (;;) {
-        final Map.Entry<Long, AppendEntriesListener> first = sorted.firstEntry();
-        if (first == null || first.getKey() > callId) {
+      for (Map.Entry<Long, AppendEntriesListener> entry : sorted.entrySet()) {

Review Comment:
   This will lead to a `ConcurrentModificationException` when an entry is removed during the iteration.
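
   One way to avoid the exception is to remove through the iterator itself. The following is only a sketch, assuming `sorted` is a plain `java.util.TreeMap`; the class name `SafeRemovalSketch`, the method `removeUpTo`, and the `String` value type (standing in for `AppendEntriesListener`) are hypothetical and not part of the PR.

   ```java
   import java.util.Iterator;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical stand-in for the listener map; not the Ratis implementation.
   public class SafeRemovalSketch {
     /** Removes every entry whose key is at most callId and returns the number removed. */
     static int removeUpTo(TreeMap<Long, String> sorted, long callId) {
       int removed = 0;
       // Iterate explicitly so that removal can go through Iterator.remove().
       for (Iterator<Map.Entry<Long, String>> i = sorted.entrySet().iterator(); i.hasNext(); ) {
         final Map.Entry<Long, String> entry = i.next();
         if (entry.getKey() > callId) {
           break; // keys are sorted, so no later entry can match
         }
         i.remove(); // removing via the iterator keeps the iteration valid
         removed++;
       }
       return removed;
     }
   }
   ```

   Calling `sorted.remove(key)` inside a for-each loop instead modifies the map behind the iterator's back, which is what triggers the exception on the next step of the iteration.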



##########
ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadOnlyException.java:
##########
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+/**
+ * This exception indicates the failure of a read-only request.
+ */
+public class ReadOnlyException extends RaftException {

Review Comment:
   Let's call it `ReadException`.  The term `ReadOnlyException` sounds as if
the request is a write but the file is read-only.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -129,28 +142,75 @@ synchronized AppendEntriesListener add(long commitIndex, Function<Long, AppendEn
     synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply,
                                            Predicate<Predicate<RaftPeerId>> hasMajority) {
       final long callId = reply.getServerReply().getCallId();
-      for (;;) {
-        final Map.Entry<Long, AppendEntriesListener> first = sorted.firstEntry();
-        if (first == null || first.getKey() > callId) {
+      for (Map.Entry<Long, AppendEntriesListener> entry : sorted.entrySet()) {
+        if (entry.getKey() > callId) {
           return;
         }
 
-        final AppendEntriesListener listener = first.getValue();
+        final AppendEntriesListener listener = entry.getValue();
         if (listener == null) {
           continue;
         }
 
         if (listener.receive(reply, hasMajority)) {
-          final AppendEntriesListener removed = sorted.remove(callId);
+          final AppendEntriesListener removed = sorted.remove(entry.getKey());
           Preconditions.assertSame(listener, removed, "AppendEntriesListener");
           ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
         }
       }
     }
   }
 
+  static class ReadIndexQueue {
+    private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();

Review Comment:
   Please use `TimeoutExecutor.getInstance()`.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -129,28 +142,75 @@ synchronized AppendEntriesListener add(long commitIndex, Function<Long, AppendEn
     synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply,
                                            Predicate<Predicate<RaftPeerId>> hasMajority) {
       final long callId = reply.getServerReply().getCallId();
-      for (;;) {
-        final Map.Entry<Long, AppendEntriesListener> first = sorted.firstEntry();
-        if (first == null || first.getKey() > callId) {
+      for (Map.Entry<Long, AppendEntriesListener> entry : sorted.entrySet()) {
+        if (entry.getKey() > callId) {
           return;
         }
 
-        final AppendEntriesListener listener = first.getValue();
+        final AppendEntriesListener listener = entry.getValue();
         if (listener == null) {
           continue;
         }
 
         if (listener.receive(reply, hasMajority)) {
-          final AppendEntriesListener removed = sorted.remove(callId);
+          final AppendEntriesListener removed = sorted.remove(entry.getKey());
           Preconditions.assertSame(listener, removed, "AppendEntriesListener");
           ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
         }
       }
     }
   }
 
+  static class ReadIndexQueue {
+    private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+    private final NavigableMap<Long, CompletableFuture<Long>> sorted = new ConcurrentSkipListMap<>();
+    private final TimeDuration readTimeout;
+
+    ReadIndexQueue(TimeDuration readTimeout) {
+      this.readTimeout = readTimeout;
+    }
+
+    CompletableFuture<Long> add(long readIndex) {
+      final MemoizedSupplier<CompletableFuture<Long>> supplier = MemoizedSupplier.valueOf(CompletableFuture::new);
+      final CompletableFuture<Long> f = sorted.computeIfAbsent(readIndex, i -> supplier.get());
+
+      if (supplier.isInitialized()) {
+        scheduler.onTimeout(readTimeout, () -> handleTimeout(readIndex),
+            LOG, () -> "Failed to handle read timeout for index " + readIndex);
+      }
+      return f;
+    }
+
+    private void handleTimeout(long readIndex) {
+      Optional.ofNullable(sorted.remove(readIndex)).ifPresent(consumer -> {
+        consumer.completeExceptionally(
+          new ReadOnlyException(new TimeoutIOException("Read timeout for index " + readIndex)));
+      });
+    }
+
+    synchronized void complete(Long appliedIndex) {

Review Comment:
   `synchronized` is not needed here.  `ConcurrentSkipListMap` can handle all 
the operations below.
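
   As a rough illustration of why the lock is unnecessary, the sketch below completes and removes pending futures using only `ConcurrentSkipListMap` operations. It is a simplified, hypothetical stand-in (the class `ReadIndexQueueSketch` and the helper `pending()` are invented here, and the timeout handling from the PR is omitted), not the actual body of `complete`.

   ```java
   import java.util.Iterator;
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentSkipListMap;

   // Hypothetical, simplified version of ReadIndexQueue without timeouts.
   public class ReadIndexQueueSketch {
     private final NavigableMap<Long, CompletableFuture<Long>> sorted = new ConcurrentSkipListMap<>();

     CompletableFuture<Long> add(long readIndex) {
       return sorted.computeIfAbsent(readIndex, i -> new CompletableFuture<>());
     }

     /** Completes and removes every pending future whose read index is at most appliedIndex. */
     void complete(long appliedIndex) {
       // headMap(..., true) is a live view of all keys <= appliedIndex; its
       // iterators are weakly consistent and never throw ConcurrentModificationException.
       final Iterator<Map.Entry<Long, CompletableFuture<Long>>> i =
           sorted.headMap(appliedIndex, true).entrySet().iterator();
       while (i.hasNext()) {
         final Map.Entry<Long, CompletableFuture<Long>> entry = i.next();
         i.remove();
         entry.getValue().complete(appliedIndex);
       }
     }

     /** Number of futures still waiting; added only for the illustration. */
     int pending() {
       return sorted.size();
     }
   }
   ```

   A future added concurrently for an index that has already been applied would stay pending until the next `complete` call or its timeout, so the caller may still want to check the applied index when adding.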



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java:
##########
@@ -220,6 +224,7 @@ private void reload() throws IOException {
     final long i = snapshot.getIndex();
     snapshotIndex.setUnconditionally(i, infoIndexChange);
     appliedIndex.setUnconditionally(i, infoIndexChange);
+    notifyAppliedIndex(appliedIndex.get());

Review Comment:
   Use `i`.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java:
##########
@@ -242,6 +247,7 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf
         }
         final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
         Preconditions.assertTrue(incremented == nextIndex);
+        notifyAppliedIndex(appliedIndex.get());

Review Comment:
   We should wait for `f`:
   ```java
           final CompletableFuture<Message> f = server.applyLogToStateMachine(next);
           final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
           Preconditions.assertTrue(incremented == nextIndex);
           if (f != null) {
             futures.get().add(f);
             f.thenAccept(m -> notifyAppliedIndex(incremented));
           }
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -907,6 +910,43 @@ private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest requ
     return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request);
   }
 
+  ReadRequests getReadRequests() {
+    return getState().getReadRequests();
+  }
+
+  private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
+    if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+      /*
+        Linearizable read using ReadIndex. See Raft paper section 6.4.
+        1. First obtain readIndex from Leader.
+        2. Then waits for statemachine to advance at least as far as readIndex.
+        3. Finally, query the statemachine and return the result.
+       */
+      final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+      // TODO support follower linearizable read
+      if (leader == null) {
+        return JavaUtils.completeExceptionally(generateNotLeaderException());
+      }
+      return leader.getReadIndex()
+          .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
+          .thenCompose(readIndex -> queryStateMachine(request))
+          .exceptionally(e -> readOnlyException2Reply(request, e));
+    }
+    // default
+    return queryStateMachine(request);

Review Comment:
   Let's use if-statements to check all the options and throw
`IllegalStateException` at the end.
   ```
       throw new IllegalStateException("Unexpected read option: " + readOption);
   ```
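
   The overall shape could look like the sketch below. The enum is a hypothetical stand-in for `RaftServerConfigKeys.Read.Option` (only `LINEARIZABLE` appears in the diff; `DEFAULT` is assumed here), and the returned strings stand in for the real read paths.

   ```java
   // Hypothetical sketch of exhaustive option checking; not the Ratis code.
   public class ReadOptionSketch {
     enum Option { DEFAULT, LINEARIZABLE }

     static String read(Option readOption) {
       if (readOption == Option.LINEARIZABLE) {
         return "linearizable read"; // placeholder for the ReadIndex path
       } else if (readOption == Option.DEFAULT) {
         return "default read"; // placeholder for queryStateMachine(request)
       }
       // Reached only if an option is added later without being handled above.
       throw new IllegalStateException("Unexpected read option: " + readOption);
     }
   }
   ```

   With this structure, adding a new option without handling it fails loudly instead of silently falling through to the default path.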
   



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -1097,7 +1097,6 @@ CompletableFuture<Long> getReadIndex() {
 
     if (supplier.isInitialized()) {
       senders.forEach(sender -> {
-        listener.init(sender);

Review Comment:
   It is a good idea to remove `init(..)`.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -104,7 +117,7 @@ boolean receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>>
       }
       final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
       final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));

Review Comment:
   Let's pass `LogAppender`:
   ```java
   -    boolean receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
   +    boolean receive(LogAppender appender, AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
   ```
   and then use computeIfAbsent:
   ```java
         final HeartbeatAck reply = replies.computeIfAbsent(appender.getFollowerId(), key -> new HeartbeatAck(appender));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
