szetszwo commented on a change in pull request #339:
URL: https://github.com/apache/incubator-ratis/pull/339#discussion_r538983256
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
##########
@@ -71,6 +71,14 @@
*/
CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, boolean
sync);
+ /**
+ * Send out the data in the source file and close stream asynchronously.
+ *
+ * @param src the source file with the starting position and the number of
bytes.
+ * @return a future of the reply.
+ */
+ CompletableFuture<DataStreamReply> writeAndCloseAsync(ByteBuffer src);
Review comment:
Similar to
https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#newInputStream-java.nio.file.Path-java.nio.file.OpenOption...-
, we may define a WriteOption to include both SYNC and CLOSE
```
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index c054d356..7bd0db98 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -41,10 +42,10 @@ public interface DataStreamOutput extends
CloseAsync<DataStreamReply> {
* Send out the data in the source buffer asynchronously.
*
* @param src the source buffer to be sent.
- * @param sync Should the data be sync'ed to the underlying storage?
+ * @param options
* @return a future of the reply.
*/
- CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, boolean
sync);
+ CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src,
WriteOption... options);
/**
diff --git
a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
new file mode 100644
index 00000000..5e5bc08b
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
@@ -0,0 +1,8 @@
+package org.apache.ratis.io;
+
+public enum StandardWriteOption implements WriteOption {
+ /** Sync the data to the underlying storage. */
+ SYNC,
+ /** Close the data to the underlying storage? */
+ CLOSE
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
new file mode 100644
index 00000000..9a28d709
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
@@ -0,0 +1,4 @@
+package org.apache.ratis.io;
+
+public interface WriteOption {
+}
```
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
##########
@@ -28,7 +28,7 @@
import java.util.concurrent.CompletableFuture;
/** An asynchronous output stream supporting zero buffer copying. */
-public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
Review comment:
I think it is convenient to keep extending CloseAsync<DataStreamReply>
so that the API is easier to be used. We may provide a default closeAsync()
implementation.
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -69,11 +70,6 @@
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
private final CompletableFuture<RaftClientReply> raftClientReplyFuture =
new CompletableFuture<>();
- private final MemoizedSupplier<CompletableFuture<DataStreamReply>>
closeSupplier = JavaUtils.memoize(() -> {
Review comment:
We need some way to make the close idempotent. May be a AtomicBoolean.
##########
File path: ratis-proto/src/main/proto/Raft.proto
##########
@@ -301,7 +301,7 @@ message DataStreamPacketHeaderProto {
STREAM_HEADER = 0;
STREAM_DATA = 1;
STREAM_DATA_SYNC = 2;
- STREAM_CLOSE = 3;
+ STREAM_DATA_SYNC_CLOSE = 3;
Review comment:
How about adding an enum?
```
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -300,14 +300,18 @@ message DataStreamPacketHeaderProto {
enum Type {
STREAM_HEADER = 0;
STREAM_DATA = 1;
- STREAM_DATA_SYNC = 2;
- STREAM_CLOSE = 3;
+ }
+
+ enum Option {
+ SYNC = 0;
+ CLOSE = 1;
}
uint64 streamId = 1;
uint64 streamOffset = 2;
Type type = 3;
- uint64 dataLength = 4;
+ repeated Option options = 4;
+ uint64 dataLength = 5;
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]