leixm commented on code in PR #742:
URL: https://github.com/apache/incubator-uniffle/pull/742#discussion_r1146178309


##########
common/src/main/java/org/apache/uniffle/common/netty/TransportFrameDecoder.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty;
+
+import java.util.LinkedList;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.apache.uniffle.common.netty.protocol.Message;
+
+/**
+ * A customized frame decoder that allows intercepting raw data.
+ *
+ * <p>This behaves like Netty's frame decoder (with hard coded parameters that 
match this library's
+ * needs), except it allows an interceptor to be installed to read data 
directly before it's framed.
+ *
+ * <p>Unlike Netty's frame decoder, each frame is dispatched to child handlers 
as soon as it's
+ * decoded, instead of building as many frames as the current buffer allows 
and dispatching all of
+ * them. This allows a child handler to install an interceptor if needed.
+ *
+ * <p>If an interceptor is installed, framing stops, and data is instead fed 
directly to the
+ * interceptor. When the interceptor indicates that it doesn't need to read 
any more data, framing
+ * resumes. Interceptors should not hold references to the data buffers 
provided to their handle()
+ * method.
+ */
+public class TransportFrameDecoder extends ChannelInboundHandlerAdapter 
implements FrameDecoder {
+  private int msgSize = -1;
+  private Message.Type curType = Message.Type.UNKNOWN_TYPE;
+  private ByteBuf headerBuf = Unpooled.buffer(HEADER_SIZE, HEADER_SIZE);
+  private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
+  private static final int UNKNOWN_FRAME_SIZE = -1;
+
+  private final LinkedList<ByteBuf> buffers = new LinkedList<>();
+
+  private long totalSize = 0;
+  private long nextFrameSize = UNKNOWN_FRAME_SIZE;
+
+  /**
+   * Accumulates the inbound {@link ByteBuf} and dispatches every complete frame
+   * that can be decoded from the buffered bytes to downstream handlers.
+   *
+   * <p>Each decoded frame is turned into a {@link Message} using the type read
+   * from its header ({@code curType}), the frame buffer is released, and the
+   * message is fired down the pipeline before the next frame is attempted.
+   */
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object data) {
+    // Take ownership of the inbound buffer; drained buffers are released as
+    // they are consumed during header/frame decoding.
+    ByteBuf in = (ByteBuf) data;
+    buffers.add(in);
+    totalSize += in.readableBytes();
+
+    // Decode as many complete frames as the buffered bytes allow.
+    while (!buffers.isEmpty()) {
+      ByteBuf frame = decodeNext();
+      if (frame == null) {
+        // Not enough data buffered for a whole frame yet; wait for more reads.
+        break;
+      }
+      // NOTE(review): if Message.decode throws, frame.release() is never
+      // reached and the frame buffer leaks -- consider a try/finally here.
+      Message msg = Message.decode(curType, frame);
+      frame.release();
+      ctx.fireChannelRead(msg);
+      // Reset per-message state (type, size, header buffer) for the next frame.
+      clear();
+    }
+  }
+
+  /**
+   * Resets the per-message decoding state (message type, message size, and the
+   * partially-filled header buffer) so the next frame's header can be read.
+   */
+  private void clear() {
+    curType = Message.Type.UNKNOWN_TYPE;
+    msgSize = -1;
+    headerBuf.clear();
+  }
+
+  /**
+   * Attempts to decode the frame header (a 4-byte message size read via
+   * {@code readInt}, followed by the message type) from the buffered bytes.
+   *
+   * <p>Side effects: sets {@code msgSize} and {@code curType}, caches the size
+   * in {@code nextFrameSize}, deducts {@code HEADER_SIZE} from
+   * {@code totalSize}, and releases any head buffer that becomes empty.
+   *
+   * @return the size of the next frame body, or {@code UNKNOWN_FRAME_SIZE} if
+   *     fewer than {@code HEADER_SIZE} bytes have been buffered so far
+   */
+  private long decodeFrameSize() {
+    // Header already parsed for the current frame, or not enough bytes yet.
+    if (nextFrameSize != UNKNOWN_FRAME_SIZE || totalSize < HEADER_SIZE) {
+      return nextFrameSize;
+    }
+
+    // We know there's enough data. If the first buffer contains the whole
+    // header, read it directly (fast path). Otherwise, gather the header bytes
+    // into headerBuf until HEADER_SIZE bytes are available; needing more than
+    // one buffer to read the header should be rare.
+    ByteBuf first = buffers.getFirst();
+    if (first.readableBytes() >= HEADER_SIZE) {
+      msgSize = first.readInt();
+      curType = Message.Type.decode(first);
+      nextFrameSize = msgSize;
+      totalSize -= HEADER_SIZE;
+      // Drop and release the head buffer once it is fully consumed.
+      if (!first.isReadable()) {
+        buffers.removeFirst().release();
+      }
+      return nextFrameSize;
+    }
+
+    // Slow path: copy bytes from the head buffers into headerBuf until the
+    // full header is gathered. The totalSize check above guarantees the bytes
+    // exist somewhere in the queue, so getFirst() cannot run dry here.
+    while (headerBuf.readableBytes() < HEADER_SIZE) {
+      ByteBuf next = buffers.getFirst();
+      int toRead = Math.min(next.readableBytes(), HEADER_SIZE - 
headerBuf.readableBytes());
+      headerBuf.writeBytes(next, toRead);
+      if (!next.isReadable()) {
+        buffers.removeFirst().release();
+      }
+    }
+
+    msgSize = headerBuf.readInt();
+    curType = Message.Type.decode(headerBuf);
+    nextFrameSize = msgSize;
+    totalSize -= HEADER_SIZE;
+    return nextFrameSize;
+  }
+
+  private ByteBuf decodeNext() {
+    long frameSize = decodeFrameSize();
+    if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
+      return null;
+    }
+
+    // Reset size for next frame.
+    nextFrameSize = UNKNOWN_FRAME_SIZE;
+
+    Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: 
%s", frameSize);

Review Comment:
   Maybe not very important, Spark does the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to