smallzhongfeng commented on code in PR #742: URL: https://github.com/apache/incubator-uniffle/pull/742#discussion_r1145784004
########## common/src/main/java/org/apache/uniffle/common/netty/protocol/Encoders.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.netty.protocol; + +import java.util.List; + +import io.netty.buffer.ByteBuf; + +import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.common.util.ByteBufUtils; + +public class Encoders { + public static void encodeShuffleServerInfo(ShuffleServerInfo shuffleServerInfo, ByteBuf byteBuf) { + ByteBufUtils.writeLengthAndString(byteBuf, shuffleServerInfo.getId()); + ByteBufUtils.writeLengthAndString(byteBuf, shuffleServerInfo.getHost()); + byteBuf.writeInt(shuffleServerInfo.getGrpcPort()); + byteBuf.writeInt(shuffleServerInfo.getNettyPort()); + } + + public static void encodeShuffleBlockInfo(ShuffleBlockInfo shuffleBlockInfo, ByteBuf byteBuf) { + byteBuf.writeInt(shuffleBlockInfo.getPartitionId()); + byteBuf.writeLong(shuffleBlockInfo.getBlockId()); + byteBuf.writeInt(shuffleBlockInfo.getLength()); + byteBuf.writeInt(shuffleBlockInfo.getShuffleId()); + byteBuf.writeLong(shuffleBlockInfo.getCrc()); + 
byteBuf.writeLong(shuffleBlockInfo.getTaskAttemptId()); + // todo: avoid copy + ByteBufUtils.copyByteBuf(shuffleBlockInfo.getData(), byteBuf); + shuffleBlockInfo.getData().release(); + List<ShuffleServerInfo> shuffleServerInfoList = shuffleBlockInfo.getShuffleServerInfos(); + byteBuf.writeInt(shuffleServerInfoList.size()); + for (ShuffleServerInfo shuffleServerInfo : shuffleServerInfoList) { + Encoders.encodeShuffleServerInfo(shuffleServerInfo, byteBuf); + } + byteBuf.writeInt(shuffleBlockInfo.getUncompressLength()); + byteBuf.writeLong(shuffleBlockInfo.getFreeMemory()); + } + + + public static int encodeLengthOfShuffleServerInfo(ShuffleServerInfo shuffleServerInfo) { + return ByteBufUtils.encodedLength(shuffleServerInfo.getId()) + + ByteBufUtils.encodedLength(shuffleServerInfo.getHost()) + + 2 * Integer.BYTES; + } + + public static int encodeLengthOfShuffleBlockInfo(ShuffleBlockInfo shuffleBlockInfo) { + int encodeLength = 4 * Long.BYTES + 4 * Integer.BYTES + ByteBufUtils.encodedLength(shuffleBlockInfo.getData()) + + Integer.BYTES; Review Comment: nit: control Indentation. ########## common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTestUtils.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.netty.protocol; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.uniffle.common.ShuffleBlockInfo; + +public class NettyProtocolTestUtils { + + private static boolean compareShuffleBlockInfo(ShuffleBlockInfo blockInfo1, ShuffleBlockInfo blockInfo2) { + return blockInfo1.getPartitionId() == blockInfo2.getPartitionId() + && blockInfo1.getBlockId() == blockInfo2.getBlockId() + && blockInfo1.getLength() == blockInfo2.getLength() + && blockInfo1.getShuffleId() == blockInfo2.getShuffleId() + && blockInfo1.getCrc() == blockInfo2.getCrc() + && blockInfo1.getTaskAttemptId() == blockInfo2.getTaskAttemptId() + && blockInfo1.getUncompressLength() == blockInfo2.getUncompressLength() + && blockInfo1.getFreeMemory() == blockInfo2.getFreeMemory() + && blockInfo1.getData().equals(blockInfo2.getData()) + && blockInfo1.getShuffleServerInfos().equals(blockInfo2.getShuffleServerInfos()); + } + + private static boolean compareBlockList(List<ShuffleBlockInfo> list1, List<ShuffleBlockInfo> list2) { + if (list1 == null || list2 == null || list1.size() != list2.size()) { + return false; + } + for (int i = 0; i < list1.size(); i++) { + if (!compareShuffleBlockInfo(list1.get(i), list2.get(i))) { + return false; + } + } + return true; + } + + private static boolean comparePartitionToBlockList(Map<Integer, List<ShuffleBlockInfo>> m1, + Map<Integer, List<ShuffleBlockInfo>> m2) { + if (m1 == null || m2 == null || m1.size() != m2.size()) { + return false; + } + Iterator<Map.Entry<Integer, List<ShuffleBlockInfo>>> iter1 = m1.entrySet().iterator(); + while (iter1.hasNext()) { + Map.Entry<Integer, List<ShuffleBlockInfo>> entry1 = iter1.next(); + if (!compareBlockList(entry1.getValue(), m2.get(entry1.getKey()))) { + return false; + } + } + return true; + } + + public static boolean 
compareSendShuffleDataRequest(SendShuffleDataRequest req1, + SendShuffleDataRequest req2) { + if (req1 == req2) { + return true; + } + if (req1 == null || req2 == null) { + return false; + } + boolean isEqual = req1.requestId == req2.requestId + && req1.getShuffleId() == req2.getShuffleId() Review Comment: nit: control Indentation. ########## common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTestUtils.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.netty.protocol; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.uniffle.common.ShuffleBlockInfo; + +public class NettyProtocolTestUtils { + + private static boolean compareShuffleBlockInfo(ShuffleBlockInfo blockInfo1, ShuffleBlockInfo blockInfo2) { + return blockInfo1.getPartitionId() == blockInfo2.getPartitionId() + && blockInfo1.getBlockId() == blockInfo2.getBlockId() Review Comment: nit: control Indentation. 
########## common/src/main/java/org/apache/uniffle/common/netty/protocol/Encoders.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.netty.protocol; + +import java.util.List; + +import io.netty.buffer.ByteBuf; + +import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.common.util.ByteBufUtils; + +public class Encoders { + public static void encodeShuffleServerInfo(ShuffleServerInfo shuffleServerInfo, ByteBuf byteBuf) { + ByteBufUtils.writeLengthAndString(byteBuf, shuffleServerInfo.getId()); + ByteBufUtils.writeLengthAndString(byteBuf, shuffleServerInfo.getHost()); + byteBuf.writeInt(shuffleServerInfo.getGrpcPort()); + byteBuf.writeInt(shuffleServerInfo.getNettyPort()); + } + + public static void encodeShuffleBlockInfo(ShuffleBlockInfo shuffleBlockInfo, ByteBuf byteBuf) { + byteBuf.writeInt(shuffleBlockInfo.getPartitionId()); + byteBuf.writeLong(shuffleBlockInfo.getBlockId()); + byteBuf.writeInt(shuffleBlockInfo.getLength()); + byteBuf.writeInt(shuffleBlockInfo.getShuffleId()); + byteBuf.writeLong(shuffleBlockInfo.getCrc()); + 
byteBuf.writeLong(shuffleBlockInfo.getTaskAttemptId()); + // todo: avoid copy + ByteBufUtils.copyByteBuf(shuffleBlockInfo.getData(), byteBuf); + shuffleBlockInfo.getData().release(); + List<ShuffleServerInfo> shuffleServerInfoList = shuffleBlockInfo.getShuffleServerInfos(); + byteBuf.writeInt(shuffleServerInfoList.size()); + for (ShuffleServerInfo shuffleServerInfo : shuffleServerInfoList) { + Encoders.encodeShuffleServerInfo(shuffleServerInfo, byteBuf); + } + byteBuf.writeInt(shuffleBlockInfo.getUncompressLength()); + byteBuf.writeLong(shuffleBlockInfo.getFreeMemory()); + } + + + public static int encodeLengthOfShuffleServerInfo(ShuffleServerInfo shuffleServerInfo) { + return ByteBufUtils.encodedLength(shuffleServerInfo.getId()) + + ByteBufUtils.encodedLength(shuffleServerInfo.getHost()) Review Comment: nit: control Indentation. ########## common/src/test/java/org/apache/uniffle/common/netty/EncoderAndDecoderTest.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.common.netty; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.collect.Maps; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.netty.protocol.NettyProtocolTestUtils; +import org.apache.uniffle.common.netty.protocol.RpcResponse; +import org.apache.uniffle.common.netty.protocol.SendShuffleDataRequest; +import org.apache.uniffle.common.rpc.StatusCode; +import org.apache.uniffle.common.util.NettyUtils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class EncoderAndDecoderTest { + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private ChannelFuture channelFuture; + private static final SendShuffleDataRequest DATA_REQUEST = generateShuffleDataRequest(); + private static final String EXPECTED_MESSAGE = "test_message"; + private static final long REQUEST_ID = 1; + private static final StatusCode STATUS_CODE = StatusCode.SUCCESS; 
+ private static final int PORT = 12345; + + static class MockResponseHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof RpcResponse) { + RpcResponse rpcResponse = (RpcResponse) msg; + assertEquals(REQUEST_ID, rpcResponse.getRequestId()); + assertEquals(STATUS_CODE, rpcResponse.getStatusCode()); + assertEquals(EXPECTED_MESSAGE, rpcResponse.getRetMessage()); + } else if (msg instanceof SendShuffleDataRequest) { + SendShuffleDataRequest sendShuffleDataRequest = (SendShuffleDataRequest) msg; + assertTrue(NettyProtocolTestUtils.compareSendShuffleDataRequest(sendShuffleDataRequest, DATA_REQUEST)); + sendShuffleDataRequest.getPartitionToBlocks().values().stream().flatMap(Collection::stream) + .forEach(shuffleBlockInfo -> shuffleBlockInfo.getData().release()); + sendShuffleDataRequest.getPartitionToBlocks().values().stream().flatMap(Collection::stream) + .forEach(shuffleBlockInfo -> assertEquals(0,shuffleBlockInfo.getData().refCnt())); + RpcResponse rpcResponse = new RpcResponse(REQUEST_ID, STATUS_CODE, EXPECTED_MESSAGE); + ctx.writeAndFlush(rpcResponse); + } else { + throw new RssException("receive unexpected message!"); Review Comment: I think we can add a class of netty transport exceptions in the followup pr, which may better distinguish these exceptions. ########## common/src/main/java/org/apache/uniffle/common/netty/TransportFrameDecoder.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.netty; + +import java.util.LinkedList; + +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import org.apache.uniffle.common.netty.protocol.Message; + +/** + * A customized frame decoder that allows intercepting raw data. + * + * <p>This behaves like Netty's frame decoder (with hard coded parameters that match this library's + * needs), except it allows an interceptor to be installed to read data directly before it's framed. + * + * <p>Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's + * decoded, instead of building as many frames as the current buffer allows and dispatching all of + * them. This allows a child handler to install an interceptor if needed. + * + * <p>If an interceptor is installed, framing stops, and data is instead fed directly to the + * interceptor. When the interceptor indicates that it doesn't need to read any more data, framing + * resumes. Interceptors should not hold references to the data buffers provided to their handle() + * method. 
+ */ +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter implements FrameDecoder { + private int msgSize = -1; + private Message.Type curType = Message.Type.UNKNOWN_TYPE; + private ByteBuf headerBuf = Unpooled.buffer(HEADER_SIZE, HEADER_SIZE); + private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE; + private static final int UNKNOWN_FRAME_SIZE = -1; + + private final LinkedList<ByteBuf> buffers = new LinkedList<>(); + + private long totalSize = 0; + private long nextFrameSize = UNKNOWN_FRAME_SIZE; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object data) { + ByteBuf in = (ByteBuf) data; + buffers.add(in); + totalSize += in.readableBytes(); + + while (!buffers.isEmpty()) { + ByteBuf frame = decodeNext(); + if (frame == null) { + break; + } + Message msg = Message.decode(curType, frame); + frame.release(); + ctx.fireChannelRead(msg); + clear(); + } + } + + private void clear() { + curType = Message.Type.UNKNOWN_TYPE; + msgSize = -1; + headerBuf.clear(); + } + + private long decodeFrameSize() { + if (nextFrameSize != UNKNOWN_FRAME_SIZE || totalSize < HEADER_SIZE) { + return nextFrameSize; + } + + // We know there's enough data. If the first buffer contains all the data, great. Otherwise, + // hold the bytes for the frame length in a composite buffer until we have enough data to read + // the frame size. Normally, it should be rare to need more than one buffer to read the frame + // size. 
+ ByteBuf first = buffers.getFirst(); + if (first.readableBytes() >= HEADER_SIZE) { + msgSize = first.readInt(); + curType = Message.Type.decode(first); + nextFrameSize = msgSize; + totalSize -= HEADER_SIZE; + if (!first.isReadable()) { + buffers.removeFirst().release(); + } + return nextFrameSize; + } + + while (headerBuf.readableBytes() < HEADER_SIZE) { + ByteBuf next = buffers.getFirst(); + int toRead = Math.min(next.readableBytes(), HEADER_SIZE - headerBuf.readableBytes()); + headerBuf.writeBytes(next, toRead); + if (!next.isReadable()) { + buffers.removeFirst().release(); + } + } + + msgSize = headerBuf.readInt(); + curType = Message.Type.decode(headerBuf); + nextFrameSize = msgSize; + totalSize -= HEADER_SIZE; + return nextFrameSize; + } + + private ByteBuf decodeNext() { + long frameSize = decodeFrameSize(); + if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) { + return null; + } + + // Reset size for next frame. + nextFrameSize = UNKNOWN_FRAME_SIZE; + + Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize); Review Comment: It seems that we can use `frameSize <= MAX_FRAME_SIZE` here? ########## common/src/main/java/org/apache/uniffle/common/netty/protocol/Encoders.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.netty.protocol; + +import java.util.List; + +import io.netty.buffer.ByteBuf; + +import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.common.util.ByteBufUtils; + +public class Encoders { Review Comment: Yes, but I think it can be optimized in a follow-up PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
