This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b4496be687 [ISSUE #7010] Fix the HandshakeHandler returns when detect haproxy version need more data (#7011)
b4496be687 is described below

commit b4496be68705c1c0b282a07a1adeab4fffd670fe
Author: ShuangxiDing <[email protected]>
AuthorDate: Tue Jul 11 19:09:09 2023 +0800

    [ISSUE #7010] Fix the HandshakeHandler returns when detect haproxy version need more data (#7011)
    
    * Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
    
    * Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
    
    * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
    
    * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
    
    * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
    
    * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * 回滚netty的升级 (Roll back the netty upgrade)
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * add grpc-netty-codec-haproxy in bazel
    
    * add grpc-netty-codec-haproxy in bazel
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * Fix Test
    
    * add grpc-netty-codec-haproxy in bazel
    
    * add ProxyProtocolTest for Remoting
    
    * Move AttributeKey from RemotingHelper to AttributeKey.
    
    * Fix the needs more data for HandshakeHandler.
    
    * Fix the needs more data for HandshakeHandler.
    
    * Fix the needs more data for HandshakeHandler.
    
    * Fix the needs more data for HandshakeHandler.
    
    ---------
    
    Co-authored-by: 徒钟 <[email protected]>
---
 .../remoting/MultiProtocolRemotingServer.java      |  2 +-
 .../activity/AbstractRemotingActivity.java         | 16 ++++++++-------
 .../remoting/activity/ClientManagerActivity.java   | 24 ++++++++++++----------
 .../activity/AbstractRemotingActivityTest.java     | 10 +++++----
 .../rocketmq/remoting/common/RemotingHelper.java   | 21 +++++--------------
 .../rocketmq/remoting/netty/AttributeKeys.java     | 11 +++++++++-
 .../remoting/netty/NettyRemotingServer.java        | 23 +++++++--------------
 7 files changed, 51 insertions(+), 56 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 858b1f0227..12d728fff1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -78,7 +78,7 @@ public class MultiProtocolRemotingServer extends 
NettyRemotingServer {
     @Override
     protected ChannelPipeline configChannel(SocketChannel ch) {
         return ch.pipeline()
-            .addLast(this.getDefaultEventExecutorGroup(), 
HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler())
+            .addLast(this.getDefaultEventExecutorGroup(), 
HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
             .addLast(this.getDefaultEventExecutorGroup(),
                 new IdleStateHandler(0, 0, 
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                 new ProtocolNegotiationHandler(this.remotingProtocolHandler)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
index 78cd203ec4..ce4a633976 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -19,9 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
 import org.apache.rocketmq.acl.common.AclException;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -41,11 +38,16 @@ import 
org.apache.rocketmq.proxy.processor.MessagingProcessor;
 import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
 import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
 public abstract class AbstractRemotingActivity implements 
NettyRequestProcessor {
     protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     protected final MessagingProcessor messagingProcessor;
@@ -126,13 +128,13 @@ public abstract class AbstractRemotingActivity implements 
NettyRequestProcessor
             .setProtocolType(ChannelProtocolType.REMOTING.getName())
             .setChannel(channel)
             
.setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
-            
.setRemoteAddress(NetworkUtil.socketAddress2String(ctx.channel().remoteAddress()));
+            
.setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 
-        
Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.LANGUAGE_CODE_KEY,
 channel))
+        
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY,
 channel))
             .ifPresent(language -> context.setLanguage(language.name()));
-        
Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.CLIENT_ID_KEY,
 channel))
+        
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY,
 channel))
             .ifPresent(context::setClientID);
-        
Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.VERSION_KEY,
 channel))
+        
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, 
channel))
             .ifPresent(version -> 
context.setClientVersion(MQVersion.getVersionDesc(version)));
 
         return context;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
index 1eb81ce927..c671593a34 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -19,13 +19,20 @@ package org.apache.rocketmq.proxy.remoting.activity;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import java.util.Set;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
 import org.apache.rocketmq.broker.client.ProducerChangeListener;
 import org.apache.rocketmq.broker.client.ProducerGroupEvent;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import 
org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
@@ -33,13 +40,8 @@ import 
org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHead
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData;
-import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
-import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.Set;
 
 public class ClientManagerActivity extends AbstractRemotingActivity {
 
@@ -108,9 +110,9 @@ public class ClientManagerActivity extends 
AbstractRemotingActivity {
         if (channel instanceof RemotingChannel) {
             RemotingChannel remotingChannel = (RemotingChannel) channel;
             Channel parent = remotingChannel.parent();
-            RemotingHelper.setPropertyToAttr(parent, 
RemotingHelper.CLIENT_ID_KEY, clientChannelInfo.getClientId());
-            RemotingHelper.setPropertyToAttr(parent, 
RemotingHelper.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
-            RemotingHelper.setPropertyToAttr(parent, 
RemotingHelper.VERSION_KEY, clientChannelInfo.getVersion());
+            RemotingHelper.setPropertyToAttr(parent, 
AttributeKeys.CLIENT_ID_KEY, clientChannelInfo.getClientId());
+            RemotingHelper.setPropertyToAttr(parent, 
AttributeKeys.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
+            RemotingHelper.setPropertyToAttr(parent, 
AttributeKeys.VERSION_KEY, clientChannelInfo.getVersion());
         }
 
     }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
index 663a83e3c0..b2bd3a35f1 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
@@ -21,7 +21,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
-import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.acl.common.AclException;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -35,6 +34,7 @@ import 
org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
 import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
 import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
@@ -48,6 +48,8 @@ import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.concurrent.CompletableFuture;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -82,9 +84,9 @@ public class AbstractRemotingActivityTest extends 
InitConfigTest {
             }
         };
         Channel channel = ctx.channel();
-        RemotingHelper.setPropertyToAttr(channel, 
RemotingHelper.CLIENT_ID_KEY, CLIENT_ID);
-        RemotingHelper.setPropertyToAttr(channel, 
RemotingHelper.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
-        RemotingHelper.setPropertyToAttr(channel, RemotingHelper.VERSION_KEY, 
MQVersion.CURRENT_VERSION);
+        RemotingHelper.setPropertyToAttr(channel, AttributeKeys.CLIENT_ID_KEY, 
CLIENT_ID);
+        RemotingHelper.setPropertyToAttr(channel, 
AttributeKeys.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
+        RemotingHelper.setPropertyToAttr(channel, AttributeKeys.VERSION_KEY, 
MQVersion.CURRENT_VERSION);
     }
 
     @Test
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index d0750b678f..363b22eac7 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.constant.HAProxyConstants;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -31,8 +30,8 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
 import org.apache.rocketmq.remoting.netty.NettySystemConfig;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
@@ -51,16 +50,6 @@ public class RemotingHelper {
     public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
-    private static final AttributeKey<String> REMOTE_ADDR_KEY = 
AttributeKey.valueOf("RemoteAddr");
-
-    private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = 
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
-    private static final AttributeKey<String> PROXY_PROTOCOL_PORT = 
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
-
-    public static final AttributeKey<String> CLIENT_ID_KEY = 
AttributeKey.valueOf("ClientId");
-
-    public static final AttributeKey<Integer> VERSION_KEY = 
AttributeKey.valueOf("Version");
-
-    public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = 
AttributeKey.valueOf("LanguageCode");
 
     public static final Map<Integer, String> REQUEST_CODE_MAP = new 
HashMap<Integer, String>() {
         {
@@ -213,7 +202,7 @@ public class RemotingHelper {
         if (StringUtils.isNotBlank(addr)) {
             return addr;
         }
-        Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
+        Attribute<String> att = channel.attr(AttributeKeys.REMOTE_ADDR_KEY);
         if (att == null) {
             // mocked in unit test
             return parseChannelRemoteAddr0(channel);
@@ -227,11 +216,11 @@ public class RemotingHelper {
     }
 
     private static String getProxyProtocolAddress(Channel channel) {
-        if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
+        if (!channel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
             return null;
         }
-        String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, 
channel);
-        String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, 
channel);
+        String proxyProtocolAddr = 
getAttributeValue(AttributeKeys.PROXY_PROTOCOL_ADDR, channel);
+        String proxyProtocolPort = 
getAttributeValue(AttributeKeys.PROXY_PROTOCOL_PORT, channel);
         if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == 
null) {
             return null;
         }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
index 4e69ab82d4..ebdde31f41 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
@@ -19,12 +19,21 @@ package org.apache.rocketmq.remoting.netty;
 
 import io.netty.util.AttributeKey;
 import org.apache.rocketmq.common.constant.HAProxyConstants;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class AttributeKeys {
 
+    public static final AttributeKey<String> REMOTE_ADDR_KEY = 
AttributeKey.valueOf("RemoteAddr");
+
+    public static final AttributeKey<String> CLIENT_ID_KEY = 
AttributeKey.valueOf("ClientId");
+
+    public static final AttributeKey<Integer> VERSION_KEY = 
AttributeKey.valueOf("Version");
+
+    public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = 
AttributeKey.valueOf("LanguageCode");
+
     public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
             AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
 
@@ -40,6 +49,6 @@ public class AttributeKeys {
     private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = 
new ConcurrentHashMap<>();
 
     public static AttributeKey<String> valueOf(String name) {
-        return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf);
+        return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKey::valueOf);
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 445f06cc63..8ae87a6fa5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -37,6 +37,7 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.ProtocolDetectionResult;
 import io.netty.handler.codec.ProtocolDetectionState;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
@@ -73,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.cert.CertificateException;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -115,7 +117,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
 
     // sharable handlers
-    private HandshakeHandler handshakeHandler;
+    private TlsModeHandler tlsModeHandler;
     private NettyEncoder encoder;
     private NettyConnectManageHandler connectionManageHandler;
     private NettyServerHandler serverHandler;
@@ -265,7 +267,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
      */
     protected ChannelPipeline configChannel(SocketChannel ch) {
         return ch.pipeline()
-            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, 
handshakeHandler)
+            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new 
HandshakeHandler())
             .addLast(defaultEventExecutorGroup,
                 encoder,
                 new NettyDecoder(),
@@ -402,7 +404,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     }
 
     private void prepareSharableHandlers() {
-        handshakeHandler = new HandshakeHandler();
+        tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
         encoder = new NettyEncoder();
         connectionManageHandler = new NettyConnectManageHandler();
         serverHandler = new NettyServerHandler();
@@ -429,10 +431,6 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         return defaultEventExecutorGroup;
     }
 
-    public HandshakeHandler getHandshakeHandler() {
-        return handshakeHandler;
-    }
-
     public NettyEncoder getEncoder() {
         return encoder;
     }
@@ -449,17 +447,13 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         return distributionHandler;
     }
 
-    @ChannelHandler.Sharable
-    public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> 
{
-
-        private final TlsModeHandler tlsModeHandler;
+    public class HandshakeHandler extends ByteToMessageDecoder {
 
         public HandshakeHandler() {
-            tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
         }
 
         @Override
-        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf 
byteBuf) {
+        protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, 
List<Object> out) throws Exception {
             try {
                 ProtocolDetectionResult<HAProxyProtocolVersion> 
detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf);
                 if (detectionResult.state() == 
ProtocolDetectionState.NEEDS_MORE_DATA) {
@@ -479,9 +473,6 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                 } catch (NoSuchElementException e) {
                     log.error("Error while removing HandshakeHandler", e);
                 }
-
-                // Hand over this message to the next .
-                ctx.fireChannelRead(byteBuf.retain());
             } catch (Exception e) {
                 log.error("process proxy protocol negotiator failed.", e);
                 throw e;

Reply via email to