This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b3d9200  RATIS-1328. Avoid parse proto for each packet (#434)
b3d9200 is described below

commit b3d9200ea11721a2711fd06c43c9aa8fe9e0fdb3
Author: runzhiwang <[email protected]>
AuthorDate: Thu Mar 4 17:12:08 2021 +0800

    RATIS-1328. Avoid parse proto for each packet (#434)
---
 .../java/org/apache/ratis/protocol/DataStreamPacketHeader.java    | 5 +++++
 .../java/org/apache/ratis/protocol/DataStreamRequestHeader.java   | 8 +++++++-
 .../main/java/org/apache/ratis/netty/NettyDataStreamUtils.java    | 6 ++++++
 3 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index 3bd7512..842cf28 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -25,6 +25,7 @@ import org.apache.ratis.util.SizeInBytes;
 /** The header format is streamId, streamOffset, dataLength. */
 public class DataStreamPacketHeader extends DataStreamPacketImpl {
   private static final SizeInBytes SIZE_OF_HEADER_LEN = SizeInBytes.valueOf(4);
+  private static final SizeInBytes SIZE_OF_HEADER_BODY_LEN = SizeInBytes.valueOf(8);
 
   private final long dataLength;
 
@@ -41,4 +42,8 @@ public class DataStreamPacketHeader extends DataStreamPacketImpl {
   public static int getSizeOfHeaderLen() {
     return SIZE_OF_HEADER_LEN.getSizeInt();
   }
+
+  public static int getSizeOfHeaderBodyLen() {
+    return SIZE_OF_HEADER_BODY_LEN.getSizeInt();
+  }
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index 8abbe3b..c8f8cfc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -37,7 +37,13 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
   private static final Logger LOG = LoggerFactory.getLogger(DataStreamRequestHeader.class);
 
   public static DataStreamRequestHeader read(ByteBuf buf) {
-    if (getSizeOfHeaderLen() > buf.readableBytes()) {
+    if (getSizeOfHeaderBodyLen() > buf.readableBytes()) {
+      return null;
+    }
+
+    long headerBodyBufLen = buf.readLong();
+    if (headerBodyBufLen > buf.readableBytes()) {
+      buf.resetReaderIndex();
       return null;
     }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 1f4f7c5..d463b07 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -83,9 +83,15 @@ public interface NettyDataStreamUtils {
   static void encodeDataStreamRequestHeader(DataStreamRequest request, Consumer<Object> out,
       ByteBufAllocator allocator) {
     final ByteBuffer headerBuf = getDataStreamRequestHeaderProtoByteBuffer(request);
+
+    final ByteBuf headerBodyLenBuf = allocator.directBuffer(DataStreamPacketHeader.getSizeOfHeaderBodyLen());
+    headerBodyLenBuf.writeLong(headerBuf.remaining() + request.getDataLength());
+    out.accept(headerBodyLenBuf);
+
     final ByteBuf headerLenBuf = allocator.directBuffer(DataStreamPacketHeader.getSizeOfHeaderLen());
     headerLenBuf.writeInt(headerBuf.remaining());
     out.accept(headerLenBuf);
+
     out.accept(Unpooled.wrappedBuffer(headerBuf));
   }
 

Reply via email to