This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e8940d  RATIS-1106. Add type for DataStream (#232)
4e8940d is described below

commit 4e8940daf2f818c4e5d64c627bc971110edff62b
Author: runzhiwang <[email protected]>
AuthorDate: Tue Oct 27 11:56:51 2020 +0800

    RATIS-1106. Add type for DataStream (#232)
---
 .../ratis/client/impl/DataStreamClientImpl.java    |  6 +-
 .../ratis/client/impl/OrderedStreamAsync.java      |  9 +--
 .../impl/DataStreamPacketByteBuffer.java           |  5 +-
 .../datastream/impl/DataStreamPacketImpl.java      | 11 +++-
 .../datastream/impl/DataStreamReplyByteBuffer.java |  8 ++-
 .../impl/DataStreamRequestByteBuffer.java          |  5 +-
 .../apache/ratis/protocol/DataStreamPacket.java    | 10 +---
 .../ratis/protocol/DataStreamPacketHeader.java     | 25 ++------
 .../org/apache/ratis/protocol/DataStreamReply.java | 21 -------
 .../ratis/protocol/DataStreamReplyHeader.java      | 57 ++++++++++++-------
 .../apache/ratis/protocol/DataStreamRequest.java   |  4 --
 .../ratis/protocol/DataStreamRequestHeader.java    | 53 +++++++++++------
 .../apache/ratis/netty/NettyDataStreamUtils.java   | 66 +++++++++++++++++++---
 .../ratis/netty/client/NettyClientStreamRpc.java   |  2 +-
 .../netty/server/DataStreamRequestByteBuf.java     |  7 ++-
 .../ratis/netty/server/NettyServerStreamRpc.java   |  9 +--
 ratis-proto/src/main/proto/Raft.proto              | 24 ++++++++
 .../apache/ratis/datastream/TestDataStream.java    |  3 +
 18 files changed, 206 insertions(+), 119 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 0bfd662..51e9eb6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -30,6 +30,7 @@ import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,13 +78,14 @@ public class DataStreamClientImpl implements 
DataStreamClient {
       this.header = new RaftClientRequest(clientId, raftServer.getId(), 
groupId, RaftClientImpl.nextCallId(),
           RaftClientRequest.writeRequestType());
       this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
-          
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer());
+          
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(),
 Type.STREAM_HEADER);
     }
 
     // send to the attached dataStreamClientRpc
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
-      final CompletableFuture<DataStreamReply> f = 
orderedStreamAsync.sendRequest(streamId, streamOffset, buf);
+      final CompletableFuture<DataStreamReply> f = 
orderedStreamAsync.sendRequest(streamId, streamOffset, buf,
+          Type.STREAM_DATA);
       streamOffset += buf.remaining();
       return f;
     }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index fb4a7d9..6bfca51 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -42,8 +43,8 @@ public class OrderedStreamAsync {
     private final long seqNum;
     private final CompletableFuture<DataStreamReply> replyFuture = new 
CompletableFuture<>();
 
-    DataStreamWindowRequest(long streamId, long offset, ByteBuffer data, long 
seqNum){
-      super(streamId, offset, data);
+    DataStreamWindowRequest(long streamId, long offset, ByteBuffer data, long 
seqNum, Type type){
+      super(streamId, offset, data, type);
       this.seqNum = seqNum;
     }
 
@@ -86,7 +87,7 @@ public class OrderedStreamAsync {
     this.requestSemaphore = new 
Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties)*2);
   }
 
-  CompletableFuture<DataStreamReply> sendRequest(long streamId, long offset, 
ByteBuffer data){
+  CompletableFuture<DataStreamReply> sendRequest(long streamId, long offset, 
ByteBuffer data, Type type){
     final int length = data.remaining();
     try {
       requestSemaphore.acquire();
@@ -95,7 +96,7 @@ public class OrderedStreamAsync {
           "Interrupted when sending streamId=" + streamId + ", offset= " + 
offset + ", length=" + length, e));
     }
     final LongFunction<DataStreamWindowRequest> constructor
-        = seqNum -> new DataStreamWindowRequest(streamId, offset, 
data.slice(), seqNum);
+        = seqNum -> new DataStreamWindowRequest(streamId, offset, 
data.slice(), seqNum, type);
     return slidingWindow.submitNewRequest(constructor, 
this::sendRequestToNetwork).
            getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index 6eea669..fcb420d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.datastream.impl;
 
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import java.nio.ByteBuffer;
 
 /**
@@ -27,8 +28,8 @@ public abstract class DataStreamPacketByteBuffer extends 
DataStreamPacketImpl {
 
   private final ByteBuffer buffer;
 
-  public DataStreamPacketByteBuffer(long streamId, long streamOffset, 
ByteBuffer buffer) {
-    super(streamId, streamOffset);
+  public DataStreamPacketByteBuffer(long streamId, long streamOffset, 
ByteBuffer buffer, Type type) {
+    super(streamId, streamOffset, type);
     this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY;
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
index 7f76042..f47caee 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.datastream.impl;
 
 import org.apache.ratis.protocol.DataStreamPacket;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 
 /**
  * This is an abstract implementation of {@link DataStreamPacket}.
@@ -27,10 +28,12 @@ import org.apache.ratis.protocol.DataStreamPacket;
 public abstract class DataStreamPacketImpl implements DataStreamPacket {
   private final long streamId;
   private final long streamOffset;
+  private final Type type;
 
-  public DataStreamPacketImpl(long streamId, long streamOffset) {
+  public DataStreamPacketImpl(long streamId, long streamOffset, Type type) {
     this.streamId = streamId;
     this.streamOffset = streamOffset;
+    this.type = type;
   }
 
   @Override
@@ -44,11 +47,17 @@ public abstract class DataStreamPacketImpl implements 
DataStreamPacket {
   }
 
   @Override
+  public Type getType() {
+    return type;
+  }
+
+  @Override
   public String toString() {
     return getClass().getSimpleName() + "{"
         + "streamId=" + getStreamId()
         + ", streamOffset=" + getStreamOffset()
         + ", dataLength=" + getDataLength()
+        + ", type=" + getType()
         + '}';
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 230c59b..3e0736e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -19,6 +19,7 @@ package org.apache.ratis.datastream.impl;
 
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamReplyHeader;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 
 import java.nio.ByteBuffer;
 
@@ -32,15 +33,16 @@ public class DataStreamReplyByteBuffer extends 
DataStreamPacketByteBuffer implem
   private final boolean success;
 
   public DataStreamReplyByteBuffer(long streamId, long streamOffset, 
ByteBuffer buffer,
-      long bytesWritten, boolean success ) {
-    super(streamId, streamOffset, buffer);
+      long bytesWritten, boolean success, Type type) {
+    super(streamId, streamOffset, buffer, type);
 
     this.success = success;
     this.bytesWritten = bytesWritten;
   }
 
   public DataStreamReplyByteBuffer(DataStreamReplyHeader header, ByteBuffer 
buffer) {
-    this(header.getStreamId(), header.getStreamOffset(), buffer, 
header.getBytesWritten(), header.isSuccess());
+    this(header.getStreamId(), header.getStreamOffset(), buffer, 
header.getBytesWritten(), header.isSuccess(),
+        header.getType());
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index fb0f3ec..7e63340 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.datastream.impl;
 
 import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 
 import java.nio.ByteBuffer;
 
@@ -27,7 +28,7 @@ import java.nio.ByteBuffer;
  * This class is immutable.
  */
 public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer 
implements DataStreamRequest {
-  public DataStreamRequestByteBuffer(long streamId, long streamOffset, 
ByteBuffer buffer) {
-    super(streamId, streamOffset, buffer);
+  public DataStreamRequestByteBuffer(long streamId, long streamOffset, 
ByteBuffer buffer, Type type) {
+    super(streamId, streamOffset, buffer, type);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index fa3f706..995ed16 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -18,7 +18,7 @@
 
 package org.apache.ratis.protocol;
 
-import java.util.function.LongConsumer;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 
 public interface DataStreamPacket {
   long getStreamId();
@@ -27,11 +27,5 @@ public interface DataStreamPacket {
 
   long getDataLength();
 
-  int getHeaderSize();
-
-  default void writeHeaderTo(LongConsumer putLong) {
-    putLong.accept(getStreamId());
-    putLong.accept(getStreamOffset());
-    putLong.accept(getDataLength());
-  }
+  Type getType();
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index 56274b6..f3d705a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -19,29 +19,17 @@
 package org.apache.ratis.protocol;
 
 import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.util.SizeInBytes;
 
-import java.util.function.LongSupplier;
-
 /** The header format is streamId, streamOffset, dataLength. */
 public class DataStreamPacketHeader extends DataStreamPacketImpl {
-  private static final SizeInBytes SIZE = SizeInBytes.valueOf(24);
-
-  public static int getSize() {
-    return SIZE.getSizeInt();
-  }
-
-  public static DataStreamPacketHeader read(LongSupplier readLong, int 
readableBytes) {
-    if (readableBytes < getSize()) {
-      return null;
-    }
-    return new DataStreamPacketHeader(readLong.getAsLong(), 
readLong.getAsLong(), readLong.getAsLong());
-  }
+  private static final SizeInBytes SIZE_OF_HEADER_LEN = SizeInBytes.valueOf(4);
 
   private final long dataLength;
 
-  public DataStreamPacketHeader(long streamId, long streamOffset, long 
dataLength) {
-    super(streamId, streamOffset);
+  public DataStreamPacketHeader(long streamId, long streamOffset, long 
dataLength, Type type) {
+    super(streamId, streamOffset, type);
     this.dataLength = dataLength;
   }
 
@@ -50,8 +38,7 @@ public class DataStreamPacketHeader extends 
DataStreamPacketImpl {
     return dataLength;
   }
 
-  @Override
-  public int getHeaderSize() {
-    return getSize();
+  public static int getSizeOfHeaderLen() {
+    return SIZE_OF_HEADER_LEN.getSizeInt();
   }
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 9fb8fa7..9f50cc8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -18,30 +18,9 @@
 
 package org.apache.ratis.protocol;
 
-import java.util.function.LongConsumer;
-
 public interface DataStreamReply extends DataStreamPacket {
-  static boolean getSuccess(long flags) {
-    return flags == 0;
-  }
-
-  static long toFlags(boolean success) {
-    return success? 0: 1;
-  }
 
   boolean isSuccess();
 
   long getBytesWritten();
-
-  @Override
-  default int getHeaderSize() {
-    return DataStreamReplyHeader.getSize();
-  }
-
-  @Override
-  default void writeHeaderTo(LongConsumer putLong) {
-    DataStreamPacket.super.writeHeaderTo(putLong);
-    putLong.accept(getBytesWritten());
-    putLong.accept(toFlags(isSuccess()));
-  }
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index 1db731c..e829b9f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -18,43 +18,62 @@
 
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.util.SizeInBytes;
-
-import java.util.function.LongSupplier;
+import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** The header format is {@link DataStreamPacketHeader}, bytesWritten and 
flags. */
 public class DataStreamReplyHeader extends DataStreamPacketHeader implements 
DataStreamReply {
-  private static final SizeInBytes SIZE = 
SizeInBytes.valueOf(DataStreamPacketHeader.getSize() + 16);
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStreamReplyHeader.class);
 
-  public static int getSize() {
-    return SIZE.getSizeInt();
-  }
+  public static DataStreamReplyHeader read(ByteBuf buf) {
+    if (getSizeOfHeaderLen() > buf.readableBytes()) {
+      return null;
+    }
 
-  public static DataStreamReplyHeader read(LongSupplier readLong, int 
readableBytes) {
-    if (readableBytes < getSize()) {
+    int headerBufLen = buf.readInt();
+    if (headerBufLen > buf.readableBytes()) {
+      buf.resetReaderIndex();
       return null;
     }
-    final DataStreamPacketHeader packerHeader = 
DataStreamPacketHeader.read(readLong, readableBytes);
-    if (packerHeader == null) {
+
+    try {
+      ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
+      DataStreamReplyHeaderProto header = 
DataStreamReplyHeaderProto.parseFrom(headerBuf.nioBuffer());
+
+      if (header.getPacketHeader().getDataLength() + headerBufLen <= 
buf.readableBytes()) {
+        buf.readerIndex(buf.readerIndex() + headerBufLen);
+        return new DataStreamReplyHeader(
+            header.getPacketHeader().getStreamId(),
+            header.getPacketHeader().getStreamOffset(),
+            header.getPacketHeader().getDataLength(),
+            header.getPacketHeader().getType(),
+            header.getBytesWritten(),
+            header.getSuccess());
+      } else {
+        buf.resetReaderIndex();
+        return null;
+      }
+    } catch (InvalidProtocolBufferException e) {
+      LOG.error("Fail to decode reply header:", e);
+      buf.resetReaderIndex();
       return null;
     }
-    return new DataStreamReplyHeader(packerHeader, readLong.getAsLong(),
-        DataStreamReply.getSuccess(readLong.getAsLong()));
   }
 
   private final long bytesWritten;
   private final boolean success;
 
-  public DataStreamReplyHeader(long streamId, long streamOffset, long 
dataLength, long bytesWritten, boolean success) {
-    super(streamId, streamOffset, dataLength);
+  public DataStreamReplyHeader(
+      long streamId, long streamOffset, long dataLength, Type type, long 
bytesWritten, boolean success) {
+    super(streamId, streamOffset, dataLength, type);
     this.bytesWritten = bytesWritten;
     this.success = success;
   }
 
-  public DataStreamReplyHeader(DataStreamPacketHeader header, long 
bytesWritten, boolean success) {
-    this(header.getStreamId(), header.getStreamOffset(), 
header.getDataLength(), bytesWritten, success);
-  }
-
   @Override
   public long getBytesWritten() {
     return bytesWritten;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index 797ddf4..8db10fe 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -19,8 +19,4 @@
 package org.apache.ratis.protocol;
 
 public interface DataStreamRequest extends DataStreamPacket {
-  @Override
-  default int getHeaderSize() {
-    return DataStreamRequestHeader.getSize();
-  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index d6646df..b87d68f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,37 +18,54 @@
 
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.util.SizeInBytes;
-
-import java.util.function.LongSupplier;
+import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The header format is the same {@link DataStreamPacketHeader}
  * since there are no additional fields.
  */
 public class DataStreamRequestHeader extends DataStreamPacketHeader implements 
DataStreamRequest {
-  private static final SizeInBytes SIZE = 
SizeInBytes.valueOf(DataStreamPacketHeader.getSize());
-
-  public static int getSize() {
-    return SIZE.getSizeInt();
-  }
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStreamRequestHeader.class);
 
-  public static DataStreamRequestHeader read(LongSupplier readLong, int 
readableBytes) {
-    if (readableBytes < getSize()) {
+  public static DataStreamRequestHeader read(ByteBuf buf) {
+    if (getSizeOfHeaderLen() > buf.readableBytes()) {
       return null;
     }
-    final DataStreamPacketHeader packerHeader = 
DataStreamPacketHeader.read(readLong, readableBytes);
-    if (packerHeader == null) {
+
+    int headerBufLen = buf.readInt();
+    if (headerBufLen > buf.readableBytes()) {
+      buf.resetReaderIndex();
       return null;
     }
-    return new DataStreamRequestHeader(packerHeader);
-  }
 
-  public DataStreamRequestHeader(long streamId, long streamOffset, long 
dataLength) {
-    super(streamId, streamOffset, dataLength);
+    try {
+      ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
+      DataStreamRequestHeaderProto header = 
DataStreamRequestHeaderProto.parseFrom(headerBuf.nioBuffer());
+
+      if (header.getPacketHeader().getDataLength() + headerBufLen <= 
buf.readableBytes()) {
+        buf.readerIndex(buf.readerIndex() + headerBufLen);
+        return new DataStreamRequestHeader(
+            header.getPacketHeader().getStreamId(),
+            header.getPacketHeader().getStreamOffset(),
+            header.getPacketHeader().getDataLength(),
+            header.getPacketHeader().getType());
+      } else {
+        buf.resetReaderIndex();
+        return null;
+      }
+    } catch (InvalidProtocolBufferException e) {
+      LOG.error("Fail to decode request header:", e);
+      buf.resetReaderIndex();
+      return null;
+    }
   }
 
-  public DataStreamRequestHeader(DataStreamPacketHeader header) {
-    this(header.getStreamId(), header.getStreamOffset(), 
header.getDataLength());
+  public DataStreamRequestHeader(long streamId, long streamOffset, long 
dataLength, Type type) {
+    super(streamId, streamOffset, dataLength, type);
   }
 }
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 757c179..0aadfea 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -17,9 +17,12 @@
  */
 package org.apache.ratis.netty;
 
-import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
+import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
 import org.apache.ratis.protocol.DataStreamPacketHeader;
 import org.apache.ratis.protocol.DataStreamReplyHeader;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
@@ -27,27 +30,74 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 
+import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 public interface NettyDataStreamUtils {
-  static void encodeDataStreamPacketByteBuffer(DataStreamPacketByteBuffer 
packet, Consumer<ByteBuf> out) {
-    final ByteBuf buf = 
PooledByteBufAllocator.DEFAULT.directBuffer(packet.getHeaderSize());
-    packet.writeHeaderTo(buf::writeLong);
-    out.accept(buf);
-    out.accept(Unpooled.wrappedBuffer(packet.slice()));
+
+  static ByteBuffer 
getDataStreamRequestHeaderProtoByteBuf(DataStreamRequestByteBuffer request) {
+    DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
+        .newBuilder()
+        .setStreamId(request.getStreamId())
+        .setStreamOffset(request.getStreamOffset())
+        .setType(request.getType())
+        .setDataLength(request.getDataLength());
+    return DataStreamRequestHeaderProto
+        .newBuilder()
+        .setPacketHeader(b)
+        .build()
+        .toByteString()
+        .asReadOnlyByteBuffer();
+  }
+
+  static ByteBuffer 
getDataStreamReplyHeaderProtoByteBuf(DataStreamReplyByteBuffer reply) {
+    DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
+        .newBuilder()
+        .setStreamId(reply.getStreamId())
+        .setStreamOffset(reply.getStreamOffset())
+        .setType(reply.getType())
+        .setDataLength(reply.getDataLength());
+    return DataStreamReplyHeaderProto
+        .newBuilder()
+        .setPacketHeader(b)
+        .setBytesWritten(reply.getBytesWritten())
+        .setSuccess(reply.isSuccess())
+        .build()
+        .toByteString()
+        .asReadOnlyByteBuffer();
+  }
+
+  static void encodeDataStreamRequestByteBuffer(DataStreamRequestByteBuffer 
request, Consumer<ByteBuf> out) {
+    ByteBuffer headerBuf = getDataStreamRequestHeaderProtoByteBuf(request);
+    final ByteBuf headerLenBuf =
+        
PooledByteBufAllocator.DEFAULT.directBuffer(DataStreamPacketHeader.getSizeOfHeaderLen());
+    headerLenBuf.writeInt(headerBuf.remaining());
+    out.accept(headerLenBuf);
+    out.accept(Unpooled.wrappedBuffer(headerBuf));
+    out.accept(Unpooled.wrappedBuffer(request.slice()));
+  }
+
+  static void encodeDataStreamReplyByteBuffer(DataStreamReplyByteBuffer reply, 
Consumer<ByteBuf> out) {
+    ByteBuffer headerBuf = getDataStreamReplyHeaderProtoByteBuf(reply);
+    final ByteBuf headerLenBuf =
+        
PooledByteBufAllocator.DEFAULT.directBuffer(DataStreamPacketHeader.getSizeOfHeaderLen());
+    headerLenBuf.writeInt(headerBuf.remaining());
+    out.accept(headerLenBuf);
+    out.accept(Unpooled.wrappedBuffer(headerBuf));
+    out.accept(Unpooled.wrappedBuffer(reply.slice()));
   }
 
   static DataStreamRequestByteBuf decodeDataStreamRequestByteBuf(ByteBuf buf) {
-    return Optional.ofNullable(DataStreamRequestHeader.read(buf::readLong, 
buf.readableBytes()))
+    return Optional.ofNullable(DataStreamRequestHeader.read(buf))
         .map(header -> checkHeader(header, buf))
         .map(header -> new DataStreamRequestByteBuf(header, decodeData(buf, 
header, ByteBuf::retain)))
         .orElse(null);
   }
 
   static DataStreamReplyByteBuffer decodeDataStreamReplyByteBuffer(ByteBuf 
buf) {
-    return Optional.ofNullable(DataStreamReplyHeader.read(buf::readLong, 
buf.readableBytes()))
+    return Optional.ofNullable(DataStreamReplyHeader.read(buf))
         .map(header -> checkHeader(header, buf))
         .map(header -> new DataStreamReplyByteBuffer(header, decodeData(buf, 
header, ByteBuf::nioBuffer)))
         .orElse(null);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index c6b6266..5478880 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -88,7 +88,7 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
     return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
       @Override
       protected void encode(ChannelHandlerContext context, 
DataStreamRequestByteBuffer request, List<Object> out) {
-        NettyDataStreamUtils.encodeDataStreamPacketByteBuffer(request, 
out::add);
+        NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(request, 
out::add);
       }
     };
   }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 9f67c75..499e6a5 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -21,6 +21,7 @@ package org.apache.ratis.netty.server;
 import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 
@@ -32,13 +33,13 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements 
DataStreamRequest {
   private final ByteBuf buf;
 
-  public DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf 
buf) {
-    super(streamId, streamOffset);
+  public DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf 
buf, Type type) {
+    super(streamId, streamOffset, type);
     this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
   }
 
   public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) 
{
-    this(header.getStreamId(), header.getStreamOffset(), buf);
+    this(header.getStreamId(), header.getStreamOffset(), buf, 
header.getType());
   }
 
   @Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index b248572..bc98720 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -26,6 +26,7 @@ import 
org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -185,13 +186,13 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
 
   private void sendReplyNotSuccess(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx) {
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), null, -1, false);
+        request.getStreamId(), request.getStreamOffset(), null, -1, false, 
request.getType());
     ctx.writeAndFlush(reply);
   }
 
   private void sendReplySuccess(DataStreamRequestByteBuf request, long 
bytesWritten, ChannelHandlerContext ctx) {
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), null, bytesWritten, 
true);
+        request.getStreamId(), request.getStreamOffset(), null, bytesWritten, 
true, request.getType());
     ctx.writeAndFlush(reply);
   }
 
@@ -217,7 +218,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
IOException {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf) 
msg;
         final ByteBuf buf = request.slice();
-        final boolean isHeader = request.getStreamOffset() == -1;
+        final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
         final CompletableFuture<Long> localWrite = isHeader ?
                 streams.computeIfAbsent(request.getStreamId(), id -> 
getDataStreamFuture(buf)).thenApply(stream -> 0L)
@@ -291,7 +292,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
       @Override
       protected void encode(ChannelHandlerContext context, 
DataStreamReplyByteBuffer reply, List<Object> out) {
-        NettyDataStreamUtils.encodeDataStreamPacketByteBuffer(reply, out::add);
+        NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add);
       }
     };
   }
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 4b82228..3a19f70 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -279,6 +279,30 @@ message RaftClientRequestProto {
   }
 }
 
+message DataStreamPacketHeaderProto {
+  enum Type {
+    STREAM_HEADER = 0;
+    STREAM_DATA = 1;
+    STREAM_CLOSE = 2;
+    START_TRANSACTION = 3;
+  }
+
+  uint64 streamId = 1;
+  uint64 streamOffset = 2;
+  Type type = 3;
+  uint64 dataLength = 4;
+}
+
+message DataStreamRequestHeaderProto {
+  DataStreamPacketHeaderProto packetHeader = 1;
+}
+
+message DataStreamReplyHeaderProto {
+  DataStreamPacketHeaderProto packetHeader = 1;
+  uint64 bytesWritten = 2;
+  bool success = 3;
+}
+
 message NotLeaderExceptionProto {
   RaftPeerProto suggestedLeader = 1;
   repeated RaftPeerProto peersInConf = 2;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index 6e115eb..f398174 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -205,6 +206,7 @@ public class TestDataStream extends BaseTest {
       final DataStreamReply reply = impl.getHeaderFuture().join();
       Assert.assertTrue(reply.isSuccess());
       Assert.assertEquals(0, reply.getBytesWritten());
+      Assert.assertEquals(reply.getType(), Type.STREAM_HEADER);
     }
 
     // check writeAsync requests
@@ -212,6 +214,7 @@ public class TestDataStream extends BaseTest {
       final DataStreamReply reply = futures.get(i).join();
       Assert.assertTrue(reply.isSuccess());
       Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
+      Assert.assertEquals(reply.getType(), Type.STREAM_DATA);
     }
 
     for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {

Reply via email to