jerqi commented on code in PR #1313:
URL:
https://github.com/apache/incubator-uniffle/pull/1313#discussion_r1393696374
##########
common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java:
##########
@@ -17,50 +17,81 @@
package org.apache.uniffle.common.netty;
+import java.util.List;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.MessageToMessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.netty.protocol.Message;
-import org.apache.uniffle.common.netty.protocol.Transferable;
+import org.apache.uniffle.common.netty.protocol.MessageWithHeader;
+import org.apache.uniffle.common.netty.protocol.RpcResponse;
/**
* Encoder used by the server side to encode server-to-client responses. This
encoder is stateless
- * so it is safe to be shared by multiple threads. The content of encode
consists of two parts,
- * header and message body. The encoded binary stream contains encodeLength (4
bytes), messageType
- * (1 byte) and messageBody (encodeLength bytes).
+ * so it is safe to be shared by multiple threads.
*/
@ChannelHandler.Sharable
-public class MessageEncoder extends ChannelOutboundHandlerAdapter {
+public final class MessageEncoder extends MessageToMessageEncoder<Message> {
- private static final Logger LOG =
LoggerFactory.getLogger(MessageEncoder.class);
+ private static final Logger logger =
LoggerFactory.getLogger(MessageEncoder.class);
public static final MessageEncoder INSTANCE = new MessageEncoder();
private MessageEncoder() {}
+ /**
+ * * Encodes a Message by invoking its encode() method. For non-data
messages, we will add one
+ * ByteBuf to 'out' containing the total frame length, the message type, and
the message itself.
+ * In the case of a ChunkFetchSuccess, we will also add the ManagedBuffer
corresponding to the
+ * data to 'out', in order to enable zero-copy transfer.
+ */
@Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) {
- Message message = (Message) msg;
- int encodeLength = message.encodedLength();
- ByteBuf byteBuf = ctx.alloc().buffer(FrameDecoder.HEADER_SIZE +
encodeLength);
- try {
- byteBuf.writeInt(encodeLength);
- byteBuf.writeByte(message.type().id());
- message.encode(byteBuf);
- } catch (Exception e) {
- LOG.error("Unexpected exception during process encode!", e);
- byteBuf.release();
- throw e;
+ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out)
throws Exception {
+ Object body = null;
+ int bodyLength = 0;
+
+ // If the message has a body, take it out to enable zero-copy transfer for
the payload.
+ if (in.body() != null) {
+ try {
+ bodyLength = (int) in.body().size();
+ body = in.body().convertToNetty();
+ } catch (Exception e) {
+ in.body().release();
+ if (in instanceof RpcResponse) {
+ RpcResponse resp = (RpcResponse) in;
+ // Re-encode this message as a failure response.
+ String error = e.getMessage() != null ? e.getMessage() : "null";
+ logger.error(
+ String.format("Error processing %s for client %s", in,
ctx.channel().remoteAddress()),
+ e);
+ encode(ctx, resp.createFailureResponse(error), out);
+ } else {
+ throw e;
+ }
+ return;
+ }
}
- ctx.writeAndFlush(byteBuf);
- // do transferTo send data after encode buffer send.
- if (message instanceof Transferable) {
- ((Transferable) message).transferTo(ctx.channel());
+
+ Message.Type msgType = in.type();
+ // message size, message type size, body size, message encoded length
+ int headerLength = 4 + msgType.encodedLength() + 4 + in.encodedLength();
Review Comment:
What's the meaning of `4`? Could we replace them with constants?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]