leixm commented on code in PR #742: URL: https://github.com/apache/incubator-uniffle/pull/742#discussion_r1145091131
########## common/src/main/java/org/apache/uniffle/common/netty/TransportFrameDecoder.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.netty; + +import java.util.LinkedList; + +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import org.apache.uniffle.common.netty.protocol.Message; + +/** + * A customized frame decoder that allows intercepting raw data. + * + * <p>This behaves like Netty's frame decoder (with hard coded parameters that match this library's + * needs), except it allows an interceptor to be installed to read data directly before it's framed. + * + * <p>Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's + * decoded, instead of building as many frames as the current buffer allows and dispatching all of + * them. This allows a child handler to install an interceptor if needed. + * + * <p>If an interceptor is installed, framing stops, and data is instead fed directly to the + * interceptor. When the interceptor indicates that it doesn't need to read any more data, framing + * resumes. Interceptors should not hold references to the data buffers provided to their handle() + * method. + */ +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter implements FrameDecoder { + private int msgSize = -1; + private Message.Type curType = Message.Type.UNKNOWN_TYPE; + private ByteBuf headerBuf = Unpooled.buffer(HEADER_SIZE, HEADER_SIZE); + private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE; + private static final int UNKNOWN_FRAME_SIZE = -1; + + private final LinkedList<ByteBuf> buffers = new LinkedList<>(); + + private long totalSize = 0; + private long nextFrameSize = UNKNOWN_FRAME_SIZE; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object data) { + ByteBuf in = (ByteBuf) data; + buffers.add(in); + totalSize += in.readableBytes(); + + while (!buffers.isEmpty()) { + ByteBuf frame = decodeNext(); + if (frame == null) { + break; + } + Message msg = Message.decode(curType, frame); Review Comment: This happens only if there is a bug in decode, right? this should not exist. ########## common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.netty.protocol.Message; + +public class MessageEncoder extends ChannelOutboundHandlerAdapter { Review Comment: sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
