This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 88327da  RATIS-1097. DataStreamOutputImpl should use streamOffset instead of messageId (#224)
88327da is described below

commit 88327daff3261cb3e2929d7fbcbba1ec51d67a34
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 15 15:16:21 2020 +0800

    RATIS-1097. DataStreamOutputImpl should use streamOffset instead of messageId (#224)
    
    * RATIS-1097. DataStreamOutputImpl should use streamOffset instead of messageId.
    
    * Add review comments.
    
    * Remove the unused import.
    
    * Fix testAsyncConfiguration.
---
 .../apache/ratis/client/RaftClientConfigKeys.java  | 18 ++++-
 .../ratis/client/impl/DataStreamClientImpl.java    | 42 ++++++------
 .../ratis/client/impl/MessageStreamImpl.java       |  2 +-
 .../ratis/client/impl/OrderedStreamAsync.java      | 77 ++++++----------------
 .../impl/DataStreamPacketByteBuffer.java}          | 39 +++++------
 .../impl/DataStreamPacketImpl.java}                | 41 ++++++------
 .../impl/DataStreamReplyByteBuffer.java}           | 16 +++--
 .../impl/DataStreamRequestByteBuffer.java}         | 16 +++--
 ...ataStreamMessage.java => DataStreamPacket.java} |  4 +-
 .../org/apache/ratis/protocol/DataStreamReply.java |  5 +-
 .../apache/ratis/protocol/DataStreamRequest.java   |  2 +-
 .../ratis/netty/client/DataStreamReplyDecoder.java |  2 +-
 .../netty/client/DataStreamRequestEncoder.java     | 13 ++--
 .../ratis/netty/server/DataStreamReplyEncoder.java | 16 ++---
 .../netty/server/DataStreamRequestByteBuf.java     | 35 ++++------
 .../ratis/netty/server/NettyServerStreamRpc.java   | 31 ++++-----
 .../org/apache/ratis/MessageStreamApiTests.java    |  4 +-
 17 files changed, 167 insertions(+), 196 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 7fc01c2..746a4ab 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -90,8 +90,22 @@ public interface RaftClientConfigKeys {
     }
   }
 
-  interface Stream {
-    String PREFIX = RaftClientConfigKeys.PREFIX + ".stream";
+  interface DataStream {
+    String PREFIX = RaftClientConfigKeys.PREFIX + ".data-stream";
+
+    String OUTSTANDING_REQUESTS_MAX_KEY = PREFIX + ".outstanding-requests.max";
+    int OUTSTANDING_REQUESTS_MAX_DEFAULT = 100;
+    static int outstandingRequestsMax(RaftProperties properties) {
+      return getInt(properties::getInt, OUTSTANDING_REQUESTS_MAX_KEY,
+          OUTSTANDING_REQUESTS_MAX_DEFAULT, getDefaultLog(), requireMin(2));
+    }
+    static void setOutstandingRequestsMax(RaftProperties properties, int 
outstandingRequests) {
+      setInt(properties::setInt, OUTSTANDING_REQUESTS_MAX_KEY, 
outstandingRequests);
+    }
+  }
+
+  interface MessageStream {
+    String PREFIX = RaftClientConfigKeys.PREFIX + ".message-stream";
 
     String SUBMESSAGE_SIZE_KEY = PREFIX + ".submessage-size";
     SizeInBytes SUBMESSAGE_SIZE_DEFAULT = SizeInBytes.valueOf("1MB");
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 2c9d175..96c0cf0 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -36,59 +36,56 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Streaming client implementation
  * allows client to create streams and send asynchronously.
  */
-
 public class DataStreamClientImpl implements DataStreamClient {
   public static final Logger LOG = 
LoggerFactory.getLogger(DataStreamClientImpl.class);
 
-  private DataStreamClientRpc dataStreamClientRpc;
-  private OrderedStreamAsync orderedStreamAsync;
   // TODO Similar to RaftClientImpl, pass ClientId and RaftGroupId/RaftGroup 
in constructor.
   private final ClientId clientId = ClientId.randomId();
   private final RaftGroupId groupId =  RaftGroupId.randomId();
-  private RaftPeer raftServer;
-  private RaftProperties properties;
-  private Parameters parameters;
-  private long streamId = 0;
-
-  public DataStreamClientImpl(RaftPeer raftServer,
-                              RaftProperties properties,
-                              Parameters parameters) {
-    this.raftServer = Objects.requireNonNull(raftServer,
-                                          "peer == null");
-    this.properties = properties;
-    this.parameters = parameters;
+
+  private final RaftPeer raftServer;
+  private final DataStreamClientRpc dataStreamClientRpc;
+  private final OrderedStreamAsync orderedStreamAsync;
+
+  private final AtomicInteger streamId = new AtomicInteger();
+
+  public DataStreamClientImpl(RaftPeer server, RaftProperties properties, 
Parameters parameters) {
+    this.raftServer = Objects.requireNonNull(server, "server == null");
 
     final SupportedDataStreamType type = 
RaftConfigKeys.DataStream.type(properties, LOG::info);
     this.dataStreamClientRpc = 
DataStreamClientFactory.cast(type.newFactory(parameters))
                                .newDataStreamClientRpc(raftServer, properties);
 
-    this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, 
properties);
+    this.orderedStreamAsync = new OrderedStreamAsync(clientId, 
dataStreamClientRpc, properties);
   }
 
   public class DataStreamOutputImpl implements DataStreamOutput {
-    private long streamId = 0;
-    private long messageId = 0;
+    private final long streamId;
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
 
+    private long streamOffset = 0;
+
     public DataStreamOutputImpl(long id){
       this.streamId = id;
       this.header = new RaftClientRequest(clientId, raftServer.getId(), 
groupId, RaftClientImpl.nextCallId(),
           RaftClientRequest.writeRequestType());
-      this.headerFuture = orderedStreamAsync.sendRequest(streamId, messageId,
+      this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
           
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer());
     }
 
     // send to the attached dataStreamClientRpc
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
-      messageId++;
-      return orderedStreamAsync.sendRequest(streamId, messageId, buf);
+      final CompletableFuture<DataStreamReply> f = 
orderedStreamAsync.sendRequest(streamId, streamOffset, buf);
+      streamOffset += buf.remaining();
+      return f;
     }
 
     // should wait for attached sliding window to terminate
@@ -113,8 +110,7 @@ public class DataStreamClientImpl implements 
DataStreamClient {
 
   @Override
   public DataStreamOutput stream() {
-    streamId++;
-    return new DataStreamOutputImpl(streamId);
+    return new DataStreamOutputImpl(streamId.incrementAndGet());
   }
 
   @Override
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
index e2971d6..14779a1 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
@@ -71,7 +71,7 @@ public final class MessageStreamImpl implements 
MessageStreamApi {
 
   private MessageStreamImpl(RaftClientImpl client, RaftProperties properties) {
     this.client = Objects.requireNonNull(client, "client == null");
-    this.submessageSize = 
RaftClientConfigKeys.Stream.submessageSize(properties);
+    this.submessageSize = 
RaftClientConfigKeys.MessageStream.submessageSize(properties);
   }
 
   @Override
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 093190b..fb4a7d9 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -20,10 +20,9 @@ package org.apache.ratis.client.impl;
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.protocol.DataStreamRequestByteBuffer;
-import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SlidingWindow;
@@ -38,37 +37,18 @@ import java.util.function.LongFunction;
 public class OrderedStreamAsync {
   public static final Logger LOG = 
LoggerFactory.getLogger(OrderedStreamAsync.class);
 
-  public static class DataStreamWindowRequest implements DataStreamRequest,
-      SlidingWindow.ClientSideRequest<DataStreamReply> {
-    private final long streamId;
-    private final long messageId;
+  static class DataStreamWindowRequest extends DataStreamRequestByteBuffer
+      implements SlidingWindow.ClientSideRequest<DataStreamReply> {
     private final long seqNum;
-    private final ByteBuffer data;
-    private boolean isFirst = false;
-    private CompletableFuture<DataStreamReply> replyFuture = new 
CompletableFuture<>();
+    private final CompletableFuture<DataStreamReply> replyFuture = new 
CompletableFuture<>();
 
-    public DataStreamRequestByteBuffer newDataStreamRequest(){
-      return new DataStreamRequestByteBuffer(streamId, messageId, 
data.slice());
-    }
-
-    @Override
-    public long getStreamId() {
-      return streamId;
-    }
-
-    @Override
-    public long getDataOffset() {
-      return messageId;
-    }
-
-    @Override
-    public long getDataLength() {
-      return data.capacity();
+    DataStreamWindowRequest(long streamId, long offset, ByteBuffer data, long 
seqNum){
+      super(streamId, offset, data);
+      this.seqNum = seqNum;
     }
 
     @Override
     public void setFirstRequest() {
-      isFirst = true;
     }
 
     @Override
@@ -94,42 +74,28 @@ public class OrderedStreamAsync {
     public CompletableFuture<DataStreamReply> getReplyFuture(){
       return replyFuture;
     }
-
-    public DataStreamWindowRequest(long streamId, long messageId,
-                                 long seqNum, ByteBuffer data){
-      this.streamId = streamId;
-      this.messageId = messageId;
-      this.data = data.slice();
-      this.seqNum = seqNum;
-    }
   }
 
-  private SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> 
slidingWindow;
-  private Semaphore requestSemaphore;
-  private DataStreamClientRpc dataStreamClientRpc;
+  private final DataStreamClientRpc dataStreamClientRpc;
+  private final SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> 
slidingWindow;
+  private final Semaphore requestSemaphore;
 
-  public OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc,
-                            RaftProperties properties){
+  OrderedStreamAsync(ClientId clientId, DataStreamClientRpc 
dataStreamClientRpc, RaftProperties properties){
     this.dataStreamClientRpc = dataStreamClientRpc;
-    this.requestSemaphore = new 
Semaphore(RaftClientConfigKeys.Async.outstandingRequestsMax(properties)*2);
-    this.slidingWindow = new SlidingWindow.Client<>("sliding");
+    this.slidingWindow = new SlidingWindow.Client<>(clientId);
+    this.requestSemaphore = new 
Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties)*2);
   }
 
-  private void resetSlidingWindow(RaftClientRequest request) {
-    slidingWindow.resetFirstSeqNum();
-  }
-
-  public CompletableFuture<DataStreamReply> sendRequest(long streamId,
-                                                        long messageId,
-                                                        ByteBuffer data){
+  CompletableFuture<DataStreamReply> sendRequest(long streamId, long offset, 
ByteBuffer data){
+    final int length = data.remaining();
     try {
       requestSemaphore.acquire();
     } catch (InterruptedException e){
       return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
-          "Interrupted when sending streamId=" + streamId + ", messageId= " + 
messageId, e));
+          "Interrupted when sending streamId=" + streamId + ", offset= " + 
offset + ", length=" + length, e));
     }
-    final LongFunction<DataStreamWindowRequest> constructor = seqNum -> new 
DataStreamWindowRequest(streamId,
-                                                                          
messageId, seqNum, data);
+    final LongFunction<DataStreamWindowRequest> constructor
+        = seqNum -> new DataStreamWindowRequest(streamId, offset, 
data.slice(), seqNum);
     return slidingWindow.submitNewRequest(constructor, 
this::sendRequestToNetwork).
            getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
   }
@@ -142,8 +108,7 @@ public class OrderedStreamAsync {
     if(slidingWindow.isFirst(request.getSeqNum())){
       request.setFirstRequest();
     }
-    DataStreamRequestByteBuffer rpcRequest = request.newDataStreamRequest();
-    CompletableFuture<DataStreamReply> requestFuture = 
dataStreamClientRpc.streamAsync(rpcRequest);
+    final CompletableFuture<DataStreamReply> requestFuture = 
dataStreamClientRpc.streamAsync(request);
     requestFuture.thenApply(reply -> {
       slidingWindow.receiveReply(
           request.getSeqNum(), reply, this::sendRequestToNetwork);
@@ -155,6 +120,4 @@ public class OrderedStreamAsync {
       f.complete(reply);
     });
   }
-
-
 }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
similarity index 61%
rename from 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
rename to 
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index fee5a04..02fc04a 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,38 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.ratis.protocol;
+package org.apache.ratis.datastream.impl;
 
 import java.nio.ByteBuffer;
 
-public class DataStreamRequestByteBuffer implements DataStreamRequest{
-  private long streamId;
-  private long dataOffset;
-  private ByteBuffer buf;
-
-  public DataStreamRequestByteBuffer(long streamId, long dataOffset, 
ByteBuffer buf){
-    this.streamId = streamId;
-    this.dataOffset = dataOffset;
-    this.buf = buf;
-  }
-
-  @Override
-  public long getStreamId() {
-    return streamId;
-  }
+/**
+ * Implements {@link org.apache.ratis.protocol.DataStreamPacket} with {@link 
ByteBuffer}.
+ *
+ * This class is immutable.
+ */
+public class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
+  private final ByteBuffer buffer;
 
-  @Override
-  public long getDataOffset() {
-    return dataOffset;
+  public DataStreamPacketByteBuffer(long streamId, long streamOffset, 
ByteBuffer buffer) {
+    super(streamId, streamOffset);
+    this.buffer = buffer.asReadOnlyBuffer();
   }
 
   @Override
   public long getDataLength() {
-    return buf.capacity();
+    return buffer.remaining();
   }
 
-  public ByteBuffer getBuf() {
-    return buf;
+  public ByteBuffer slice() {
+    return buffer.slice();
   }
 }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
similarity index 56%
rename from 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
rename to 
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
index f868bf8..7f76042 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,25 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.ratis.datastream.impl;
 
-package org.apache.ratis.protocol;
-
-import java.nio.ByteBuffer;
+import org.apache.ratis.protocol.DataStreamPacket;
 
-public class DataStreamReplyByteBuffer implements DataStreamReply {
-  private long streamId;
-  private long dataOffset;
-  private ByteBuffer response;
+/**
+ * This is an abstract implementation of {@link DataStreamPacket}.
+ *
+ * This class is immutable.
+ */
+public abstract class DataStreamPacketImpl implements DataStreamPacket {
+  private final long streamId;
+  private final long streamOffset;
 
-  public DataStreamReplyByteBuffer(long streamId, long dataOffset, ByteBuffer 
bf){
+  public DataStreamPacketImpl(long streamId, long streamOffset) {
     this.streamId = streamId;
-    this.dataOffset = dataOffset;
-    this.response = bf;
-  }
-
-  @Override
-  public ByteBuffer getResponse() {
-    return response;
+    this.streamOffset = streamOffset;
   }
 
   @Override
@@ -42,12 +39,16 @@ public class DataStreamReplyByteBuffer implements 
DataStreamReply {
   }
 
   @Override
-  public long getDataOffset() {
-    return dataOffset;
+  public long getStreamOffset() {
+    return streamOffset;
   }
 
   @Override
-  public long getDataLength() {
-    return response.capacity();
+  public String toString() {
+    return getClass().getSimpleName() + "{"
+        + "streamId=" + getStreamId()
+        + ", streamOffset=" + getStreamOffset()
+        + ", dataLength=" + getDataLength()
+        + '}';
   }
 }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
similarity index 65%
copy from 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
copy to 
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 856eed1..8370b25 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -15,11 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.ratis.datastream.impl;
 
-package org.apache.ratis.protocol;
+import org.apache.ratis.protocol.DataStreamReply;
 
 import java.nio.ByteBuffer;
 
-public interface DataStreamReply extends DataStreamMessage {
-  ByteBuffer getResponse();
-}
\ No newline at end of file
+/**
+ * Implements {@link DataStreamReply} with {@link ByteBuffer}.
+ *
+ * This class is immutable.
+ */
+public class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer 
implements DataStreamReply {
+  public DataStreamReplyByteBuffer(long streamId, long streamOffset, 
ByteBuffer buffer) {
+    super(streamId, streamOffset, buffer);
+  }
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
similarity index 65%
copy from 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
copy to 
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 856eed1..fb0f3ec 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -15,11 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.ratis.datastream.impl;
 
-package org.apache.ratis.protocol;
+import org.apache.ratis.protocol.DataStreamRequest;
 
 import java.nio.ByteBuffer;
 
-public interface DataStreamReply extends DataStreamMessage {
-  ByteBuffer getResponse();
-}
\ No newline at end of file
+/**
+ * Implements {@link DataStreamRequest} with {@link ByteBuffer}.
+ *
+ * This class is immutable.
+ */
+public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer 
implements DataStreamRequest {
+  public DataStreamRequestByteBuffer(long streamId, long streamOffset, 
ByteBuffer buffer) {
+    super(streamId, streamOffset, buffer);
+  }
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
similarity index 93%
rename from 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
rename to 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index b05694c..10ca737 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -18,10 +18,10 @@
 
 package org.apache.ratis.protocol;
 
-public interface DataStreamMessage{
+public interface DataStreamPacket {
   long getStreamId();
 
-  long getDataOffset();
+  long getStreamOffset();
 
   long getDataLength();
 }
\ No newline at end of file
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 856eed1..6ac5253 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -18,8 +18,5 @@
 
 package org.apache.ratis.protocol;
 
-import java.nio.ByteBuffer;
-
-public interface DataStreamReply extends DataStreamMessage {
-  ByteBuffer getResponse();
+public interface DataStreamReply extends DataStreamPacket {
 }
\ No newline at end of file
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index b2431d6..8db10fe 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -18,5 +18,5 @@
 
 package org.apache.ratis.protocol;
 
-public interface DataStreamRequest extends DataStreamMessage {
+public interface DataStreamRequest extends DataStreamPacket {
 }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
index 775833b..e91f606 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
@@ -18,7 +18,7 @@
 
 package org.apache.ratis.netty.client;
 
-import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
index 6bafffa..0864b12 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
@@ -18,7 +18,7 @@
 
 package org.apache.ratis.netty.client;
 
-import org.apache.ratis.protocol.DataStreamRequestByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -30,12 +30,13 @@ public class DataStreamRequestEncoder extends 
MessageToMessageEncoder<DataStream
 
   @Override
   protected void encode(ChannelHandlerContext channelHandlerContext,
-                        DataStreamRequestByteBuffer requestData, List<Object> 
list) {
+      DataStreamRequestByteBuffer request, List<Object> list) {
+    final ByteBuffer data = request.slice();
     ByteBuffer bb = ByteBuffer.allocateDirect(24);
-    bb.putLong(requestData.getStreamId());
-    bb.putLong(requestData.getDataOffset());
-    bb.putLong(requestData.getDataLength());
+    bb.putLong(request.getStreamId());
+    bb.putLong(request.getStreamOffset());
+    bb.putLong(data.remaining());
     bb.flip();
-    list.add(Unpooled.wrappedBuffer(bb, requestData.getBuf()));
+    list.add(Unpooled.wrappedBuffer(bb, data));
   }
 }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
index 71132d9..52ab54d 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
@@ -18,7 +18,7 @@
 
 package org.apache.ratis.netty.server;
 
-import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -26,17 +26,15 @@ import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
 import java.nio.ByteBuffer;
 import java.util.List;
 
-public class DataStreamReplyEncoder extends 
MessageToMessageEncoder<DataStreamReply> {
+public class DataStreamReplyEncoder extends 
MessageToMessageEncoder<DataStreamReplyByteBuffer> {
   @Override
-  protected void encode(ChannelHandlerContext channelHandlerContext,
-                        DataStreamReply dataStreamReply,
-                        List<Object> list) {
+  protected void encode(ChannelHandlerContext context, 
DataStreamReplyByteBuffer reply, List<Object> list) {
 
     ByteBuffer bb = ByteBuffer.allocateDirect(24);
-    bb.putLong(dataStreamReply.getStreamId());
-    bb.putLong(dataStreamReply.getDataOffset());
-    bb.putLong(dataStreamReply.getDataLength());
+    bb.putLong(reply.getStreamId());
+    bb.putLong(reply.getStreamOffset());
+    bb.putLong(reply.getDataLength());
     bb.flip();
-    list.add(Unpooled.wrappedBuffer(bb, dataStreamReply.getResponse()));
+    list.add(Unpooled.wrappedBuffer(bb, reply.slice()));
   }
 }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 6d5e4f6..b782481 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -18,36 +18,29 @@
 
 package org.apache.ratis.netty.server;
 
+import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 
-class DataStreamRequestByteBuf implements DataStreamRequest {
-  private long streamId;
-  private long dataOffset;
-  private ByteBuf buf;
-
-  DataStreamRequestByteBuf(long streamId, long dataOffset, ByteBuf buf) {
-    this.streamId = streamId;
-    this.dataOffset = dataOffset;
-    this.buf = buf;
-  }
-
-  @Override
-  public long getStreamId() {
-    return streamId;
-  }
+/**
+ * Implements {@link DataStreamRequest} with {@link ByteBuf}.
+ *
+ * This class is immutable.
+ */
+class DataStreamRequestByteBuf extends DataStreamPacketImpl implements 
DataStreamRequest {
+  private final ByteBuf buf;
 
-  @Override
-  public long getDataOffset() {
-    return dataOffset;
+  DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf buf) {
+    super(streamId, streamOffset);
+    this.buf = buf.asReadOnly();
   }
 
   @Override
   public long getDataLength() {
-    return buf.capacity();
+    return buf.readableBytes();
   }
 
-  public ByteBuf getBuf() {
-    return buf;
+  public ByteBuf slice() {
+    return buf.slice();
   }
 }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 6ccabbc..c8092ef 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,8 +20,7 @@ package org.apache.ratis.netty.server;
 
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
@@ -79,31 +78,34 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     }
   }
 
-  private void writeTo(ByteBuf buf, DataStream stream, boolean released) {
+  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
     if (released) {
-      return;
+      return 0;
     }
     try {
       if (stream == null) {
-        return;
+        return 0;
       }
 
       final WritableByteChannel channel = stream.getWritableByteChannel();
+      long byteWritten = 0;
       for (ByteBuffer buffer : buf.nioBuffers()) {
         try {
-          channel.write(buffer);
+          byteWritten += channel.write(buffer);
         } catch (Throwable t) {
           throw new CompletionException(t);
         }
       }
+      return byteWritten;
     } finally {
       buf.release();
     }
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx) {
-    final DataStreamReply reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getDataOffset(), 
ByteBuffer.wrap("OK".getBytes()));
+  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, 
ChannelHandlerContext ctx) {
+    // TODO RATIS-1098: include byteWritten and isSuccess in the reply
+    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+        request.getStreamId(), request.getStreamOffset(), 
ByteBuffer.wrap("OK".getBytes()));
     ctx.writeAndFlush(reply);
   }
 
@@ -111,13 +113,12 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
-        final long streamId = req.getStreamId();
-        final ByteBuf buf = req.getBuf();
+        final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
+        final ByteBuf buf = request.slice();
         final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, 
released))
-            .thenAccept(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(dummy -> sendReply(req, ctx));
+        streams.computeIfAbsent(request.getStreamId(), id -> 
getDataStreamFuture(buf, released))
+            .thenApply(stream -> writeTo(buf, stream, released.get()))
+            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
       }
     };
   }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java 
b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
index f49564c..f37035c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
@@ -87,12 +87,12 @@ public abstract class MessageStreamApiTests<CLUSTER extends 
MiniRaftCluster> ext
   @Test
   public void testStreamAsync() throws Exception {
     final RaftProperties p = getProperties();
-    RaftClientConfigKeys.Stream.setSubmessageSize(p, SUBMESSAGE_SIZE);
+    RaftClientConfigKeys.MessageStream.setSubmessageSize(p, SUBMESSAGE_SIZE);
     p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
 
     runWithNewCluster(NUM_SERVERS, this::runTestStreamAsync);
-    RaftClientConfigKeys.Stream.setSubmessageSize(p);
+    RaftClientConfigKeys.MessageStream.setSubmessageSize(p);
   }
 
   void runTestStreamAsync(CLUSTER cluster) throws Exception {

Reply via email to