zhijiangW commented on a change in pull request #7368: [FLINK-10742][network]
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r376215513
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -188,90 +186,18 @@ public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise)
}
}
- /**
- * Message decoder based on netty's {@link
LengthFieldBasedFrameDecoder} but avoiding the
- * additional memory copy inside {@link
#extractFrame(ChannelHandlerContext, ByteBuf, int, int)}
- * since we completely decode the {@link ByteBuf} inside {@link
#decode(ChannelHandlerContext,
- * ByteBuf)} and will not re-use it afterwards.
- *
- * <p>The frame-length encoder will be based on this transmission
scheme created by {@link NettyMessage#allocateBuffer(ByteBufAllocator, byte,
int)}:
- * <pre>
- * +------------------+------------------+--------++----------------+
- * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) || CUSTOM MESSAGE |
- * +------------------+------------------+--------++----------------+
- * </pre>
- */
- static class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
- /**
- * Creates a new message decoded with the required frame
properties.
- */
- NettyMessageDecoder() {
- super(Integer.MAX_VALUE, 0, 4, -4, 4);
- }
-
- @Override
- protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
throws Exception {
- ByteBuf msg = (ByteBuf) super.decode(ctx, in);
- if (msg == null) {
- return null;
- }
-
- try {
- int magicNumber = msg.readInt();
-
- if (magicNumber != MAGIC_NUMBER) {
- throw new IllegalStateException(
- "Network stream corrupted:
received incorrect magic number.");
- }
-
- byte msgId = msg.readByte();
-
- final NettyMessage decodedMsg;
- switch (msgId) {
- case BufferResponse.ID:
- decodedMsg =
BufferResponse.readFrom(msg);
- break;
- case PartitionRequest.ID:
- decodedMsg =
PartitionRequest.readFrom(msg);
- break;
- case TaskEventRequest.ID:
- decodedMsg =
TaskEventRequest.readFrom(msg, getClass().getClassLoader());
- break;
- case ErrorResponse.ID:
- decodedMsg =
ErrorResponse.readFrom(msg);
- break;
- case CancelPartitionRequest.ID:
- decodedMsg =
CancelPartitionRequest.readFrom(msg);
- break;
- case CloseRequest.ID:
- decodedMsg =
CloseRequest.readFrom(msg);
- break;
- case AddCredit.ID:
- decodedMsg =
AddCredit.readFrom(msg);
- break;
- default:
- throw new ProtocolException(
- "Received unknown
message from producer: " + msg);
- }
-
- return decodedMsg;
- } finally {
- // ByteToMessageDecoder cleanup (only the
BufferResponse holds on to the decoded
- // msg but already retain()s the buffer once)
- msg.release();
- }
- }
- }
-
//
------------------------------------------------------------------------
// Server responses
//
------------------------------------------------------------------------
static class BufferResponse extends NettyMessage {
- private static final byte ID = 0;
+ static final byte ID = 0;
- final ByteBuf buffer;
+ // receiver ID (16), sequence number (4), backlog (4), isBuffer
(1), isCompressed (1), buffer size (4)
+ static final int MESSAGE_HEADER_LENGTH = 16 + 4 + 4 + 1 + 1 + 4;
+
+ final Buffer buffer;
Review comment:
This var should be annotated with `Nullable`, but I do not suggest bringing
nullable var if not very necessary refer to
https://github.com/apache/flink/pull/7368#pullrequestreview-354930094.
Otherwise every usage should judge the nullable condition to avoid NPE.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services