This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit a0b14ecbeeee78b842c89878b834fad938a7f3ae
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 13 10:28:47 2022 +0800

    RATIS-1714. Support unordered async read. (#755)
    
    (cherry picked from commit 2a3482ed1c90c2332faa852883c168f2608da1ad)
---
 .../java/org/apache/ratis/client/api/AsyncApi.java |  20 +++-
 .../org/apache/ratis/client/api/BlockingApi.java   |   4 +-
 .../org/apache/ratis/client/impl/AsyncImpl.java    |   7 +-
 .../apache/ratis/client/impl/UnorderedAsync.java   |   7 +-
 .../java/org/apache/ratis/protocol/Message.java    |   5 +
 .../apache/ratis/examples/filestore/FileInfo.java  |  18 ++++
 .../apache/ratis/examples/filestore/FileStore.java |  33 ++++--
 .../ratis/examples/filestore/FileStoreClient.java  |  53 +++++++---
 .../examples/filestore/FileStoreStateMachine.java  |   3 +-
 .../examples/filestore/FileStoreBaseTest.java      | 112 +++++++++++++++++----
 .../examples/filestore/TestFileStoreWithGrpc.java  |   2 +-
 .../examples/filestore/TestFileStoreWithNetty.java |   7 +-
 ratis-proto/src/main/proto/Examples.proto          |   1 +
 13 files changed, 218 insertions(+), 54 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java 
b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index 1e8da9bde..ef96d2a4b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -26,8 +26,8 @@ import org.apache.ratis.protocol.RaftPeerId;
 /**
  * Asynchronous API to support operations
  * such as sending message, read-message, stale-read-message and watch-request.
- *
- * Note that this API and {@link BlockingApi} support the same set of 
operations.
+ * <p>
+ * Note that this API supports all the operations in {@link BlockingApi}.
  */
 public interface AsyncApi {
   /**
@@ -42,12 +42,28 @@ public interface AsyncApi {
 
   /**
    * Send the given readonly message asynchronously to the raft service.
+   * Note that the reply futures are completed in the same order of the 
messages being sent.
    *
    * @param message The request message.
    * @return a future of the reply.
    */
   CompletableFuture<RaftClientReply> sendReadOnly(Message message);
 
+  /** The same as sendReadOnlyUnordered(message, null). */
+  default CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message 
message) {
+    return sendReadOnlyUnordered(message, null);
+  }
+
+  /**
+   * Send the given readonly message asynchronously to the raft service.
+   * Note that the reply futures can be completed in any order.
+   *
+   * @param message The request message.
+   * @param server The target server.  When server == null, send the message 
to the leader.
+   * @return a future of the reply.
+   */
+  CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message, 
RaftPeerId server);
+
   /**
    * Send the given stale-read message asynchronously to the given server (not 
the raft service).
    * If the server commit index is larger than or equal to the given 
min-index, the request will be processed.
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java 
b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index 84f9740e2..1f6237b49 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -26,8 +26,8 @@ import org.apache.ratis.protocol.RaftPeerId;
 /**
  * Blocking API to support operations
  * such as sending message, read-message, stale-read-message and watch-request.
- *
- * Note that this API and {@link AsyncApi} support the same set of operations.
+ * <p>
+ * Note that this API supports a subset of the operations in {@link AsyncApi}.
  */
 public interface BlockingApi {
   /**
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index e063eebe2..a78e5cc11 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -51,6 +51,11 @@ class AsyncImpl implements AsyncRpcApi {
     return send(RaftClientRequest.readRequestType(), message, null);
   }
 
+  @Override
+  public CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message 
message, RaftPeerId server) {
+    return UnorderedAsync.send(RaftClientRequest.readRequestType(), message, 
server, client);
+  }
+
   @Override
   public CompletableFuture<RaftClientReply> sendStaleRead(Message message, 
long minIndex, RaftPeerId server) {
     return send(RaftClientRequest.staleReadRequestType(minIndex), message, 
server);
@@ -58,7 +63,7 @@ class AsyncImpl implements AsyncRpcApi {
 
   @Override
   public CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel 
replication) {
-    return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, 
replication), client);
+    return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, 
replication), null, null, client);
   }
 
   @Override
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 432acce4d..b053df172 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -20,6 +20,8 @@ package org.apache.ratis.client.impl;
 import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -54,10 +56,11 @@ public interface UnorderedAsync {
     }
   }
 
-  static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, 
RaftClientImpl client) {
+  static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, 
Message message, RaftPeerId server,
+      RaftClientImpl client) {
     final long callId = CallId.getAndIncrement();
     final PendingClientRequest pending = new PendingUnorderedRequest(
-        () -> client.newRaftClientRequest(null, callId, null, type, null));
+        () -> client.newRaftClientRequest(server, callId, message, type, 
null));
     sendRequestWithRetry(pending, client);
     return pending.getReplyFuture()
         .thenApply(reply -> RaftClientImpl.handleRaftException(reply, 
CompletionException::new));
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
index b47f43074..e7ea97ca4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.StringUtils;
@@ -45,6 +46,10 @@ public interface Message {
     };
   }
 
+  static Message valueOf(AbstractMessage abstractMessage) {
+    return valueOf(abstractMessage.toByteString(), abstractMessage::toString);
+  }
+
   static Message valueOf(ByteString bytes) {
     return valueOf(bytes, () -> "Message:" + 
StringUtils.bytes2HexShortString(bytes));
   }
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index 37c982bcf..c7d8cb7cd 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -93,6 +93,24 @@ abstract class FileInfo {
         "File " + getRelativePath() + " is not under construction.");
   }
 
+  static class Watch extends FileInfo {
+    private final CompletableFuture<UnderConstruction> future = new 
CompletableFuture<>();
+
+    Watch(Path relativePath) {
+      super(relativePath);
+    }
+
+    CompletableFuture<UnderConstruction> getFuture() {
+      return future;
+    }
+
+    CompletableFuture<UnderConstruction> complete(UnderConstruction uc) {
+      Preconditions.assertTrue(getRelativePath().equals(uc.getRelativePath()));
+      future.complete(uc);
+      return future;
+    }
+  }
+
   static class ReadOnly extends FileInfo {
     private final long committedSize;
     private final long writeSize;
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index 50206627b..04ff64bd8 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -72,6 +72,14 @@ public class FileStore implements Closeable {
       return applyFunction(relative, map::get);
     }
 
+    FileInfo watch(String relative) {
+      try {
+        return applyFunction(relative, p -> map.computeIfAbsent(p, 
FileInfo.Watch::new));
+      } catch (FileNotFoundException e) {
+        throw new IllegalStateException("Failed to watch " + relative, e);
+      }
+    }
+
     FileInfo remove(String relative) throws FileNotFoundException {
       LOG.trace("{}: remove {}", name, relative);
       return applyFunction(relative, map::remove);
@@ -88,7 +96,10 @@ public class FileStore implements Closeable {
 
     void putNew(UnderConstruction uc) {
       LOG.trace("{}: putNew {}", name, uc.getRelativePath());
-      CollectionUtils.putNew(uc.getRelativePath(), uc, map, name::toString);
+      final FileInfo previous = map.put(uc.getRelativePath(), uc);
+      if (previous instanceof FileInfo.Watch) {
+        ((FileInfo.Watch) previous).complete(uc);
+      }
     }
 
     ReadOnly close(UnderConstruction uc) {
@@ -171,6 +182,17 @@ public class FileStore implements Closeable {
     return full;
   }
 
+  CompletableFuture<ReadReplyProto> watch(String relative) {
+    final FileInfo info = files.watch(relative);
+    final ReadReplyProto reply = ReadReplyProto.newBuilder()
+        .setResolvedPath(FileStoreCommon.toByteString(info.getRelativePath()))
+        .build();
+    if (info instanceof FileInfo.Watch) {
+      return ((FileInfo.Watch) info).getFuture().thenApply(uc -> reply);
+    }
+    return CompletableFuture.completedFuture(reply);
+  }
+
   CompletableFuture<ReadReplyProto> read(String relative, long offset, long 
length, boolean readCommitted) {
     final Supplier<String> name = () -> "read(" + relative
         + ", " + offset + ", " + length + ") @" + getId();
@@ -262,15 +284,12 @@ public class FileStore implements Closeable {
 
   CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long 
bytesWritten) {
     return CompletableFuture.supplyAsync(() -> {
-      long len = 0;
-      try {
-        final Path full = resolve(normalize(p));
-        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+      final long len;
+      try (RandomAccessFile file = new 
RandomAccessFile(resolve(normalize(p)).toFile(), "r")) {
         len = file.length();
         return StreamWriteReplyProto.newBuilder().setIsSuccess(len == 
bytesWritten).setByteWritten(len).build();
       } catch (IOException e) {
-        throw new CompletionException("Failed to commit stream write on file:" 
+ p +
-        ", expected written bytes:" + bytesWritten + ", actual written bytes:" 
+ len, e);
+        throw new CompletionException("Failed to commit stream " + p + " with 
" + bytesWritten + " B.", e);
       }
     }, committer);
   }
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 9e91602d1..c223b100b 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -83,9 +83,9 @@ public class FileStoreClient implements Closeable {
   }
 
   static ByteString send(
-      ByteString request, CheckedFunction<Message, RaftClientReply, 
IOException> sendFunction)
+      Message request, CheckedFunction<Message, RaftClientReply, IOException> 
sendFunction)
       throws IOException {
-    final RaftClientReply reply = sendFunction.apply(Message.valueOf(request));
+    final RaftClientReply reply = sendFunction.apply(request);
     final StateMachineException sme = reply.getStateMachineException();
     if (sme != null) {
       throw new IOException("Failed to send request " + request, sme);
@@ -95,9 +95,8 @@ public class FileStoreClient implements Closeable {
   }
 
   static CompletableFuture<ByteString> sendAsync(
-      ByteString request, Function<Message, 
CompletableFuture<RaftClientReply>> sendFunction) {
-    return sendFunction.apply(() -> request
-    ).thenApply(reply -> {
+      Message request, Function<Message, CompletableFuture<RaftClientReply>> 
sendFunction) {
+    return sendFunction.apply(request).thenApply(reply -> {
       final StateMachineException sme = reply.getStateMachineException();
       if (sme != null) {
         throw new CompletionException("Failed to send request " + request, 
sme);
@@ -107,19 +106,19 @@ public class FileStoreClient implements Closeable {
     });
   }
 
-  private ByteString send(ByteString request) throws IOException {
+  private ByteString send(Message request) throws IOException {
     return send(request, client.io()::send);
   }
 
-  private ByteString sendReadOnly(ByteString request) throws IOException {
+  private ByteString sendReadOnly(Message request) throws IOException {
     return send(request, client.io()::sendReadOnly);
   }
 
-  private CompletableFuture<ByteString> sendAsync(ByteString request) {
+  private CompletableFuture<ByteString> sendAsync(Message request) {
     return sendAsync(request, client.async()::send);
   }
 
-  private CompletableFuture<ByteString> sendReadOnlyAsync(ByteString request) {
+  private CompletableFuture<ByteString> sendReadOnlyAsync(Message request) {
     return sendAsync(request, client.async()::sendReadOnly);
   }
 
@@ -135,7 +134,7 @@ public class FileStoreClient implements Closeable {
   }
 
   private static <OUTPUT, THROWABLE extends Throwable> OUTPUT readImpl(
-      CheckedFunction<ByteString, OUTPUT, THROWABLE> sendReadOnlyFunction,
+      CheckedFunction<Message, OUTPUT, THROWABLE> sendReadOnlyFunction,
       String path, long offset, long length) throws THROWABLE {
     final ReadRequestProto read = ReadRequestProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path))
@@ -143,7 +142,31 @@ public class FileStoreClient implements Closeable {
         .setLength(length)
         .build();
 
-    return sendReadOnlyFunction.apply(read.toByteString());
+    return sendReadOnlyFunction.apply(Message.valueOf(read));
+  }
+
+  private CompletableFuture<ByteString> sendWatchAsync(Message request) {
+    return sendAsync(request, client.async()::sendReadOnlyUnordered);
+  }
+
+  /**
+   * Watch the path until it is created.
+   */
+  public CompletableFuture<ReadReplyProto> watchAsync(String path) {
+    return watchImpl(this::sendWatchAsync, path)
+        .thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException(
+            () -> ReadReplyProto.parseFrom(reply)));
+  }
+
+  private static <OUTPUT, THROWABLE extends Throwable> OUTPUT watchImpl(
+      CheckedFunction<Message, OUTPUT, THROWABLE> sendWatchFunction,
+      String path) throws THROWABLE {
+    final ReadRequestProto watch = ReadRequestProto.newBuilder()
+        .setPath(ProtoUtils.toByteString(path))
+        .setIsWatch(true)
+        .build();
+
+    return sendWatchFunction.apply(Message.valueOf(watch));
   }
 
   public long write(String path, long offset, boolean close, ByteBuffer 
buffer, boolean sync)
@@ -170,7 +193,7 @@ public class FileStoreClient implements Closeable {
   }
 
   private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl(
-      CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction,
+      CheckedFunction<Message, OUTPUT, THROWABLE> sendFunction,
       String path, long offset, boolean close, ByteBuffer data, boolean sync)
       throws THROWABLE {
     final WriteRequestHeaderProto.Builder header = 
WriteRequestHeaderProto.newBuilder()
@@ -185,16 +208,16 @@ public class FileStoreClient implements Closeable {
         .setData(ByteString.copyFrom(data));
 
     final FileStoreRequestProto request = 
FileStoreRequestProto.newBuilder().setWrite(write).build();
-    return sendFunction.apply(request.toByteString());
+    return sendFunction.apply(Message.valueOf(request));
   }
 
   private static <OUTPUT, THROWABLE extends Throwable> OUTPUT deleteImpl(
-      CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction, String path)
+      CheckedFunction<Message, OUTPUT, THROWABLE> sendFunction, String path)
       throws THROWABLE {
     final DeleteRequestProto.Builder delete = DeleteRequestProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path));
     final FileStoreRequestProto request = 
FileStoreRequestProto.newBuilder().setDelete(delete).build();
-    return sendFunction.apply(request.toByteString());
+    return sendFunction.apply(Message.valueOf(request));
   }
 
   public String delete(String path) throws IOException {
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 6ba761ebe..209baaf7d 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -85,7 +85,8 @@ public class FileStoreStateMachine extends BaseStateMachine {
     }
 
     final String path = proto.getPath().toStringUtf8();
-    return files.read(path, proto.getOffset(), proto.getLength(), true)
+    return (proto.getIsWatch()? files.watch(path)
+        : files.read(path, proto.getOffset(), proto.getLength(), true))
         .thenApply(reply -> Message.valueOf(reply.toByteString()));
   }
 
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
 
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
index c455aeb8e..5bcc50039 100644
--- 
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
@@ -18,38 +18,34 @@
 package org.apache.ratis.examples.filestore;
 
 import org.apache.ratis.BaseTest;
-import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.ExamplesProtos.ReadReplyProto;
+import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.netty.util.internal.ThreadLocalRandom;
 import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
-import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
     extends BaseTest
@@ -66,14 +62,81 @@ public abstract class FileStoreBaseTest<CLUSTER extends 
MiniRaftCluster>
 
   static final int NUM_PEERS = 3;
 
+  FileStoreClient newFileStoreClient(CLUSTER cluster) throws IOException {
+    return new FileStoreClient(cluster.getGroup(), getProperties());
+  }
+
+  @Test
+  public void testWatch() throws Exception {
+    runWithNewCluster(NUM_PEERS, cluster -> runTestWatch(10, cluster));
+  }
+
+  void runTestWatch(int n, CLUSTER cluster) throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+
+    final AtomicBoolean isStarted = new AtomicBoolean();
+    final List<Integer> randomIndices = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      randomIndices.add(i);
+    }
+    Collections.shuffle(randomIndices);
+    LOG.info("randomIndices {}", randomIndices);
+    final List<Integer> completionOrder = new ArrayList<>();
+
+    final String pathFirst = "first";
+    final String pathSecond = "second";
+    final List<CompletableFuture<ReadReplyProto>> firstList = new 
ArrayList<>(n);
+    final List<CompletableFuture<ReadReplyProto>> watchSecond = new 
ArrayList<>(n);
+    try (FileStoreClient client = new FileStoreClient(cluster.getGroup(), 
getProperties())) {
+      for (int i = 0; i < n; i++) {
+        LOG.info("watchAsync {}", i);
+        final int index = i;
+        final CompletableFuture<ReadReplyProto> f = 
client.watchAsync(pathFirst + i).whenComplete((reply, e) -> {
+          throw new IllegalStateException(pathFirst + index + " should never 
be completed.");
+        });
+        firstList.add(f);
+        final CompletableFuture<ReadReplyProto> s = 
client.watchAsync(pathSecond + i).whenComplete((reply, e) -> {
+          Assert.assertNotNull(reply);
+          Assert.assertNull(e);
+          Assert.assertTrue(isStarted.get());
+          completionOrder.add(index);
+        });
+        watchSecond.add(s);
+        Assert.assertFalse(f.isDone());
+        Assert.assertFalse(s.isDone());
+        Assert.assertFalse(isStarted.get());
+      }
+
+      TimeDuration.valueOf(ThreadLocalRandom.current().nextLong(500) + 100, 
TimeUnit.MILLISECONDS)
+          .sleep(s -> LOG.info("{}", s));
+      
firstList.stream().map(CompletableFuture::isDone).forEach(Assert::assertFalse);
+      
watchSecond.stream().map(CompletableFuture::isDone).forEach(Assert::assertFalse);
+      Assert.assertFalse(isStarted.get());
+      isStarted.set(true);
+
+      for (int i : randomIndices) {
+        writeSingleFile(pathSecond + i, SizeInBytes.ONE_KB, () -> client);
+      }
+
+      for (int i = 0; i < n; i++) {
+        final ReadReplyProto reply = watchSecond.get(i).get(100, 
TimeUnit.MILLISECONDS);
+        LOG.info("reply {}: {}", i, reply);
+        Assert.assertNotNull(reply);
+        Assert.assertEquals(pathSecond + i, 
reply.getResolvedPath().toStringUtf8());
+      }
+      LOG.info("completionOrder {}", completionOrder);
+      Assert.assertEquals(randomIndices, completionOrder);
+      
firstList.stream().map(CompletableFuture::isDone).forEach(Assert::assertFalse);
+    }
+  }
+
   @Test
   public void testFileStore() throws Exception {
     final CLUSTER cluster = newCluster(NUM_PEERS);
     cluster.start();
     RaftTestUtil.waitForLeader(cluster);
 
-    final CheckedSupplier<FileStoreClient, IOException> newClient =
-        () -> new FileStoreClient(cluster.getGroup(), getProperties());
+    final CheckedSupplier<FileStoreClient, IOException> newClient = () -> 
newFileStoreClient(cluster);
 
     testSingleFile("foo", SizeInBytes.valueOf("2M"), newClient);
     testMultipleFiles("file", 20, SizeInBytes.valueOf("1M"), newClient);
@@ -81,19 +144,24 @@ public abstract class FileStoreBaseTest<CLUSTER extends 
MiniRaftCluster>
     cluster.shutdown();
   }
 
+  private static FileStoreWriter writeSingleFile(
+      String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, 
IOException> newClient)
+      throws Exception {
+    return FileStoreWriter.newBuilder()
+        .setFileName(path)
+        .setFileSize(fileLength)
+        .setFileStoreClientSupplier(newClient)
+        .build()
+        .write(false)
+        .verify()
+        .delete();
+  }
+
   private static void testSingleFile(
       String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, 
IOException> newClient)
       throws Exception {
     LOG.info("runTestSingleFile with path={}, fileLength={}", path, 
fileLength);
-
-    try (final FileStoreWriter w =
-             FileStoreWriter.newBuilder()
-                 .setFileName(path)
-                 .setFileSize(fileLength)
-                 .setFileStoreClientSupplier(newClient)
-                 .build()) {
-      w.write(false).verify().delete();
-    }
+    writeSingleFile(path, fileLength, newClient).close();
   }
 
   private static void testMultipleFiles(
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
 
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
index 6e46b6e8e..6764184d9 100644
--- 
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
 
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
index 9b38e2103..3f139ac35 100644
--- 
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,4 +22,9 @@ import org.apache.ratis.netty.MiniRaftClusterWithNetty;
 public class TestFileStoreWithNetty
     extends FileStoreBaseTest<MiniRaftClusterWithNetty>
     implements MiniRaftClusterWithNetty.FactoryGet {
+
+  @Override
+  public void testWatch() {
+    //NettyClientRpc does not support sendRequestAsyncUnordered
+  }
 }
diff --git a/ratis-proto/src/main/proto/Examples.proto 
b/ratis-proto/src/main/proto/Examples.proto
index ecf750d72..cc1bc043c 100644
--- a/ratis-proto/src/main/proto/Examples.proto
+++ b/ratis-proto/src/main/proto/Examples.proto
@@ -34,6 +34,7 @@ message ReadRequestProto {
   bytes path = 1;
   uint64 offset = 2;
   uint64 length = 3;
+  bool isWatch = 4;
 }
 
 message WriteRequestHeaderProto {

Reply via email to