This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2a3482ed1 RATIS-1714. Support unordered async read. (#755)
2a3482ed1 is described below
commit 2a3482ed1c90c2332faa852883c168f2608da1ad
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 13 10:28:47 2022 +0800
RATIS-1714. Support unordered async read. (#755)
---
.../java/org/apache/ratis/client/api/AsyncApi.java | 20 +++-
.../org/apache/ratis/client/api/BlockingApi.java | 4 +-
.../org/apache/ratis/client/impl/AsyncImpl.java | 7 +-
.../apache/ratis/client/impl/UnorderedAsync.java | 7 +-
.../java/org/apache/ratis/protocol/Message.java | 5 +
.../apache/ratis/examples/filestore/FileInfo.java | 18 ++++
.../apache/ratis/examples/filestore/FileStore.java | 33 ++++--
.../ratis/examples/filestore/FileStoreClient.java | 53 +++++++---
.../examples/filestore/FileStoreStateMachine.java | 3 +-
.../examples/filestore/FileStoreBaseTest.java | 112 +++++++++++++++++----
.../examples/filestore/TestFileStoreWithGrpc.java | 2 +-
.../examples/filestore/TestFileStoreWithNetty.java | 7 +-
ratis-proto/src/main/proto/Examples.proto | 1 +
13 files changed, 218 insertions(+), 54 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index 7041bd2b6..84a4f5437 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -26,8 +26,8 @@ import org.apache.ratis.protocol.RaftPeerId;
/**
* Asynchronous API to support operations
* such as sending message, read-message, stale-read-message and watch-request.
- *
- * Note that this API and {@link BlockingApi} support the same set of operations.
+ * <p>
+ * Note that this API supports all the operations in {@link BlockingApi}.
*/
public interface AsyncApi {
/**
@@ -47,6 +47,7 @@ public interface AsyncApi {
/**
* Send the given readonly message asynchronously to the raft service.
+ * Note that the reply futures are completed in the same order of the messages being sent.
*
* @param message The request message.
* @param server The target server. When server == null, send the message to the leader.
@@ -54,6 +55,21 @@ public interface AsyncApi {
*/
CompletableFuture<RaftClientReply> sendReadOnly(Message message, RaftPeerId server);
+ /** The same as sendReadOnlyUnordered(message, null). */
+ default CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message) {
+ return sendReadOnlyUnordered(message, null);
+ }
+
+ /**
+ * Send the given readonly message asynchronously to the raft service.
+ * Note that the reply futures can be completed in any order.
+ *
+ * @param message The request message.
+ * @param server The target server. When server == null, send the message to the leader.
+ * @return a future of the reply.
+ */
+ CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message, RaftPeerId server);
+
/**
* Send the given stale-read message asynchronously to the given server (not the raft service).
* If the server commit index is larger than or equal to the given min-index, the request will be processed.
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index 7493929b0..4a5237afc 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -26,8 +26,8 @@ import org.apache.ratis.protocol.RaftPeerId;
/**
* Blocking API to support operations
* such as sending message, read-message, stale-read-message and watch-request.
- *
- * Note that this API and {@link AsyncApi} support the same set of operations.
+ * <p>
+ * Note that this API supports a subset of the operations in {@link AsyncApi}.
*/
public interface BlockingApi {
/**
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index abaca2b0f..4672f5ecf 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -51,6 +51,11 @@ class AsyncImpl implements AsyncRpcApi {
return send(RaftClientRequest.readRequestType(), message, server);
}
+ @Override
+ public CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message, RaftPeerId server) {
+ return UnorderedAsync.send(RaftClientRequest.readRequestType(), message, server, client);
+ }
+
@Override
public CompletableFuture<RaftClientReply> sendStaleRead(Message message, long minIndex, RaftPeerId server) {
return send(RaftClientRequest.staleReadRequestType(minIndex), message, server);
@@ -58,7 +63,7 @@ class AsyncImpl implements AsyncRpcApi {
@Override
public CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel replication) {
- return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, replication), client);
+ return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, replication), null, null, client);
}
@Override
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 432acce4d..b053df172 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -20,6 +20,8 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
@@ -54,10 +56,11 @@ public interface UnorderedAsync {
}
}
- static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, RaftClientImpl client) {
+ static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message message, RaftPeerId server,
+ RaftClientImpl client) {
final long callId = CallId.getAndIncrement();
final PendingClientRequest pending = new PendingUnorderedRequest(
- () -> client.newRaftClientRequest(null, callId, null, type, null));
+ () -> client.newRaftClientRequest(server, callId, message, type, null));
sendRequestWithRetry(pending, client);
return pending.getReplyFuture()
.thenApply(reply -> RaftClientImpl.handleRaftException(reply, CompletionException::new));
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
index b47f43074..e7ea97ca4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.StringUtils;
@@ -45,6 +46,10 @@ public interface Message {
};
}
+ static Message valueOf(AbstractMessage abstractMessage) {
+ return valueOf(abstractMessage.toByteString(), abstractMessage::toString);
+ }
+
static Message valueOf(ByteString bytes) {
return valueOf(bytes, () -> "Message:" + StringUtils.bytes2HexShortString(bytes));
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index 37c982bcf..c7d8cb7cd 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -93,6 +93,24 @@ abstract class FileInfo {
"File " + getRelativePath() + " is not under construction.");
}
+ static class Watch extends FileInfo {
+ private final CompletableFuture<UnderConstruction> future = new CompletableFuture<>();
+
+ Watch(Path relativePath) {
+ super(relativePath);
+ }
+
+ CompletableFuture<UnderConstruction> getFuture() {
+ return future;
+ }
+
+ CompletableFuture<UnderConstruction> complete(UnderConstruction uc) {
+ Preconditions.assertTrue(getRelativePath().equals(uc.getRelativePath()));
+ future.complete(uc);
+ return future;
+ }
+ }
+
static class ReadOnly extends FileInfo {
private final long committedSize;
private final long writeSize;
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index 50206627b..04ff64bd8 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -72,6 +72,14 @@ public class FileStore implements Closeable {
return applyFunction(relative, map::get);
}
+ FileInfo watch(String relative) {
+ try {
+ return applyFunction(relative, p -> map.computeIfAbsent(p, FileInfo.Watch::new));
+ } catch (FileNotFoundException e) {
+ throw new IllegalStateException("Failed to watch " + relative, e);
+ }
+ }
+
FileInfo remove(String relative) throws FileNotFoundException {
LOG.trace("{}: remove {}", name, relative);
return applyFunction(relative, map::remove);
@@ -88,7 +96,10 @@ public class FileStore implements Closeable {
void putNew(UnderConstruction uc) {
LOG.trace("{}: putNew {}", name, uc.getRelativePath());
- CollectionUtils.putNew(uc.getRelativePath(), uc, map, name::toString);
+ final FileInfo previous = map.put(uc.getRelativePath(), uc);
+ if (previous instanceof FileInfo.Watch) {
+ ((FileInfo.Watch) previous).complete(uc);
+ }
}
ReadOnly close(UnderConstruction uc) {
@@ -171,6 +182,17 @@ public class FileStore implements Closeable {
return full;
}
+ CompletableFuture<ReadReplyProto> watch(String relative) {
+ final FileInfo info = files.watch(relative);
+ final ReadReplyProto reply = ReadReplyProto.newBuilder()
+ .setResolvedPath(FileStoreCommon.toByteString(info.getRelativePath()))
+ .build();
+ if (info instanceof FileInfo.Watch) {
+ return ((FileInfo.Watch) info).getFuture().thenApply(uc -> reply);
+ }
+ return CompletableFuture.completedFuture(reply);
+ }
+
CompletableFuture<ReadReplyProto> read(String relative, long offset, long length, boolean readCommitted) {
final Supplier<String> name = () -> "read(" + relative
+ ", " + offset + ", " + length + ") @" + getId();
@@ -262,15 +284,12 @@ public class FileStore implements Closeable {
CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
return CompletableFuture.supplyAsync(() -> {
- long len = 0;
- try {
- final Path full = resolve(normalize(p));
- RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+ final long len;
+ try (RandomAccessFile file = new RandomAccessFile(resolve(normalize(p)).toFile(), "r")) {
len = file.length();
return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
} catch (IOException e) {
- throw new CompletionException("Failed to commit stream write on file:" + p +
- ", expected written bytes:" + bytesWritten + ", actual written bytes:" + len, e);
+ throw new CompletionException("Failed to commit stream " + p + " with " + bytesWritten + " B.", e);
}
}, committer);
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 9e91602d1..c223b100b 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -83,9 +83,9 @@ public class FileStoreClient implements Closeable {
}
static ByteString send(
- ByteString request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction)
+ Message request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction)
throws IOException {
- final RaftClientReply reply = sendFunction.apply(Message.valueOf(request));
+ final RaftClientReply reply = sendFunction.apply(request);
final StateMachineException sme = reply.getStateMachineException();
if (sme != null) {
throw new IOException("Failed to send request " + request, sme);
@@ -95,9 +95,8 @@ public class FileStoreClient implements Closeable {
}
static CompletableFuture<ByteString> sendAsync(
- ByteString request, Function<Message, CompletableFuture<RaftClientReply>> sendFunction) {
- return sendFunction.apply(() -> request
- ).thenApply(reply -> {
+ Message request, Function<Message, CompletableFuture<RaftClientReply>> sendFunction) {
+ return sendFunction.apply(request).thenApply(reply -> {
+ return sendFunction.apply(request).thenApply(reply -> {
final StateMachineException sme = reply.getStateMachineException();
if (sme != null) {
throw new CompletionException("Failed to send request " + request, sme);
@@ -107,19 +106,19 @@ public class FileStoreClient implements Closeable {
});
}
- private ByteString send(ByteString request) throws IOException {
+ private ByteString send(Message request) throws IOException {
return send(request, client.io()::send);
}
- private ByteString sendReadOnly(ByteString request) throws IOException {
+ private ByteString sendReadOnly(Message request) throws IOException {
return send(request, client.io()::sendReadOnly);
}
- private CompletableFuture<ByteString> sendAsync(ByteString request) {
+ private CompletableFuture<ByteString> sendAsync(Message request) {
return sendAsync(request, client.async()::send);
}
- private CompletableFuture<ByteString> sendReadOnlyAsync(ByteString request) {
+ private CompletableFuture<ByteString> sendReadOnlyAsync(Message request) {
return sendAsync(request, client.async()::sendReadOnly);
}
@@ -135,7 +134,7 @@ public class FileStoreClient implements Closeable {
}
private static <OUTPUT, THROWABLE extends Throwable> OUTPUT readImpl(
- CheckedFunction<ByteString, OUTPUT, THROWABLE> sendReadOnlyFunction,
+ CheckedFunction<Message, OUTPUT, THROWABLE> sendReadOnlyFunction,
String path, long offset, long length) throws THROWABLE {
final ReadRequestProto read = ReadRequestProto.newBuilder()
.setPath(ProtoUtils.toByteString(path))
@@ -143,7 +142,31 @@ public class FileStoreClient implements Closeable {
.setLength(length)
.build();
- return sendReadOnlyFunction.apply(read.toByteString());
+ return sendReadOnlyFunction.apply(Message.valueOf(read));
+ }
+
+ private CompletableFuture<ByteString> sendWatchAsync(Message request) {
+ return sendAsync(request, client.async()::sendReadOnlyUnordered);
+ }
+
+ /**
+ * Watch the path until it is created.
+ */
+ public CompletableFuture<ReadReplyProto> watchAsync(String path) {
+ return watchImpl(this::sendWatchAsync, path)
+ .thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException(
+ () -> ReadReplyProto.parseFrom(reply)));
+ }
+
+ private static <OUTPUT, THROWABLE extends Throwable> OUTPUT watchImpl(
+ CheckedFunction<Message, OUTPUT, THROWABLE> sendWatchFunction,
+ String path) throws THROWABLE {
+ final ReadRequestProto watch = ReadRequestProto.newBuilder()
+ .setPath(ProtoUtils.toByteString(path))
+ .setIsWatch(true)
+ .build();
+
+ return sendWatchFunction.apply(Message.valueOf(watch));
}
public long write(String path, long offset, boolean close, ByteBuffer buffer, boolean sync)
@@ -170,7 +193,7 @@ public class FileStoreClient implements Closeable {
}
private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl(
- CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction,
+ CheckedFunction<Message, OUTPUT, THROWABLE> sendFunction,
String path, long offset, boolean close, ByteBuffer data, boolean sync)
throws THROWABLE {
final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
@@ -185,16 +208,16 @@ public class FileStoreClient implements Closeable {
.setData(ByteString.copyFrom(data));
final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setWrite(write).build();
- return sendFunction.apply(request.toByteString());
+ return sendFunction.apply(Message.valueOf(request));
}
private static <OUTPUT, THROWABLE extends Throwable> OUTPUT deleteImpl(
- CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction, String path)
+ CheckedFunction<Message, OUTPUT, THROWABLE> sendFunction, String path)
throws THROWABLE {
final DeleteRequestProto.Builder delete = DeleteRequestProto.newBuilder()
.setPath(ProtoUtils.toByteString(path));
final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setDelete(delete).build();
- return sendFunction.apply(request.toByteString());
+ return sendFunction.apply(Message.valueOf(request));
}
public String delete(String path) throws IOException {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 6ba761ebe..209baaf7d 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -85,7 +85,8 @@ public class FileStoreStateMachine extends BaseStateMachine {
}
final String path = proto.getPath().toStringUtf8();
- return files.read(path, proto.getOffset(), proto.getLength(), true)
+ return (proto.getIsWatch()? files.watch(path)
+ : files.read(path, proto.getOffset(), proto.getLength(), true))
.thenApply(reply -> Message.valueOf(reply.toByteString()));
}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
index c455aeb8e..5bcc50039 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
@@ -18,38 +18,34 @@
package org.apache.ratis.examples.filestore;
import org.apache.ratis.BaseTest;
-import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.ExamplesProtos.ReadReplyProto;
+import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.netty.util.internal.ThreadLocalRandom;
import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
-import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedSupplier;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.Objects;
-import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -66,14 +62,81 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
static final int NUM_PEERS = 3;
+ FileStoreClient newFileStoreClient(CLUSTER cluster) throws IOException {
+ return new FileStoreClient(cluster.getGroup(), getProperties());
+ }
+
+ @Test
+ public void testWatch() throws Exception {
+ runWithNewCluster(NUM_PEERS, cluster -> runTestWatch(10, cluster));
+ }
+
+ void runTestWatch(int n, CLUSTER cluster) throws Exception {
+ RaftTestUtil.waitForLeader(cluster);
+
+ final AtomicBoolean isStarted = new AtomicBoolean();
+ final List<Integer> randomIndices = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ randomIndices.add(i);
+ }
+ Collections.shuffle(randomIndices);
+ LOG.info("randomIndices {}", randomIndices);
+ final List<Integer> completionOrder = new ArrayList<>();
+
+ final String pathFirst = "first";
+ final String pathSecond = "second";
+ final List<CompletableFuture<ReadReplyProto>> firstList = new ArrayList<>(n);
+ final List<CompletableFuture<ReadReplyProto>> watchSecond = new ArrayList<>(n);
+ try (FileStoreClient client = new FileStoreClient(cluster.getGroup(), getProperties())) {
+ for (int i = 0; i < n; i++) {
+ LOG.info("watchAsync {}", i);
+ final int index = i;
+ final CompletableFuture<ReadReplyProto> f = client.watchAsync(pathFirst + i).whenComplete((reply, e) -> {
+ throw new IllegalStateException(pathFirst + index + " should never be completed.");
+ });
+ firstList.add(f);
+ final CompletableFuture<ReadReplyProto> s = client.watchAsync(pathSecond + i).whenComplete((reply, e) -> {
+ Assert.assertNotNull(reply);
+ Assert.assertNull(e);
+ Assert.assertTrue(isStarted.get());
+ completionOrder.add(index);
+ });
+ watchSecond.add(s);
+ Assert.assertFalse(f.isDone());
+ Assert.assertFalse(s.isDone());
+ Assert.assertFalse(isStarted.get());
+ }
+
+ TimeDuration.valueOf(ThreadLocalRandom.current().nextLong(500) + 100, TimeUnit.MILLISECONDS)
+ .sleep(s -> LOG.info("{}", s));
+ firstList.stream().map(CompletableFuture::isDone).forEach(Assert::assertFalse);
+ watchSecond.stream().map(CompletableFuture::isDone).forEach(Assert::assertFalse);
+ Assert.assertFalse(isStarted.get());
+ isStarted.set(true);
+
+ for (int i : randomIndices) {
+ writeSingleFile(pathSecond + i, SizeInBytes.ONE_KB, () -> client);
+ }
+
+ for (int i = 0; i < n; i++) {
+ final ReadReplyProto reply = watchSecond.get(i).get(100, TimeUnit.MILLISECONDS);
+ LOG.info("reply {}: {}", i, reply);
+ Assert.assertNotNull(reply);
+ Assert.assertEquals(pathSecond + i, reply.getResolvedPath().toStringUtf8());
+ }
+ LOG.info("completionOrder {}", completionOrder);
+ Assert.assertEquals(randomIndices, completionOrder);
+ firstList.stream().map(CompletableFuture::isDone).forEach(Assert::assertFalse);
+ }
+ }
+
@Test
public void testFileStore() throws Exception {
final CLUSTER cluster = newCluster(NUM_PEERS);
cluster.start();
RaftTestUtil.waitForLeader(cluster);
- final CheckedSupplier<FileStoreClient, IOException> newClient =
- () -> new FileStoreClient(cluster.getGroup(), getProperties());
+ final CheckedSupplier<FileStoreClient, IOException> newClient = () -> newFileStoreClient(cluster);
testSingleFile("foo", SizeInBytes.valueOf("2M"), newClient);
testMultipleFiles("file", 20, SizeInBytes.valueOf("1M"), newClient);
@@ -81,19 +144,24 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
cluster.shutdown();
}
+ private static FileStoreWriter writeSingleFile(
+ String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, IOException> newClient)
+ throws Exception {
+ return FileStoreWriter.newBuilder()
+ .setFileName(path)
+ .setFileSize(fileLength)
+ .setFileStoreClientSupplier(newClient)
+ .build()
+ .write(false)
+ .verify()
+ .delete();
+ }
+
private static void testSingleFile(
String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, IOException> newClient)
throws Exception {
LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
-
- try (final FileStoreWriter w =
- FileStoreWriter.newBuilder()
- .setFileName(path)
- .setFileSize(fileLength)
- .setFileStoreClientSupplier(newClient)
- .build()) {
- w.write(false).verify().delete();
- }
+ writeSingleFile(path, fileLength, newClient).close();
}
private static void testMultipleFiles(
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
index 6e46b6e8e..6764184d9 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
index 9b38e2103..3f139ac35 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,4 +22,9 @@ import org.apache.ratis.netty.MiniRaftClusterWithNetty;
public class TestFileStoreWithNetty
extends FileStoreBaseTest<MiniRaftClusterWithNetty>
implements MiniRaftClusterWithNetty.FactoryGet {
+
+ @Override
+ public void testWatch() {
+ //NettyClientRpc does not support sendRequestAsyncUnordered
+ }
}
diff --git a/ratis-proto/src/main/proto/Examples.proto b/ratis-proto/src/main/proto/Examples.proto
index ecf750d72..cc1bc043c 100644
--- a/ratis-proto/src/main/proto/Examples.proto
+++ b/ratis-proto/src/main/proto/Examples.proto
@@ -34,6 +34,7 @@ message ReadRequestProto {
bytes path = 1;
uint64 offset = 2;
uint64 length = 3;
+ bool isWatch = 4;
}
message WriteRequestHeaderProto {