szetszwo commented on a change in pull request #339:
URL: https://github.com/apache/incubator-ratis/pull/339#discussion_r544849349
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -92,7 +94,11 @@ public boolean isOpen() {
@Override
public void close() throws IOException {
- IOUtils.getFromFuture(closeAsync(), () -> "close(" +
ClientInvocationId.valueOf(header) + ")");
+ if (isClosed()) {
+ return;
+ }
+ IOUtils.getFromFuture(writeAsync(Unpooled.EMPTY_BUFFER.nioBuffer(),
StandardWriteOption.CLOSE),
Review comment:
Use DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER instead of
Unpooled.EMPTY_BUFFER. We should avoid importing Netty in ratis-client.
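For reference, an empty buffer constant needs nothing beyond the JDK. The sketch below is only an illustration of that point: `EmptyBuffers` and `empty()` are hypothetical names, not part of Ratis, and the actual definition of `DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER` may differ.

```java
import java.nio.ByteBuffer;

/** Hypothetical holder class, for illustration only; not part of Ratis. */
final class EmptyBuffers {
  /** A shared zero-length, read-only buffer built with the JDK alone (no Netty import). */
  static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer();

  private EmptyBuffers() {}

  /** Returns an independent view so callers cannot disturb the shared position or limit. */
  static ByteBuffer empty() {
    return EMPTY_BYTE_BUFFER.duplicate();
  }
}
```

With a constant like this, `close()` can pass a zero-length buffer together with `StandardWriteOption.CLOSE` without pulling `io.netty.buffer.Unpooled` into ratis-client.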
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -104,46 +110,52 @@ private DataStreamOutputImpl(RaftClientRequest request) {
this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
}
- private CompletableFuture<DataStreamReply> send(Type type, Object data,
long length) {
- final DataStreamRequestHeader h = new DataStreamRequestHeader(type,
header.getCallId(), streamOffset, length);
+ private CompletableFuture<DataStreamReply> send(Type type, Object data,
long length, WriteOption... options) {
+ final DataStreamRequestHeader h =
+ new DataStreamRequestHeader(type, header.getCallId(), streamOffset,
length, options);
return orderedStreamAsync.sendRequest(h, data);
}
- private CompletableFuture<DataStreamReply> send(Type type) {
- return combineHeader(send(type, null, 0));
- }
-
private CompletableFuture<DataStreamReply>
combineHeader(CompletableFuture<DataStreamReply> future) {
return future.thenCombine(headerFuture, (reply, headerReply) ->
headerReply.isSuccess()? reply : headerReply);
}
- private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data,
long length, boolean sync) {
+ private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data,
long length, WriteOption... options) {
if (isClosed()) {
return JavaUtils.completeExceptionally(new AlreadyClosedException(
clientId + ": stream already closed, request=" + header));
}
- final CompletableFuture<DataStreamReply> f = send(sync ?
Type.STREAM_DATA_SYNC : Type.STREAM_DATA, data, length);
+ final CompletableFuture<DataStreamReply> f =
combineHeader(send(Type.STREAM_DATA, data, length, options));
+ if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
+ closeFuture.set(f);
+ closed.set(true);
+ f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
+ }
streamOffset += length;
- return combineHeader(f);
+ return f;
}
@Override
- public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src,
boolean sync) {
- return writeAsyncImpl(src, src.remaining(), sync);
+ public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src,
WriteOption... options) {
+ return writeAsyncImpl(src, src.remaining(), options);
}
@Override
- public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount
src, boolean sync) {
- return writeAsyncImpl(src, src.getCount(), sync);
+ public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount
src, WriteOption... options) {
+ return writeAsyncImpl(src, src.getCount(), options);
+ }
+
+ boolean isClosed() {
+ return closed.get();
Review comment:
We could derive the closed state from closeFuture, as below, and remove the `closed` field.
```
return closeFuture.get() != null;
```
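A minimal sketch of the idea, assuming `closeFuture` is an `AtomicReference` that stays null until a CLOSE write has been issued. `CloseStateSketch` and `markClosed` are hypothetical names, `String` stands in for `DataStreamReply`, and `compareAndSet` is used here in place of the `set` call in the patch so that only the first close future is kept.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for DataStreamOutputImpl; only the close-state logic is shown. */
final class CloseStateSketch {
  // Null until a write carrying the CLOSE option has been issued.
  private final AtomicReference<CompletableFuture<String>> closeFuture = new AtomicReference<>();

  /** The stream counts as closed as soon as a close future has been recorded. */
  boolean isClosed() {
    return closeFuture.get() != null;
  }

  /** Records the close future if none is set yet; returns the future now held. */
  CompletableFuture<String> markClosed(CompletableFuture<String> f) {
    closeFuture.compareAndSet(null, f);
    return closeFuture.get();
  }
}
```

This keeps a single source of truth: there is no separate flag that could disagree with the future.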
##########
File path:
ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.io;
+
+public enum StandardWriteOption implements WriteOption {
+ /** Write the data to the underlying storage. */
+ WRITE,
Review comment:
WRITE is never used in the server. Let's remove it.
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -69,11 +70,6 @@
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
private final CompletableFuture<RaftClientReply> raftClientReplyFuture =
new CompletableFuture<>();
- private final MemoizedSupplier<CompletableFuture<DataStreamReply>>
closeSupplier = JavaUtils.memoize(() -> {
Review comment:
You are right that we don't need an AtomicBoolean here. A plain boolean is
enough.