This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp in repository https://gitbox.apache.org/repos/asf/ratis.git
commit b035556ef07188d7a4d6eb4c4bd7ac54c09dd510 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Thu Oct 13 10:28:47 2022 +0800 RATIS-1714. Support unordered async read. (#755) (cherry picked from commit 2a3482ed1c90c2332faa852883c168f2608da1ad) --- .../java/org/apache/ratis/client/api/AsyncApi.java | 20 +++- .../org/apache/ratis/client/api/BlockingApi.java | 4 +- .../org/apache/ratis/client/impl/AsyncImpl.java | 7 +- .../apache/ratis/client/impl/UnorderedAsync.java | 7 +- .../java/org/apache/ratis/protocol/Message.java | 5 + .../apache/ratis/examples/filestore/FileInfo.java | 18 ++++ .../apache/ratis/examples/filestore/FileStore.java | 33 ++++-- .../ratis/examples/filestore/FileStoreClient.java | 53 +++++++--- .../examples/filestore/FileStoreStateMachine.java | 3 +- .../examples/filestore/FileStoreBaseTest.java | 112 +++++++++++++++++---- .../examples/filestore/TestFileStoreWithGrpc.java | 2 +- .../examples/filestore/TestFileStoreWithNetty.java | 7 +- ratis-proto/src/main/proto/Examples.proto | 1 + 13 files changed, 218 insertions(+), 54 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java index 1e8da9bde..ef96d2a4b 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java @@ -26,8 +26,8 @@ import org.apache.ratis.protocol.RaftPeerId; /** * Asynchronous API to support operations * such as sending message, read-message, stale-read-message and watch-request. - * - * Note that this API and {@link BlockingApi} support the same set of operations. + * <p> + * Note that this API supports all the operations in {@link BlockingApi}. */ public interface AsyncApi { /** @@ -42,12 +42,28 @@ public interface AsyncApi { /** * Send the given readonly message asynchronously to the raft service. 
+ * Note that the reply futures are completed in the same order of the messages being sent. * * @param message The request message. * @return a future of the reply. */ CompletableFuture<RaftClientReply> sendReadOnly(Message message); + /** The same as sendReadOnlyUnordered(message, null). */ + default CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message) { + return sendReadOnlyUnordered(message, null); + } + + /** + * Send the given readonly message asynchronously to the raft service. + * Note that the reply futures can be completed in any order. + * + * @param message The request message. + * @param server The target server. When server == null, send the message to the leader. + * @return a future of the reply. + */ + CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message, RaftPeerId server); + /** * Send the given stale-read message asynchronously to the given server (not the raft service). * If the server commit index is larger than or equal to the given min-index, the request will be processed. diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java index 84f9740e2..1f6237b49 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java @@ -26,8 +26,8 @@ import org.apache.ratis.protocol.RaftPeerId; /** * Blocking API to support operations * such as sending message, read-message, stale-read-message and watch-request. - * - * Note that this API and {@link AsyncApi} support the same set of operations. + * <p> + * Note that this API supports a subset of the operations in {@link AsyncApi}. 
*/ public interface BlockingApi { /** diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java index e063eebe2..a78e5cc11 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java @@ -51,6 +51,11 @@ class AsyncImpl implements AsyncRpcApi { return send(RaftClientRequest.readRequestType(), message, null); } + @Override + public CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message, RaftPeerId server) { + return UnorderedAsync.send(RaftClientRequest.readRequestType(), message, server, client); + } + @Override public CompletableFuture<RaftClientReply> sendStaleRead(Message message, long minIndex, RaftPeerId server) { return send(RaftClientRequest.staleReadRequestType(minIndex), message, server); @@ -58,7 +63,7 @@ class AsyncImpl implements AsyncRpcApi { @Override public CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel replication) { - return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, replication), client); + return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, replication), null, null, client); } @Override diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java index 432acce4d..b053df172 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java @@ -20,6 +20,8 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.retry.ClientRetryEvent; import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftPeerId; import 
org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.apache.ratis.protocol.RaftClientReply; @@ -54,10 +56,11 @@ public interface UnorderedAsync { } } - static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, RaftClientImpl client) { + static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message message, RaftPeerId server, + RaftClientImpl client) { final long callId = CallId.getAndIncrement(); final PendingClientRequest pending = new PendingUnorderedRequest( - () -> client.newRaftClientRequest(null, callId, null, type, null)); + () -> client.newRaftClientRequest(server, callId, message, type, null)); sendRequestWithRetry(pending, client); return pending.getReplyFuture() .thenApply(reply -> RaftClientImpl.handleRaftException(reply, CompletionException::new)); diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java index b47f43074..e7ea97ca4 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.protocol; +import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.StringUtils; @@ -45,6 +46,10 @@ public interface Message { }; } + static Message valueOf(AbstractMessage abstractMessage) { + return valueOf(abstractMessage.toByteString(), abstractMessage::toString); + } + static Message valueOf(ByteString bytes) { return valueOf(bytes, () -> "Message:" + StringUtils.bytes2HexShortString(bytes)); } diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java 
index 37c982bcf..c7d8cb7cd 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java @@ -93,6 +93,24 @@ abstract class FileInfo { "File " + getRelativePath() + " is not under construction."); } + static class Watch extends FileInfo { + private final CompletableFuture<UnderConstruction> future = new CompletableFuture<>(); + + Watch(Path relativePath) { + super(relativePath); + } + + CompletableFuture<UnderConstruction> getFuture() { + return future; + } + + CompletableFuture<UnderConstruction> complete(UnderConstruction uc) { + Preconditions.assertTrue(getRelativePath().equals(uc.getRelativePath())); + future.complete(uc); + return future; + } + } + static class ReadOnly extends FileInfo { private final long committedSize; private final long writeSize; diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java index 50206627b..04ff64bd8 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java @@ -72,6 +72,14 @@ public class FileStore implements Closeable { return applyFunction(relative, map::get); } + FileInfo watch(String relative) { + try { + return applyFunction(relative, p -> map.computeIfAbsent(p, FileInfo.Watch::new)); + } catch (FileNotFoundException e) { + throw new IllegalStateException("Failed to watch " + relative, e); + } + } + FileInfo remove(String relative) throws FileNotFoundException { LOG.trace("{}: remove {}", name, relative); return applyFunction(relative, map::remove); @@ -88,7 +96,10 @@ public class FileStore implements Closeable { void putNew(UnderConstruction uc) { LOG.trace("{}: putNew {}", name, uc.getRelativePath()); - CollectionUtils.putNew(uc.getRelativePath(), uc, map, name::toString); 
+ final FileInfo previous = map.put(uc.getRelativePath(), uc); + if (previous instanceof FileInfo.Watch) { + ((FileInfo.Watch) previous).complete(uc); + } } ReadOnly close(UnderConstruction uc) { @@ -171,6 +182,17 @@ public class FileStore implements Closeable { return full; } + CompletableFuture<ReadReplyProto> watch(String relative) { + final FileInfo info = files.watch(relative); + final ReadReplyProto reply = ReadReplyProto.newBuilder() + .setResolvedPath(FileStoreCommon.toByteString(info.getRelativePath())) + .build(); + if (info instanceof FileInfo.Watch) { + return ((FileInfo.Watch) info).getFuture().thenApply(uc -> reply); + } + return CompletableFuture.completedFuture(reply); + } + CompletableFuture<ReadReplyProto> read(String relative, long offset, long length, boolean readCommitted) { final Supplier<String> name = () -> "read(" + relative + ", " + offset + ", " + length + ") @" + getId(); @@ -262,15 +284,12 @@ public class FileStore implements Closeable { CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) { return CompletableFuture.supplyAsync(() -> { - long len = 0; - try { - final Path full = resolve(normalize(p)); - RandomAccessFile file = new RandomAccessFile(full.toFile(), "r"); + final long len; + try (RandomAccessFile file = new RandomAccessFile(resolve(normalize(p)).toFile(), "r")) { len = file.length(); return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build(); } catch (IOException e) { - throw new CompletionException("Failed to commit stream write on file:" + p + - ", expected written bytes:" + bytesWritten + ", actual written bytes:" + len, e); + throw new CompletionException("Failed to commit stream " + p + " with " + bytesWritten + " B.", e); } }, committer); } diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java index 
9e91602d1..c223b100b 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java @@ -83,9 +83,9 @@ public class FileStoreClient implements Closeable { } static ByteString send( - ByteString request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction) + Message request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction) throws IOException { - final RaftClientReply reply = sendFunction.apply(Message.valueOf(request)); + final RaftClientReply reply = sendFunction.apply(request); final StateMachineException sme = reply.getStateMachineException(); if (sme != null) { throw new IOException("Failed to send request " + request, sme); @@ -95,9 +95,8 @@ public class FileStoreClient implements Closeable { } static CompletableFuture<ByteString> sendAsync( - ByteString request, Function<Message, CompletableFuture<RaftClientReply>> sendFunction) { - return sendFunction.apply(() -> request - ).thenApply(reply -> { + Message request, Function<Message, CompletableFuture<RaftClientReply>> sendFunction) { + return sendFunction.apply(request).thenApply(reply -> { final StateMachineException sme = reply.getStateMachineException(); if (sme != null) { throw new CompletionException("Failed to send request " + request, sme); @@ -107,19 +106,19 @@ public class FileStoreClient implements Closeable { }); } - private ByteString send(ByteString request) throws IOException { + private ByteString send(Message request) throws IOException { return send(request, client.io()::send); } - private ByteString sendReadOnly(ByteString request) throws IOException { + private ByteString sendReadOnly(Message request) throws IOException { return send(request, client.io()::sendReadOnly); } - private CompletableFuture<ByteString> sendAsync(ByteString request) { + private CompletableFuture<ByteString> sendAsync(Message request) { return 
sendAsync(request, client.async()::send); } - private CompletableFuture<ByteString> sendReadOnlyAsync(ByteString request) { + private CompletableFuture<ByteString> sendReadOnlyAsync(Message request) { return sendAsync(request, client.async()::sendReadOnly); } @@ -135,7 +134,7 @@ public class FileStoreClient implements Closeable { } private static <OUTPUT, THROWABLE extends Throwable> OUTPUT readImpl( - CheckedFunction<ByteString, OUTPUT, THROWABLE> sendReadOnlyFunction, + CheckedFunction<Message, OUTPUT, THROWABLE> sendReadOnlyFunction, String path, long offset, long length) throws THROWABLE { final ReadRequestProto read = ReadRequestProto.newBuilder() .setPath(ProtoUtils.toByteString(path)) @@ -143,7 +142,31 @@ public class FileStoreClient implements Closeable { .setLength(length) .build(); - return sendReadOnlyFunction.apply(read.toByteString()); + return sendReadOnlyFunction.apply(Message.valueOf(read)); + } + + private CompletableFuture<ByteString> sendWatchAsync(Message request) { + return sendAsync(request, client.async()::sendReadOnlyUnordered); + } + + /** + * Watch the path until it is created. 
+ */ + public CompletableFuture<ReadReplyProto> watchAsync(String path) { + return watchImpl(this::sendWatchAsync, path) + .thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException( + () -> ReadReplyProto.parseFrom(reply))); + } + + private static <OUTPUT, THROWABLE extends Throwable> OUTPUT watchImpl( + CheckedFunction<Message, OUTPUT, THROWABLE> sendWatchFunction, + String path) throws THROWABLE { + final ReadRequestProto watch = ReadRequestProto.newBuilder() + .setPath(ProtoUtils.toByteString(path)) + .setIsWatch(true) + .build(); + + return sendWatchFunction.apply(Message.valueOf(watch)); } public long write(String path, long offset, boolean close, ByteBuffer buffer, boolean sync) @@ -170,7 +193,7 @@ public class FileStoreClient implements Closeable { } private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl( - CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction, + CheckedFunction<Message, OUTPUT, THROWABLE> sendFunction, String path, long offset, boolean close, ByteBuffer data, boolean sync) throws THROWABLE { final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder() @@ -185,16 +208,16 @@ public class FileStoreClient implements Closeable { .setData(ByteString.copyFrom(data)); final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setWrite(write).build(); - return sendFunction.apply(request.toByteString()); + return sendFunction.apply(Message.valueOf(request)); } private static <OUTPUT, THROWABLE extends Throwable> OUTPUT deleteImpl( - CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction, String path) + CheckedFunction<Message, OUTPUT, THROWABLE> sendFunction, String path) throws THROWABLE { final DeleteRequestProto.Builder delete = DeleteRequestProto.newBuilder() .setPath(ProtoUtils.toByteString(path)); final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setDelete(delete).build(); - return sendFunction.apply(request.toByteString()); + return 
sendFunction.apply(Message.valueOf(request)); } public String delete(String path) throws IOException { diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java index 6ba761ebe..209baaf7d 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java @@ -85,7 +85,8 @@ public class FileStoreStateMachine extends BaseStateMachine { } final String path = proto.getPath().toStringUtf8(); - return files.read(path, proto.getOffset(), proto.getLength(), true) + return (proto.getIsWatch()? files.watch(path) + : files.read(path, proto.getOffset(), proto.getLength(), true)) .thenApply(reply -> Message.valueOf(reply.toByteString())); } diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java index c455aeb8e..5bcc50039 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java @@ -18,38 +18,34 @@ package org.apache.ratis.examples.filestore; import org.apache.ratis.BaseTest; -import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.ConfUtils; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.ExamplesProtos.ReadReplyProto; +import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.thirdparty.io.netty.util.internal.ThreadLocalRandom; import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.Preconditions; 
import org.apache.ratis.util.SizeInBytes; -import org.apache.ratis.util.StringUtils; +import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedSupplier; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Objects; -import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster> extends BaseTest @@ -66,14 +62,81 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster> static final int NUM_PEERS = 3; + FileStoreClient newFileStoreClient(CLUSTER cluster) throws IOException { + return new FileStoreClient(cluster.getGroup(), getProperties()); + } + + @Test + public void testWatch() throws Exception { + runWithNewCluster(NUM_PEERS, cluster -> runTestWatch(10, cluster)); + } + + void runTestWatch(int n, CLUSTER cluster) throws Exception { + RaftTestUtil.waitForLeader(cluster); + + final AtomicBoolean isStarted = new AtomicBoolean(); + final List<Integer> randomIndices = new ArrayList<>(); + for (int i = 0; i < n; i++) { + randomIndices.add(i); + } + Collections.shuffle(randomIndices); + LOG.info("randomIndices {}", randomIndices); + final List<Integer> completionOrder = new ArrayList<>(); + + final String pathFirst = "first"; + final String pathSecond = "second"; + final List<CompletableFuture<ReadReplyProto>> 
firstList = new ArrayList<>(n); + final List<CompletableFuture<ReadReplyProto>> watchSecond = new ArrayList<>(n); + try (FileStoreClient client = new FileStoreClient(cluster.getGroup(), getProperties())) { + for (int i = 0; i < n; i++) { + LOG.info("watchAsync {}", i); + final int index = i; + final CompletableFuture<ReadReplyProto> f = client.watchAsync(pathFirst + i).whenComplete((reply, e) -> { + throw new IllegalStateException(pathFirst + index + " should never be completed."); + }); + firstList.add(f); + final CompletableFuture<ReadReplyProto> s = client.watchAsync(pathSecond + i).whenComplete((reply, e) -> { + Assert.assertNotNull(reply); + Assert.assertNull(e); + Assert.assertTrue(isStarted.get()); + completionOrder.add(index); + }); + watchSecond.add(s); + Assert.assertFalse(f.isDone()); + Assert.assertFalse(s.isDone()); + Assert.assertFalse(isStarted.get()); + } + + TimeDuration.valueOf(ThreadLocalRandom.current().nextLong(500) + 100, TimeUnit.MILLISECONDS) + .sleep(s -> LOG.info("{}", s)); + firstList.stream().map(CompletableFuture::isDone).forEach(Assert::assertFalse); + watchSecond.stream().map(CompletableFuture::isDone).forEach(Assert::assertFalse); + Assert.assertFalse(isStarted.get()); + isStarted.set(true); + + for (int i : randomIndices) { + writeSingleFile(pathSecond + i, SizeInBytes.ONE_KB, () -> client); + } + + for (int i = 0; i < n; i++) { + final ReadReplyProto reply = watchSecond.get(i).get(100, TimeUnit.MILLISECONDS); + LOG.info("reply {}: {}", i, reply); + Assert.assertNotNull(reply); + Assert.assertEquals(pathSecond + i, reply.getResolvedPath().toStringUtf8()); + } + LOG.info("completionOrder {}", completionOrder); + Assert.assertEquals(randomIndices, completionOrder); + firstList.stream().map(CompletableFuture::isDone).forEach(Assert::assertFalse); + } + } + @Test public void testFileStore() throws Exception { final CLUSTER cluster = newCluster(NUM_PEERS); cluster.start(); RaftTestUtil.waitForLeader(cluster); - final 
CheckedSupplier<FileStoreClient, IOException> newClient = - () -> new FileStoreClient(cluster.getGroup(), getProperties()); + final CheckedSupplier<FileStoreClient, IOException> newClient = () -> newFileStoreClient(cluster); testSingleFile("foo", SizeInBytes.valueOf("2M"), newClient); testMultipleFiles("file", 20, SizeInBytes.valueOf("1M"), newClient); @@ -81,19 +144,24 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster> cluster.shutdown(); } + private static FileStoreWriter writeSingleFile( + String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, IOException> newClient) + throws Exception { + return FileStoreWriter.newBuilder() + .setFileName(path) + .setFileSize(fileLength) + .setFileStoreClientSupplier(newClient) + .build() + .write(false) + .verify() + .delete(); + } + private static void testSingleFile( String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, IOException> newClient) throws Exception { LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength); - - try (final FileStoreWriter w = - FileStoreWriter.newBuilder() - .setFileName(path) - .setFileSize(fileLength) - .setFileStoreClientSupplier(newClient) - .build()) { - w.write(false).verify().delete(); - } + writeSingleFile(path, fileLength, newClient).close(); } private static void testMultipleFiles( diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java index 6e46b6e8e..6764184d9 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java index 9b38e2103..3f139ac35 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,4 +22,9 @@ import org.apache.ratis.netty.MiniRaftClusterWithNetty; public class TestFileStoreWithNetty extends FileStoreBaseTest<MiniRaftClusterWithNetty> implements MiniRaftClusterWithNetty.FactoryGet { + + @Override + public void testWatch() { + //NettyClientRpc does not support sendRequestAsyncUnordered + } } diff --git a/ratis-proto/src/main/proto/Examples.proto b/ratis-proto/src/main/proto/Examples.proto index ecf750d72..cc1bc043c 100644 --- a/ratis-proto/src/main/proto/Examples.proto +++ b/ratis-proto/src/main/proto/Examples.proto @@ -34,6 +34,7 @@ message ReadRequestProto { bytes path = 1; uint64 offset = 2; uint64 length = 3; + bool isWatch = 4; } message WriteRequestHeaderProto {
