szetszwo commented on code in PR #1444:
URL: https://github.com/apache/ratis/pull/1444#discussion_r3237626570
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1092,12 +1092,11 @@ private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest requ
}
return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request);
}
}
-
- ReadRequests getReadRequests() {
- return getState().getReadRequests();
- }
-
private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
+ final Throwable snapshotInstallation = snapshotInstallationHandler.getInProgressInstallSnapshotReadException();
Review Comment:
Let's create the ReadException with a different message. It will be easier
to debug later.
```java
ReadException newReadException(String op, long installSnapshot, boolean started) {
  return new ReadException(getMemberId() + ": Failed to " + op + " readIndex as snapshot (" + installSnapshot
      + ") installation is " + (started ? "started" : "in progress"));
}

private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
  final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
  if (installSnapshot != RaftLog.INVALID_LOG_INDEX) {
    return JavaUtils.completeExceptionally(newReadException("get", installSnapshot, false));
  }
```
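To illustrate how the `op` and `started` arguments make each failure site distinguishable in logs, here is a minimal standalone sketch of the message format only. The class name `ReadMessageSketch`, the `message` helper, and the sample member id are hypothetical and not part of Ratis; the real method would wrap the string in a `ReadException`.

```java
/** Hypothetical sketch: builds only the message text, not a Ratis ReadException. */
class ReadMessageSketch {
  // Same concatenation as the suggested newReadException(..), with the member id passed in.
  static String message(Object memberId, String op, long installSnapshot, boolean started) {
    return memberId + ": Failed to " + op + " readIndex as snapshot (" + installSnapshot
        + ") installation is " + (started ? "started" : "in progress");
  }

  public static void main(String[] args) {
    // Different call sites pass a different op, so the log line shows where the read failed.
    System.out.println(message("s1", "get", 42L, false));
    System.out.println(message("s1", "start waiting for", 42L, false));
    System.out.println(message("s1", "wait for", 42L, true));
  }
}
```

With this shape, a failure before sending the read index request and a failure while waiting for the index to advance produce different messages for the same snapshot index.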
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1146,7 +1145,8 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
}
return replyFuture
- .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
+ .thenCompose(readIndex -> getState().getReadRequests().waitToAdvance(readIndex,
+ snapshotInstallationHandler::getInProgressInstallSnapshotReadException))
Review Comment:
Let's add a waitReadIndex method and check there.
```java
.thenCompose(this::waitReadIndex)
```
```java
private CompletableFuture<Long> waitReadIndex(long readIndex) {
  final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
  if (installSnapshot != RaftLog.INVALID_LOG_INDEX) {
    return JavaUtils.completeExceptionally(newReadException("start waiting for", installSnapshot, false));
  }
  return getState().getReadRequests().waitToAdvance(readIndex);
}
```
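The guard-then-delegate pattern suggested above can be tried in isolation. The following is a rough sketch under stated assumptions: `WaitReadIndexSketch`, the `LongSupplier` standing in for `snapshotInstallationHandler`, the local `INVALID_LOG_INDEX` sentinel, the `failed` helper, and the use of `IllegalStateException` in place of `ReadException` are all hypothetical stand-ins, and the real `waitToAdvance` is replaced by an already-completed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;

/** Hypothetical sketch of checking for snapshot installation before waiting on a read index. */
class WaitReadIndexSketch {
  // Stand-in sentinel meaning "no snapshot installation in progress" (an assumption of this sketch).
  static final long INVALID_LOG_INDEX = -1L;

  // Stand-in for snapshotInstallationHandler.getInProgressInstallSnapshotIndex().
  private final LongSupplier inProgressSnapshotIndex;

  WaitReadIndexSketch(LongSupplier inProgressSnapshotIndex) {
    this.inProgressSnapshotIndex = inProgressSnapshotIndex;
  }

  // Returns a future that is already completed exceptionally with the given cause.
  static <T> CompletableFuture<T> failed(Throwable cause) {
    final CompletableFuture<T> f = new CompletableFuture<>();
    f.completeExceptionally(cause);
    return f;
  }

  // The check lives here, so the queue itself does not need a failure supplier.
  CompletableFuture<Long> waitReadIndex(long readIndex) {
    final long installSnapshot = inProgressSnapshotIndex.getAsLong();
    if (installSnapshot != INVALID_LOG_INDEX) {
      return failed(new IllegalStateException(
          "Failed to start waiting for readIndex as snapshot (" + installSnapshot + ") installation is in progress"));
    }
    // Stand-in for getState().getReadRequests().waitToAdvance(readIndex).
    return CompletableFuture.completedFuture(readIndex);
  }

  CompletableFuture<Long> read(CompletableFuture<Long> replyFuture) {
    return replyFuture.thenCompose(this::waitReadIndex);
  }
}
```

Keeping the check in one method lets the call site stay a single method reference, and the queue API keeps its original one-argument `waitToAdvance` signature.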
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -88,6 +101,14 @@ private void handleTimeout(long readIndex) {
removed.completeExceptionally(new ReadException("Read timeout " + readTimeout + " for index " + readIndex));
}
+ void fail(Throwable cause) {
+ final Collection<CompletableFuture<Long>> futures;
+ synchronized (this) {
+ futures = new ArrayList<>(sorted.values());
+ sorted.clear();
Review Comment:
- Let's make `sorted` non-final to avoid the copying.
- We can return the futures here and fail them in `ReadRequests.fail(..)`.
```java
//ReadIndexQueue
synchronized Collection<CompletableFuture<Long>> clear(Throwable cause) {
final Collection<CompletableFuture<Long>> futures = sorted.values();
sorted = new TreeMap<>();
return futures;
}
```
```java
//ReadRequests
void fail(Throwable cause) {
for(CompletableFuture<Long> f : readIndexQueue.clear(cause)) {
f.completeExceptionally(cause);
}
}
```
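A self-contained sketch of the swap-instead-of-copy idea follows, for anyone who wants to try it outside Ratis. The class `PendingReadsSketch` and its `add`, `size`, and no-argument `clear` methods are hypothetical simplifications that merge the two classes above into one; timeouts and the advance logic are omitted.

```java
import java.util.Collection;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ReadIndexQueue plus ReadRequests.fail(..); not the actual Ratis classes. */
class PendingReadsSketch {
  // Non-final so that clear() can swap in a fresh map instead of copying the values.
  private NavigableMap<Long, CompletableFuture<Long>> sorted = new TreeMap<>();

  // Returns the pending future for the index, creating it if absent.
  synchronized CompletableFuture<Long> add(long readIndex) {
    return sorted.computeIfAbsent(readIndex, i -> new CompletableFuture<>());
  }

  // Detaches all pending futures while holding the lock; the old map is no longer shared afterwards.
  synchronized Collection<CompletableFuture<Long>> clear() {
    final Collection<CompletableFuture<Long>> futures = sorted.values();
    sorted = new TreeMap<>();
    return futures;
  }

  synchronized int size() {
    return sorted.size();
  }

  // Completes the detached futures after clear() has released the lock.
  void fail(Throwable cause) {
    for (CompletableFuture<Long> f : clear()) {
      f.completeExceptionally(cause);
    }
  }
}
```

Because the old map is unreachable from the queue once it is swapped out, its `values()` view can be iterated without holding the lock, and any callbacks attached to the futures run outside the synchronized section.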
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -20,21 +20,30 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
+import java.util.function.Supplier;
/** For supporting linearizable read. */
class ReadRequests {
private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
+ static ReadException newException(Object server, long installSnapshot) {
+ return new ReadException(server + ": Failed read as snapshot (" + installSnapshot
+ + ") installation is in progress");
+ }
Review Comment:
Let's move this method to `RaftServerImpl.newReadException` as mentioned
earlier.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -52,10 +61,14 @@ static class ReadIndexQueue {
this.readTimeout = readTimeout;
}
- CompletableFuture<Long> add(long readIndex) {
+ CompletableFuture<Long> add(long readIndex, Supplier<Throwable> failureSupplier) {
final CompletableFuture<Long> returned;
final boolean create;
synchronized (this) {
+ final Throwable failure = failureSupplier.get();
+ if (failure != null) {
+ return JavaUtils.completeExceptionally(failure);
+ }
Review Comment:
This can be checked in RaftServerImpl.