szetszwo commented on a change in pull request #221:
URL: https://github.com/apache/incubator-ratis/pull/221#discussion_r503696550
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
##########
@@ -37,4 +43,10 @@
public interface DataStreamApi {
/** Create a stream to send data. */
DataStreamOutput stream();
+
+ /** Send data using a stream with the submessage size. */
+ CompletableFuture<DataStreamReply> streamAsync(Message message, SizeInBytes
submessageSize);
Review comment:
For DataStreamApi, the streamAsync(..) methods may not be very useful
since it uses Message so that it requires buffer copying.
##########
File path:
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -137,42 +139,30 @@ public void testDataStream(){
public void runTestDataStream(){
final int bufferSize = 1024*1024;
final int bufferNum = 10;
- final DataStreamOutput out = client.stream();
Review comment:
We should not remove this test. This is testing DataStreamOutput which
support zero buffering copying.
If there is a need, we should add a separated test for the other cases.
##########
File path:
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -137,42 +139,30 @@ public void testDataStream(){
public void runTestDataStream(){
final int bufferSize = 1024*1024;
final int bufferNum = 10;
- final DataStreamOutput out = client.stream();
-
- //send request
- final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- futures.add(sendRequest(out, 1024));
-
- //send data
- final int halfBufferSize = bufferSize/2;
- int dataSize = 0;
- for(int i = 0; i < bufferNum; i++) {
- final int size = halfBufferSize +
ThreadLocalRandom.current().nextInt(halfBufferSize);
- final ByteBuffer bf = initBuffer(dataSize, size);
- futures.add(out.streamAsync(bf));
- dataSize += size;
- }
- //join all requests
- for(CompletableFuture<DataStreamReply> f : futures) {
- f.join();
- }
- Assert.assertEquals(dataSize, byteWritten);
- shutDownSetup();
- }
+ ByteBuffer buf = initBuffer(bufferSize);
+ Message msg = Message.valueOf(ByteString.copyFrom(buf));
+ SizeInBytes size = SizeInBytes.valueOf(bufferSize / bufferNum);
+ DataStreamOutput out = client.stream();
+ CompletableFuture<DataStreamReply> f = client.streamAsync(msg, size, out);
+ f.join();
+
+ DataStreamClientImpl.DataStreamOutputImpl impl =
(DataStreamClientImpl.DataStreamOutputImpl) out;
+ Assert.assertEquals(writeRequest.getClientId(),
impl.getRequest().getClientId());
+ Assert.assertEquals(writeRequest.getCallId(),
impl.getRequest().getCallId());
+ Assert.assertEquals(writeRequest.getRaftGroupId(),
impl.getRequest().getRaftGroupId());
+ Assert.assertEquals(writeRequest.getServerId(),
impl.getRequest().getServerId());
- CompletableFuture<DataStreamReply> sendRequest(DataStreamOutput out, int
size) {
- // TODO RATIS-1085: create a RaftClientRequest and put it in the buffer
Review comment:
The idea for this TODO is to create a RaftClientRequest without a
Message and then send it out as the header. The the data is sent following the
request using ByteBuffer. We won't create Message here in order to support
zero buffer copying.
##########
File path:
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -137,42 +139,30 @@ public void testDataStream(){
public void runTestDataStream(){
final int bufferSize = 1024*1024;
final int bufferNum = 10;
- final DataStreamOutput out = client.stream();
-
- //send request
- final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- futures.add(sendRequest(out, 1024));
-
- //send data
- final int halfBufferSize = bufferSize/2;
- int dataSize = 0;
- for(int i = 0; i < bufferNum; i++) {
- final int size = halfBufferSize +
ThreadLocalRandom.current().nextInt(halfBufferSize);
- final ByteBuffer bf = initBuffer(dataSize, size);
- futures.add(out.streamAsync(bf));
- dataSize += size;
- }
- //join all requests
- for(CompletableFuture<DataStreamReply> f : futures) {
- f.join();
- }
- Assert.assertEquals(dataSize, byteWritten);
- shutDownSetup();
- }
+ ByteBuffer buf = initBuffer(bufferSize);
+ Message msg = Message.valueOf(ByteString.copyFrom(buf));
Review comment:
We don't want to use ByteString (or Message) in DataStream. Otherwise,
we won't be able to support zero buffer copying.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]