This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b4496be687 [ISSUE #7010] Fix the HandshakeHandler returns when detect
haproxy version need more data (#7011)
b4496be687 is described below
commit b4496be68705c1c0b282a07a1adeab4fffd670fe
Author: ShuangxiDing <[email protected]>
AuthorDate: Tue Jul 11 19:09:09 2023 +0800
[ISSUE #7010] Fix the HandshakeHandler returns when detect haproxy version
need more data (#7011)
* Support dynamic modification of grpc tls mode to improve the scalability
of ProtocolNegotiator
* Support dynamic modification of grpc tls mode to improve the scalability
of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC and Remoting server.
* 回滚netty的升级
* Support proxy protocol for gRPC and Remoting server.
* Support proxy protocol for gRPC and Remoting server.
* Support proxy protocol for gRPC and Remoting server.
* add grpc-netty-codec-haproxy in bazel
* add grpc-netty-codec-haproxy in bazel
* Support proxy protocol for gRPC and Remoting server.
* Fix Test
* add grpc-netty-codec-haproxy in bazel
* add ProxyProtocolTest for Remoting
* Move AttributeKey from RemotingHelper to AttributeKey.
* Fix the needs more data for HandshakeHandler.
* Fix the needs more data for HandshakeHandler.
* Fix the needs more data for HandshakeHandler.
* Fix the needs more data for HandshakeHandler.
---------
Co-authored-by: 徒钟 <[email protected]>
---
.../remoting/MultiProtocolRemotingServer.java | 2 +-
.../activity/AbstractRemotingActivity.java | 16 ++++++++-------
.../remoting/activity/ClientManagerActivity.java | 24 ++++++++++++----------
.../activity/AbstractRemotingActivityTest.java | 10 +++++----
.../rocketmq/remoting/common/RemotingHelper.java | 21 +++++--------------
.../rocketmq/remoting/netty/AttributeKeys.java | 11 +++++++++-
.../remoting/netty/NettyRemotingServer.java | 23 +++++++--------------
7 files changed, 51 insertions(+), 56 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 858b1f0227..12d728fff1 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -78,7 +78,7 @@ public class MultiProtocolRemotingServer extends
NettyRemotingServer {
@Override
protected ChannelPipeline configChannel(SocketChannel ch) {
return ch.pipeline()
- .addLast(this.getDefaultEventExecutorGroup(),
HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler())
+ .addLast(this.getDefaultEventExecutorGroup(),
HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
.addLast(this.getDefaultEventExecutorGroup(),
new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new ProtocolNegotiationHandler(this.remotingProtocolHandler)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
index 78cd203ec4..ce4a633976 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -19,9 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -41,11 +38,16 @@ import
org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
public abstract class AbstractRemotingActivity implements
NettyRequestProcessor {
protected final static Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final MessagingProcessor messagingProcessor;
@@ -126,13 +128,13 @@ public abstract class AbstractRemotingActivity implements
NettyRequestProcessor
.setProtocolType(ChannelProtocolType.REMOTING.getName())
.setChannel(channel)
.setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
-
.setRemoteAddress(NetworkUtil.socketAddress2String(ctx.channel().remoteAddress()));
+
.setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-
Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.LANGUAGE_CODE_KEY,
channel))
+
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY,
channel))
.ifPresent(language -> context.setLanguage(language.name()));
-
Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.CLIENT_ID_KEY,
channel))
+
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY,
channel))
.ifPresent(context::setClientID);
-
Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.VERSION_KEY,
channel))
+
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY,
channel))
.ifPresent(version ->
context.setClientVersion(MQVersion.getVersionDesc(version)));
return context;
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
index 1eb81ce927..c671593a34 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -19,13 +19,20 @@ package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.util.Set;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerChangeListener;
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import
org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
@@ -33,13 +40,8 @@ import
org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHead
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData;
-import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
-import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.Set;
public class ClientManagerActivity extends AbstractRemotingActivity {
@@ -108,9 +110,9 @@ public class ClientManagerActivity extends
AbstractRemotingActivity {
if (channel instanceof RemotingChannel) {
RemotingChannel remotingChannel = (RemotingChannel) channel;
Channel parent = remotingChannel.parent();
- RemotingHelper.setPropertyToAttr(parent,
RemotingHelper.CLIENT_ID_KEY, clientChannelInfo.getClientId());
- RemotingHelper.setPropertyToAttr(parent,
RemotingHelper.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
- RemotingHelper.setPropertyToAttr(parent,
RemotingHelper.VERSION_KEY, clientChannelInfo.getVersion());
+ RemotingHelper.setPropertyToAttr(parent,
AttributeKeys.CLIENT_ID_KEY, clientChannelInfo.getClientId());
+ RemotingHelper.setPropertyToAttr(parent,
AttributeKeys.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
+ RemotingHelper.setPropertyToAttr(parent,
AttributeKeys.VERSION_KEY, clientChannelInfo.getVersion());
}
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
index 663a83e3c0..b2bd3a35f1 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
@@ -21,7 +21,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -35,6 +34,7 @@ import
org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
@@ -48,6 +48,8 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.concurrent.CompletableFuture;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -82,9 +84,9 @@ public class AbstractRemotingActivityTest extends
InitConfigTest {
}
};
Channel channel = ctx.channel();
- RemotingHelper.setPropertyToAttr(channel,
RemotingHelper.CLIENT_ID_KEY, CLIENT_ID);
- RemotingHelper.setPropertyToAttr(channel,
RemotingHelper.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.VERSION_KEY,
MQVersion.CURRENT_VERSION);
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.CLIENT_ID_KEY,
CLIENT_ID);
+ RemotingHelper.setPropertyToAttr(channel,
AttributeKeys.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.VERSION_KEY,
MQVersion.CURRENT_VERSION);
}
@Test
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index d0750b678f..363b22eac7 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -31,8 +30,8 @@ import
org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
@@ -51,16 +50,6 @@ public class RemotingHelper {
public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
- private static final AttributeKey<String> REMOTE_ADDR_KEY =
AttributeKey.valueOf("RemoteAddr");
-
- private static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
- private static final AttributeKey<String> PROXY_PROTOCOL_PORT =
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
-
- public static final AttributeKey<String> CLIENT_ID_KEY =
AttributeKey.valueOf("ClientId");
-
- public static final AttributeKey<Integer> VERSION_KEY =
AttributeKey.valueOf("Version");
-
- public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY =
AttributeKey.valueOf("LanguageCode");
public static final Map<Integer, String> REQUEST_CODE_MAP = new
HashMap<Integer, String>() {
{
@@ -213,7 +202,7 @@ public class RemotingHelper {
if (StringUtils.isNotBlank(addr)) {
return addr;
}
- Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
+ Attribute<String> att = channel.attr(AttributeKeys.REMOTE_ADDR_KEY);
if (att == null) {
// mocked in unit test
return parseChannelRemoteAddr0(channel);
@@ -227,11 +216,11 @@ public class RemotingHelper {
}
private static String getProxyProtocolAddress(Channel channel) {
- if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
+ if (!channel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
return null;
}
- String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR,
channel);
- String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT,
channel);
+ String proxyProtocolAddr =
getAttributeValue(AttributeKeys.PROXY_PROTOCOL_ADDR, channel);
+ String proxyProtocolPort =
getAttributeValue(AttributeKeys.PROXY_PROTOCOL_PORT, channel);
if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort ==
null) {
return null;
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
index 4e69ab82d4..ebdde31f41 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
@@ -19,12 +19,21 @@ package org.apache.rocketmq.remoting.netty;
import io.netty.util.AttributeKey;
import org.apache.rocketmq.common.constant.HAProxyConstants;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class AttributeKeys {
+ public static final AttributeKey<String> REMOTE_ADDR_KEY =
AttributeKey.valueOf("RemoteAddr");
+
+ public static final AttributeKey<String> CLIENT_ID_KEY =
AttributeKey.valueOf("ClientId");
+
+ public static final AttributeKey<Integer> VERSION_KEY =
AttributeKey.valueOf("Version");
+
+ public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY =
AttributeKey.valueOf("LanguageCode");
+
public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
@@ -40,6 +49,6 @@ public class AttributeKeys {
private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP =
new ConcurrentHashMap<>();
public static AttributeKey<String> valueOf(String name) {
- return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf);
+ return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKey::valueOf);
}
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 445f06cc63..8ae87a6fa5 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -37,6 +37,7 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ProtocolDetectionResult;
import io.netty.handler.codec.ProtocolDetectionState;
import io.netty.handler.codec.haproxy.HAProxyMessage;
@@ -73,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -115,7 +117,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
// sharable handlers
- private HandshakeHandler handshakeHandler;
+ private TlsModeHandler tlsModeHandler;
private NettyEncoder encoder;
private NettyConnectManageHandler connectionManageHandler;
private NettyServerHandler serverHandler;
@@ -265,7 +267,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
*/
protected ChannelPipeline configChannel(SocketChannel ch) {
return ch.pipeline()
- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
handshakeHandler)
+ .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new
HandshakeHandler())
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
@@ -402,7 +404,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
private void prepareSharableHandlers() {
- handshakeHandler = new HandshakeHandler();
+ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
@@ -429,10 +431,6 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
return defaultEventExecutorGroup;
}
- public HandshakeHandler getHandshakeHandler() {
- return handshakeHandler;
- }
-
public NettyEncoder getEncoder() {
return encoder;
}
@@ -449,17 +447,13 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
return distributionHandler;
}
- @ChannelHandler.Sharable
- public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf>
{
-
- private final TlsModeHandler tlsModeHandler;
+ public class HandshakeHandler extends ByteToMessageDecoder {
public HandshakeHandler() {
- tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf
byteBuf) {
+ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf,
List<Object> out) throws Exception {
try {
ProtocolDetectionResult<HAProxyProtocolVersion>
detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf);
if (detectionResult.state() ==
ProtocolDetectionState.NEEDS_MORE_DATA) {
@@ -479,9 +473,6 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
} catch (NoSuchElementException e) {
log.error("Error while removing HandshakeHandler", e);
}
-
- // Hand over this message to the next .
- ctx.fireChannelRead(byteBuf.retain());
} catch (Exception e) {
log.error("process proxy protocol negotiator failed.", e);
throw e;