coderzc commented on code in PR #21684: URL: https://github.com/apache/pulsar/pull/21684#discussion_r1418889787
########## pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java: ########## @@ -19,35 +19,67 @@ package org.apache.pulsar.common.protocol; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ProtocolDetectionResult; import io.netty.handler.codec.ProtocolDetectionState; import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; +import io.netty.util.IllegalReferenceCountException; +import lombok.extern.slf4j.Slf4j; /** * Decoder that added whether a new connection is prefixed with the ProxyProtocol. * More about the ProxyProtocol see: http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt. */ +@Slf4j public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter { public static final String NAME = "optional-proxy-protocol-decoder"; + ByteBuf cumulatedByteBuf; + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { - ProtocolDetectionResult<HAProxyProtocolVersion> result = - HAProxyMessageDecoder.detectProtocol((ByteBuf) msg); - // should accumulate data if need more data to detect the protocol - if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { - return; + // Combine cumulated buffers. + ByteBuf buf = (ByteBuf) msg; + if (cumulatedByteBuf != null) { + buf = new CompositeByteBuf(ctx.alloc(), false, 2, cumulatedByteBuf, buf); } - if (result.state() == ProtocolDetectionState.DETECTED) { - ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder()); - ctx.pipeline().remove(this); + try { + ProtocolDetectionResult<HAProxyProtocolVersion> result = HAProxyMessageDecoder.detectProtocol(buf); + if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { + // Accumulate data if need more data to detect the protocol. + cumulatedByteBuf = ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(ctx.alloc(), + cumulatedByteBuf == null ? Unpooled.EMPTY_BUFFER : cumulatedByteBuf, (ByteBuf) msg); + return; + } + if (result.state() == ProtocolDetectionState.DETECTED) { + ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder()); + ctx.pipeline().remove(this); + } + super.channelRead(ctx, buf); Review Comment: When we get the result `ProtocolDetectionState.INVALID`, it proves the user is not using HAProxy, we should remove the `OptionalProxyProtocolDecoder` from the pipeline and don't need to accumulate bytes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
