szetszwo commented on a change in pull request #221:
URL: https://github.com/apache/incubator-ratis/pull/221#discussion_r504051914
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
##########
@@ -25,6 +25,9 @@
/** An asynchronous output stream supporting zero buffer copying. */
public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
+ /** Create a RaftClientRequest and send out it asynchronously */
+ CompletableFuture<DataStreamReply> streamRequestAsync();
Review comment:
This method probably is not needed in this public user interface. The
request should be automatically sent but not be triggered by the user.
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
##########
@@ -25,6 +25,9 @@
/** An asynchronous output stream supporting zero buffer copying. */
public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
+ /** Create a RaftClientRequest and send out it asynchronously */
+ CompletableFuture<DataStreamReply> streamRequestAsync();
+
/** Send out the data in the buffer asynchronously */
- CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf);
+ CompletableFuture<DataStreamReply> streamDataAsync(ByteBuffer buf);
Review comment:
Let's use the name "writeAsync". It is consistent with the
WritableByteChannel.write(ByteBuffer) method.
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
##########
@@ -25,12 +25,21 @@
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
+import java.nio.ByteBuffer;
+
/** Client utilities for internal use. */
public final class ClientImplUtils {
+ private static final ByteBuffer REPLY_OK_IN_BYTE_BUFFER =
ByteBuffer.wrap("OK".getBytes());
Review comment:
Let's don't add this "OK" constant. We should fix the reply.
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -64,17 +70,29 @@ public DataStreamClientImpl(RaftPeer raftServer,
this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc,
properties);
}
- class DataStreamOutputImpl implements DataStreamOutput {
+ public class DataStreamOutputImpl implements DataStreamOutput {
private long streamId = 0;
private long messageId = 0;
+ private RaftClientRequest request;
Review comment:
Let's create the header and send it out in the constructor.
```suggestion
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
```
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -64,17 +70,29 @@ public DataStreamClientImpl(RaftPeer raftServer,
this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc,
properties);
}
- class DataStreamOutputImpl implements DataStreamOutput {
+ public class DataStreamOutputImpl implements DataStreamOutput {
private long streamId = 0;
private long messageId = 0;
+ private RaftClientRequest request;
public DataStreamOutputImpl(long id){
this.streamId = id;
Review comment:
```suggestion
this.streamId = id;
this.header = new RaftClientRequest(clientId, raftServer.getId(),
groupId, RaftClientImpl.nextCallId(),
RaftClientRequest.writeRequestType());
this.headerFuture = orderedStreamAsync.sendRequest(streamId, messageId,
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer());
```
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -44,6 +48,7 @@
private DataStreamClientRpc dataStreamClientRpc;
private OrderedStreamAsync orderedStreamAsync;
+ private final ClientId clientId;
Review comment:
The clientId and the groupId/group should be passed in the constructor.
Let's add a TODO
```suggestion
// TODO Similar to RaftClientImpl, pass ClientId and RaftGroupId/RaftGroup
in constructor.
private final ClientId clientId = ClientId.randomId();
private final RaftGroupId groupId = RaftGroupId.randomId();
```
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -84,6 +102,11 @@ public DataStreamOutputImpl(long id){
public CompletableFuture<DataStreamReply> closeAsync() {
return null;
}
+
+ @VisibleForTesting
+ public RaftClientRequest getRequest() {
+ return request;
+ }
Review comment:
```suggestion
public RaftClientRequest getHeader() {
return header;
}
public CompletableFuture<DataStreamReply> getHeaderFuture() {
return headerFuture;
}
```
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -64,17 +70,29 @@ public DataStreamClientImpl(RaftPeer raftServer,
this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc,
properties);
}
- class DataStreamOutputImpl implements DataStreamOutput {
+ public class DataStreamOutputImpl implements DataStreamOutput {
private long streamId = 0;
private long messageId = 0;
+ private RaftClientRequest request;
public DataStreamOutputImpl(long id){
this.streamId = id;
}
// send to the attached dataStreamClientRpc
@Override
- public CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf) {
+ public CompletableFuture<DataStreamReply> streamRequestAsync() {
+ messageId++;
+ RaftClientRequest.Type type =
RaftClientRequest.dataStreamRequestType(streamId, messageId, false);
Review comment:
Let's use RaftClientRequest.writeRequestType() for the moment since the
effect of dataStream should be the same as AsyncApi.send. If there is a need,
we may add dataStreamRequestType later.
##########
File path: ratis-proto/src/main/proto/Raft.proto
##########
@@ -253,6 +253,12 @@ message StreamRequestTypeProto {
bool endOfRequest = 3;// Is this the end-of-request?
}
+message DataStreamRequestTypeProto {
Review comment:
Let's use WriteRequestTypeProto for the moment.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]