This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b3d9200 RATIS-1328. Avoid parse proto for each packet (#434)
b3d9200 is described below
commit b3d9200ea11721a2711fd06c43c9aa8fe9e0fdb3
Author: runzhiwang <[email protected]>
AuthorDate: Thu Mar 4 17:12:08 2021 +0800
RATIS-1328. Avoid parse proto for each packet (#434)
---
.../java/org/apache/ratis/protocol/DataStreamPacketHeader.java | 5 +++++
.../java/org/apache/ratis/protocol/DataStreamRequestHeader.java | 8 +++++++-
.../main/java/org/apache/ratis/netty/NettyDataStreamUtils.java | 6 ++++++
3 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index 3bd7512..842cf28 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -25,6 +25,7 @@ import org.apache.ratis.util.SizeInBytes;
/** The header format is streamId, streamOffset, dataLength. */
public class DataStreamPacketHeader extends DataStreamPacketImpl {
private static final SizeInBytes SIZE_OF_HEADER_LEN = SizeInBytes.valueOf(4);
+ private static final SizeInBytes SIZE_OF_HEADER_BODY_LEN =
SizeInBytes.valueOf(8);
private final long dataLength;
@@ -41,4 +42,8 @@ public class DataStreamPacketHeader extends DataStreamPacketImpl {
public static int getSizeOfHeaderLen() {
return SIZE_OF_HEADER_LEN.getSizeInt();
}
+
+ public static int getSizeOfHeaderBodyLen() {
+ return SIZE_OF_HEADER_BODY_LEN.getSizeInt();
+ }
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index 8abbe3b..c8f8cfc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -37,7 +37,13 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
private static final Logger LOG =
LoggerFactory.getLogger(DataStreamRequestHeader.class);
public static DataStreamRequestHeader read(ByteBuf buf) {
- if (getSizeOfHeaderLen() > buf.readableBytes()) {
+ if (getSizeOfHeaderBodyLen() > buf.readableBytes()) {
+ return null;
+ }
+
+ long headerBodyBufLen = buf.readLong();
+ if (headerBodyBufLen > buf.readableBytes()) {
+ buf.resetReaderIndex();
return null;
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 1f4f7c5..d463b07 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -83,9 +83,15 @@ public interface NettyDataStreamUtils {
static void encodeDataStreamRequestHeader(DataStreamRequest request,
Consumer<Object> out,
ByteBufAllocator allocator) {
final ByteBuffer headerBuf =
getDataStreamRequestHeaderProtoByteBuffer(request);
+
+ final ByteBuf headerBodyLenBuf =
allocator.directBuffer(DataStreamPacketHeader.getSizeOfHeaderBodyLen());
+ headerBodyLenBuf.writeLong(headerBuf.remaining() +
request.getDataLength());
+ out.accept(headerBodyLenBuf);
+
final ByteBuf headerLenBuf =
allocator.directBuffer(DataStreamPacketHeader.getSizeOfHeaderLen());
headerLenBuf.writeInt(headerBuf.remaining());
out.accept(headerLenBuf);
+
out.accept(Unpooled.wrappedBuffer(headerBuf));
}