poorbarcode commented on code in PR #21684:
URL: https://github.com/apache/pulsar/pull/21684#discussion_r1418977159


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java:
##########
@@ -19,35 +19,67 @@
 package org.apache.pulsar.common.protocol;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.ProtocolDetectionResult;
 import io.netty.handler.codec.ProtocolDetectionState;
 import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.util.IllegalReferenceCountException;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Decoder that added whether a new connection is prefixed with the 
ProxyProtocol.
  * More about the ProxyProtocol see: 
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
  */
+@Slf4j
 public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter 
{
 
     public static final String NAME = "optional-proxy-protocol-decoder";
 
+    ByteBuf cumulatedByteBuf;
+
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
         if (msg instanceof ByteBuf) {
-            ProtocolDetectionResult<HAProxyProtocolVersion> result =
-                    HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
-            // should accumulate data if need more data to detect the protocol
-            if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
-                return;
+            // Combine cumulated buffers.
+            ByteBuf buf = (ByteBuf) msg;
+            if (cumulatedByteBuf != null) {
+                buf = new CompositeByteBuf(ctx.alloc(), false, 2, 
cumulatedByteBuf, buf);
             }
 
-            if (result.state() == ProtocolDetectionState.DETECTED) {
-                ctx.pipeline().addAfter(NAME, null, new 
HAProxyMessageDecoder());
-                ctx.pipeline().remove(this);
+            try {
+                ProtocolDetectionResult<HAProxyProtocolVersion> result = 
HAProxyMessageDecoder.detectProtocol(buf);
+                if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+                    // Accumulate data if need more data to detect the 
protocol.
+                    cumulatedByteBuf = 
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(ctx.alloc(),
+                            cumulatedByteBuf == null ? Unpooled.EMPTY_BUFFER : 
cumulatedByteBuf, (ByteBuf) msg);
+                    return;
+                }
+                if (result.state() == ProtocolDetectionState.DETECTED) {
+                    ctx.pipeline().addAfter(NAME, null, new 
HAProxyMessageDecoder());
+                    ctx.pipeline().remove(this);
+                }
+                super.channelRead(ctx, buf);

Review Comment:
   > When we get the result ProtocolDetectionState.INVALID, it proves the user 
is not using HAProxy, we should remove the OptionalProxyProtocolDecoder from 
the pipeline and don't need to continue to accumulate bytes.
   
   The current implementation is the most stable option when it is uncertain 
how many times the `HAProxy Protocol Notation` will be received during the 
lifetime of a connection (for example, when it is sent repeatedly due to an error). 
   
   The probability of receiving a packet that is smaller than `12` bytes is 
extremely low, so this does not affect performance.
   
   > Otherwise, if buffer.readableBytes() < 12 then the channelRead will block.
   
   A Pulsar command has at least `12` bytes: 
`[frame length][cmd length][base cmd]`, so the channel will not get stuck.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to