This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 00fc42b8be [ISSUE #6957] Support Proxy Protocol for gRPC and Remoting 
Server (#6958)
00fc42b8be is described below

commit 00fc42b8be848fc3f5c550cbab007b92f128dc38
Author: ShuangxiDing <[email protected]>
AuthorDate: Tue Jul 4 18:02:16 2023 +0800

    [ISSUE #6957] Support Proxy Protocol for gRPC and Remoting Server (#6958)
---
 WORKSPACE                                          |   1 +
 .../rocketmq/common/constant/HAProxyConstants.java |  28 +++++
 pom.xml                                            |   5 +
 proxy/BUILD.bazel                                  |   2 +
 proxy/pom.xml                                      |   4 +
 .../rocketmq/proxy/grpc/GrpcServerBuilder.java     |   2 +-
 ...tor.java => ProxyAndTlsProtocolNegotiator.java} | 139 ++++++++++++++++++---
 .../proxy/grpc/constant/AttributeKeys.java         |  44 +++++++
 .../proxy/grpc/interceptor/HeaderInterceptor.java  |  32 ++++-
 .../remoting/MultiProtocolRemotingServer.java      |   5 +-
 .../rocketmq/remoting/common/RemotingHelper.java   |  42 +++++--
 .../rocketmq/remoting/netty/AttributeKeys.java     |  45 +++++++
 .../remoting/netty/NettyRemotingServer.java        | 129 ++++++++++++++++---
 .../rocketmq/remoting/ProxyProtocolTest.java       | 116 +++++++++++++++++
 .../java/org/apache/rocketmq/remoting/TlsTest.java |  28 +++--
 15 files changed, 563 insertions(+), 59 deletions(-)

diff --git a/WORKSPACE b/WORKSPACE
index fbb694efee..e3a8f37dc1 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -104,6 +104,7 @@ maven_install(
         "software.amazon.awssdk:s3:2.20.29",
         "com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
         "com.adobe.testing:s3mock-junit4:2.11.0",
+        "io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
     ],
     fetch_sources = True,
     repositories = [
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
 
b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
new file mode 100644
index 0000000000..c1ae0cca18
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.constant;
+
+public class HAProxyConstants {
+
+    public static final String PROXY_PROTOCOL_PREFIX = "proxy_protocol_";
+    public static final String PROXY_PROTOCOL_ADDR = PROXY_PROTOCOL_PREFIX + 
"addr";
+    public static final String PROXY_PROTOCOL_PORT = PROXY_PROTOCOL_PREFIX + 
"port";
+    public static final String PROXY_PROTOCOL_SERVER_ADDR = 
PROXY_PROTOCOL_PREFIX + "server_addr";
+    public static final String PROXY_PROTOCOL_SERVER_PORT = 
PROXY_PROTOCOL_PREFIX + "server_port";
+    public static final String PROXY_PROTOCOL_TLV_PREFIX = 
PROXY_PROTOCOL_PREFIX + "tlv_0x";
+}
diff --git a/pom.xml b/pom.xml
index a3b4746026..12bc2dbd5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -888,6 +888,11 @@
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>io.github.aliyunmq</groupId>
+                <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
+                <version>1.0.0</version>
+            </dependency>
             <dependency>
                 <groupId>com.conversantmedia</groupId>
                 <artifactId>disruptor</artifactId>
diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel
index fcb85e46fb..b4f3c16e22 100644
--- a/proxy/BUILD.bazel
+++ b/proxy/BUILD.bazel
@@ -46,6 +46,7 @@ java_library(
         "@maven//:io_grpc_grpc_services",
         "@maven//:io_grpc_grpc_stub",
         "@maven//:io_netty_netty_all",
+        "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
         "@maven//:io_openmessaging_storage_dledger",
         "@maven//:io_opentelemetry_opentelemetry_api",
         "@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
@@ -94,6 +95,7 @@ java_library(
         "@maven//:io_grpc_grpc_netty_shaded",
         "@maven//:io_grpc_grpc_stub",
         "@maven//:io_netty_netty_all",
+        "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
         "@maven//:org_apache_commons_commons_lang3",
         "@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
         "@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
diff --git a/proxy/pom.xml b/proxy/pom.xml
index f14155737b..3fbea107ab 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -75,6 +75,10 @@
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java-util</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.github.aliyunmq</groupId>
+            <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 0ca6a1fcbd..437b9216b1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -50,7 +50,7 @@ public class GrpcServerBuilder {
     protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
         serverBuilder = NettyServerBuilder.forPort(port);
 
-        serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator());
+        serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
 
         // build server
         int bossLoopNum = 
ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
similarity index 51%
rename from 
proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
rename to 
proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index 670e1c1a21..ceb9becc0c 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -16,36 +16,53 @@
  */
 package org.apache.rocketmq.proxy.grpc;
 
+import io.grpc.Attributes;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
 import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
 import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
+import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent;
 import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
 import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
 import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
+import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
 import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
+import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionResult;
+import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState;
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage;
+import 
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import 
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
 import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
 import 
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
 import io.grpc.netty.shaded.io.netty.util.AsciiString;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
+import io.grpc.netty.shaded.io.netty.util.CharsetUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
 import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 
-public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator.ProtocolNegotiator {
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class ProxyAndTlsProtocolNegotiator implements 
InternalProtocolNegotiator.ProtocolNegotiator {
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
 
+    private static final String HA_PROXY_DECODER = "HAProxyDecoder";
+    private static final String HA_PROXY_HANDLER = "HAProxyHandler";
+    private static final String TLS_MODE_HANDLER = "TlsModeHandler";
     /**
      * the length of the ssl record header (in bytes)
      */
@@ -53,7 +70,7 @@ public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator
 
     private static SslContext sslContext;
 
-    public OptionalSSLProtocolNegotiator() {
+    public ProxyAndTlsProtocolNegotiator() {
         sslContext = loadSslContext();
     }
 
@@ -64,11 +81,12 @@ public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator
 
     @Override
     public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
-        return new PortUnificationServerHandler(grpcHandler);
+        return new ProxyAndTlsProtocolHandler(grpcHandler);
     }
 
     @Override
-    public void close() {}
+    public void close() {
+    }
 
     private static SslContext loadSslContext() {
         try {
@@ -85,8 +103,8 @@ public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator
                 String tlsCertPath = 
ConfigurationManager.getProxyConfig().getTlsCertPath();
                 try (InputStream serverKeyInputStream = Files.newInputStream(
                         Paths.get(tlsKeyPath));
-                        InputStream serverCertificateStream = 
Files.newInputStream(
-                                Paths.get(tlsCertPath))) {
+                     InputStream serverCertificateStream = 
Files.newInputStream(
+                             Paths.get(tlsCertPath))) {
                     SslContext res = 
GrpcSslContexts.forServer(serverCertificateStream,
                                     serverKeyInputStream)
                             .trustManager(InsecureTrustManagerFactory.INSTANCE)
@@ -102,12 +120,95 @@ public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator
         }
     }
 
-    public static class PortUnificationServerHandler extends 
ByteToMessageDecoder {
+    private static class ProxyAndTlsProtocolHandler extends 
ByteToMessageDecoder {
+
+        private final GrpcHttp2ConnectionHandler grpcHandler;
+
+        public ProxyAndTlsProtocolHandler(GrpcHttp2ConnectionHandler 
grpcHandler) {
+            this.grpcHandler = grpcHandler;
+        }
+
+        @Override
+        protected void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) {
+            try {
+                ProtocolDetectionResult<HAProxyProtocolVersion> ha = 
HAProxyMessageDecoder.detectProtocol(
+                        in);
+                if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+                    return;
+                }
+                if (ha.state() == ProtocolDetectionState.DETECTED) {
+                    ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new 
HAProxyMessageDecoder())
+                            .addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new 
HAProxyMessageHandler())
+                            .addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new 
TlsModeHandler(grpcHandler));
+                } else {
+                    ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new 
TlsModeHandler(grpcHandler));
+                }
+
+                
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
+                ctx.pipeline().remove(this);
+            } catch (Exception e) {
+                log.error("process proxy protocol negotiator failed.", e);
+                throw e;
+            }
+        }
+    }
+
+    private static class HAProxyMessageHandler extends 
ChannelInboundHandlerAdapter {
+
+        private ProtocolNegotiationEvent pne = 
InternalProtocolNegotiationEvent.getDefault();
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+            if (msg instanceof HAProxyMessage) {
+                replaceEventWithMessage((HAProxyMessage) msg);
+                ctx.fireUserEventTriggered(pne);
+            } else {
+                super.channelRead(ctx, msg);
+            }
+            ctx.pipeline().remove(this);
+        }
+
+        /**
+         * The definition of key refers to the implementation of nginx
+         * <a 
href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr";>ngx_http_core_module</a>
+         *
+         * @param msg
+         */
+        private void replaceEventWithMessage(HAProxyMessage msg) {
+            Attributes.Builder builder = 
InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
+            if (StringUtils.isNotBlank(msg.sourceAddress())) {
+                builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, 
msg.sourceAddress());
+            }
+            if (msg.sourcePort() > 0) {
+                builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, 
String.valueOf(msg.sourcePort()));
+            }
+            if (StringUtils.isNotBlank(msg.destinationAddress())) {
+                builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, 
msg.destinationAddress());
+            }
+            if (msg.destinationPort() > 0) {
+                builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, 
String.valueOf(msg.destinationPort()));
+            }
+            if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+                msg.tlvs().forEach(tlv -> {
+                    Attributes.Key<String> key = AttributeKeys.valueOf(
+                            HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + 
String.format("%02x", tlv.typeByteValue()));
+                    String value = 
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+                    builder.set(key, value);
+                });
+            }
+            pne = InternalProtocolNegotiationEvent
+                    
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
+        }
+    }
+
+    private static class TlsModeHandler extends ByteToMessageDecoder {
+
+        private ProtocolNegotiationEvent pne = 
InternalProtocolNegotiationEvent.getDefault();
 
         private final ChannelHandler ssl;
         private final ChannelHandler plaintext;
 
-        public PortUnificationServerHandler(GrpcHttp2ConnectionHandler 
grpcHandler) {
+        public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) {
             this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
                     .newHandler(grpcHandler);
             this.plaintext = InternalProtocolNegotiators.serverPlaintext()
@@ -115,8 +216,7 @@ public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator
         }
 
         @Override
-        protected void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out)
-                throws Exception {
+        protected void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) {
             try {
                 TlsMode tlsMode = TlsSystemConfig.tlsMode;
                 if (TlsMode.ENFORCING.equals(tlsMode)) {
@@ -134,12 +234,21 @@ public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator
                         ctx.pipeline().addAfter(ctx.name(), null, 
this.plaintext);
                     }
                 }
-                
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
+                ctx.fireUserEventTriggered(pne);
                 ctx.pipeline().remove(this);
             } catch (Exception e) {
                 log.error("process ssl protocol negotiator failed.", e);
                 throw e;
             }
         }
+
+        @Override
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
throws Exception {
+            if (evt instanceof ProtocolNegotiationEvent) {
+                pne = (ProtocolNegotiationEvent) evt;
+            } else {
+                super.userEventTriggered(ctx, evt);
+            }
+        }
     }
 }
\ No newline at end of file
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
new file mode 100644
index 0000000000..096a5ba3d3
--- /dev/null
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.grpc.constant;
+
+import io.grpc.Attributes;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class AttributeKeys {
+
+    public static final Attributes.Key<String> PROXY_PROTOCOL_ADDR =
+            Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+
+    public static final Attributes.Key<String> PROXY_PROTOCOL_PORT =
+            Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_PORT);
+
+    public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_ADDR =
+            Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
+
+    public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_PORT =
+            Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
+
+    private static final Map<String, Attributes.Key<String>> 
ATTRIBUTES_KEY_MAP = new ConcurrentHashMap<>();
+
+    public static Attributes.Key<String> valueOf(String name) {
+        return ATTRIBUTES_KEY_MAP.computeIfAbsent(name, key -> 
Attributes.Key.create(name));
+    }
+}
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
index 1cbb003610..13893e5eda 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
@@ -18,11 +18,16 @@
 package org.apache.rocketmq.proxy.grpc.interceptor;
 
 import com.google.common.net.HostAndPort;
+import io.grpc.Attributes;
 import io.grpc.Grpc;
 import io.grpc.Metadata;
 import io.grpc.ServerCall;
 import io.grpc.ServerCallHandler;
 import io.grpc.ServerInterceptor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
+
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
@@ -33,13 +38,27 @@ public class HeaderInterceptor implements ServerInterceptor 
{
         Metadata headers,
         ServerCallHandler<R, W> next
     ) {
-        SocketAddress remoteSocketAddress = 
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
-        String remoteAddress = parseSocketAddress(remoteSocketAddress);
+        String remoteAddress = getProxyProtocolAddress(call.getAttributes());
+        if (StringUtils.isBlank(remoteAddress)) {
+            SocketAddress remoteSocketAddress = 
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+            remoteAddress = parseSocketAddress(remoteSocketAddress);
+        }
         headers.put(InterceptorConstants.REMOTE_ADDRESS, remoteAddress);
 
         SocketAddress localSocketAddress = 
call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
         String localAddress = parseSocketAddress(localSocketAddress);
         headers.put(InterceptorConstants.LOCAL_ADDRESS, localAddress);
+
+        for (Attributes.Key<?> key : call.getAttributes().keys()) {
+            if (!StringUtils.startsWith(key.toString(), 
HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
+                continue;
+            }
+            Metadata.Key<String> headerKey
+                    = Metadata.Key.of(key.toString(), 
Metadata.ASCII_STRING_MARSHALLER);
+            String headerValue = String.valueOf(call.getAttributes().get(key));
+            headers.put(headerKey, headerValue);
+        }
+
         return next.startCall(call, headers);
     }
 
@@ -55,4 +74,13 @@ public class HeaderInterceptor implements ServerInterceptor {
 
         return "";
     }
+
+    private String getProxyProtocolAddress(Attributes attributes) {
+        String proxyProtocolAddr = 
attributes.get(AttributeKeys.PROXY_PROTOCOL_ADDR);
+        String proxyProtocolPort = 
attributes.get(AttributeKeys.PROXY_PROTOCOL_PORT);
+        if (StringUtils.isBlank(proxyProtocolAddr) || 
StringUtils.isBlank(proxyProtocolPort)) {
+            return null;
+        }
+        return proxyProtocolAddr + ":" + proxyProtocolPort;
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 1142132b78..858b1f0227 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -20,8 +20,6 @@ package org.apache.rocketmq.proxy.remoting;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
-import java.io.IOException;
-import java.security.cert.CertificateException;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -36,6 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 
+import java.io.IOException;
+import java.security.cert.CertificateException;
+
 /**
  * support remoting and http2 protocol at one port
  */
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 75e25a83a1..d0750b678f 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -21,14 +21,8 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -43,6 +37,15 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+
 public class RemotingHelper {
     public static final String DEFAULT_CHARSET = "UTF-8";
     public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
@@ -50,6 +53,9 @@ public class RemotingHelper {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
     private static final AttributeKey<String> REMOTE_ADDR_KEY = 
AttributeKey.valueOf("RemoteAddr");
 
+    private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = 
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+    private static final AttributeKey<String> PROXY_PROTOCOL_PORT = 
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
+
     public static final AttributeKey<String> CLIENT_ID_KEY = 
AttributeKey.valueOf("ClientId");
 
     public static final AttributeKey<Integer> VERSION_KEY = 
AttributeKey.valueOf("Version");
@@ -203,12 +209,16 @@ public class RemotingHelper {
         if (null == channel) {
             return "";
         }
+        String addr = getProxyProtocolAddress(channel);
+        if (StringUtils.isNotBlank(addr)) {
+            return addr;
+        }
         Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
         if (att == null) {
             // mocked in unit test
             return parseChannelRemoteAddr0(channel);
         }
-        String addr = att.get();
+        addr = att.get();
         if (addr == null) {
             addr = parseChannelRemoteAddr0(channel);
             att.set(addr);
@@ -216,6 +226,18 @@ public class RemotingHelper {
         return addr;
     }
 
+    private static String getProxyProtocolAddress(Channel channel) {
+        if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
+            return null;
+        }
+        String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, 
channel);
+        String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, 
channel);
+        if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == 
null) {
+            return null;
+        }
+        return proxyProtocolAddr + ":" + proxyProtocolPort;
+    }
+
     private static String parseChannelRemoteAddr0(final Channel channel) {
         SocketAddress remote = channel.remoteAddress();
         final String addr = remote != null ? remote.toString() : "";
@@ -255,7 +277,7 @@ public class RemotingHelper {
         return "";
     }
 
-    public static int parseSocketAddressPort(SocketAddress socketAddress) {
+    public static Integer parseSocketAddressPort(SocketAddress socketAddress) {
         if (socketAddress instanceof InetSocketAddress) {
             return ((InetSocketAddress) socketAddress).getPort();
         }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
new file mode 100644
index 0000000000..4e69ab82d4
--- /dev/null
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+
+import io.netty.util.AttributeKey;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class AttributeKeys {
+
+    public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
+            AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+
+    public static final AttributeKey<String> PROXY_PROTOCOL_PORT =
+            AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
+
+    public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_ADDR =
+            AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
+
+    public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_PORT =
+            AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
+
+    private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = 
new ConcurrentHashMap<>();
+
+    public static AttributeKey<String> valueOf(String name) {
+        return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf);
+    }
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 9f39d672e7..94ffd8d07a 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -36,27 +37,25 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.ProtocolDetectionResult;
+import io.netty.handler.codec.ProtocolDetectionState;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.AttributeKey;
+import io.netty.util.CharsetUtil;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.cert.CertificateException;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -71,6 +70,19 @@ import 
org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 @SuppressWarnings("NullableProblems")
 public class NettyRemotingServer extends NettyRemotingAbstract implements 
RemotingServer {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
@@ -96,6 +108,9 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> 
remotingServerTable = new ConcurrentHashMap<>();
 
     public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+    public static final String HA_PROXY_DECODER = "HAProxyDecoder";
+    public static final String HA_PROXY_HANDLER = "HAProxyHandler";
+    public static final String TLS_MODE_HANDLER = "TlsModeHandler";
     public static final String TLS_HANDLER_NAME = "sslHandler";
     public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
 
@@ -387,7 +402,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     }
 
     private void prepareSharableHandlers() {
-        handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
+        handshakeHandler = new HandshakeHandler();
         encoder = new NettyEncoder();
         connectionManageHandler = new NettyConnectManageHandler();
         serverHandler = new NettyServerHandler();
@@ -437,11 +452,51 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     @ChannelHandler.Sharable
     public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> 
{
 
+        private final TlsModeHandler tlsModeHandler;
+
+        public HandshakeHandler() {
+            tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
+        }
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
+            try {
+                ProtocolDetectionResult<HAProxyProtocolVersion> ha = 
HAProxyMessageDecoder.detectProtocol(in);
+                if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+                    return;
+                }
+                if (ha.state() == ProtocolDetectionState.DETECTED) {
+                    ctx.pipeline().addAfter(defaultEventExecutorGroup, 
ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
+                            .addAfter(defaultEventExecutorGroup, 
HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
+                            .addAfter(defaultEventExecutorGroup, 
HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
+                } else {
+                    ctx.pipeline().addAfter(defaultEventExecutorGroup, 
ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
+                }
+
+                try {
+                    // Remove this handler
+                    ctx.pipeline().remove(this);
+                } catch (NoSuchElementException e) {
+                    log.error("Error while removing HandshakeHandler", e);
+                }
+
+                // Hand over this message to the next .
+                ctx.fireChannelRead(in.retain());
+            } catch (Exception e) {
+                log.error("process proxy protocol negotiator failed.", e);
+                throw e;
+            }
+        }
+    }
+
+    @ChannelHandler.Sharable
+    public class TlsModeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
         private final TlsMode tlsMode;
 
         private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
 
-        HandshakeHandler(TlsMode tlsMode) {
+        TlsModeHandler(TlsMode tlsMode) {
             this.tlsMode = tlsMode;
         }
 
@@ -461,7 +516,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                     case ENFORCING:
                         if (null != sslContext) {
                             ctx.pipeline()
-                                .addAfter(defaultEventExecutorGroup, 
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, 
sslContext.newHandler(ctx.channel().alloc()))
+                                .addAfter(defaultEventExecutorGroup, 
TLS_MODE_HANDLER, TLS_HANDLER_NAME, 
sslContext.newHandler(ctx.channel().alloc()))
                                 .addAfter(defaultEventExecutorGroup, 
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                             log.info("Handlers prepended to channel pipeline 
to establish SSL connection");
                         } else {
@@ -483,7 +538,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                 // Remove this handler
                 ctx.pipeline().remove(this);
             } catch (NoSuchElementException e) {
-                log.error("Error while removing HandshakeHandler", e);
+                log.error("Error while removing TlsModeHandler", e);
             }
 
             // Hand over this message to the next .
@@ -706,4 +761,46 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
             return NettyRemotingServer.this.getCallbackExecutor();
         }
     }
+
+    public static class HAProxyMessageHandler extends 
ChannelInboundHandlerAdapter {
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+            if (msg instanceof HAProxyMessage) {
+                fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
+            } else {
+                super.channelRead(ctx, msg);
+            }
+            ctx.pipeline().remove(this);
+        }
+
+        /**
+         * The definition of key refers to the implementation of nginx
+         * <a 
href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr";>ngx_http_core_module</a>
+         * @param msg
+         * @param channel
+         */
+        private void fillChannelWithMessage(HAProxyMessage msg, Channel 
channel) {
+            if (StringUtils.isNotBlank(msg.sourceAddress())) {
+                
channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
+            }
+            if (msg.sourcePort() > 0) {
+                
channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
+            }
+            if (StringUtils.isNotBlank(msg.destinationAddress())) {
+                
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
+            }
+            if (msg.destinationPort() > 0) {
+                
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
+            }
+            if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+                msg.tlvs().forEach(tlv -> {
+                    AttributeKey<String> key = AttributeKeys.valueOf(
+                            HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + 
String.format("%02x", tlv.typeByteValue()));
+                    String value = 
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+                    channel.attr(key).set(value);
+                });
+            }
+        }
+    }
 }
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java 
b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
new file mode 100644
index 0000000000..c39fd2132b
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.Socket;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProxyProtocolTest {
+
+    private RemotingServer remotingServer;
+    private RemotingClient remotingClient;
+
+    @Before
+    public void setUp() throws Exception {
+        NettyClientConfig clientConfig = new NettyClientConfig();
+        clientConfig.setUseTLS(false);
+
+        remotingServer = RemotingServerTest.createRemotingServer();
+        remotingClient = RemotingServerTest.createRemotingClient(clientConfig);
+
+        await().pollDelay(Duration.ofMillis(10))
+                .pollInterval(Duration.ofMillis(10))
+                .atMost(20, TimeUnit.SECONDS).until(() -> 
isHostConnectable(getServerAddress()));
+    }
+
+    @Test
+    public void testProxyProtocol() throws Exception {
+        sendHAProxyMessage(remotingClient);
+        requestThenAssertResponse(remotingClient);
+    }
+
+    private void requestThenAssertResponse(RemotingClient remotingClient) 
throws Exception {
+        RemotingCommand response = 
remotingClient.invokeSync(getServerAddress(), createRequest(), 10000 * 3);
+        assertNotNull(response);
+        assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
+        assertThat(response.getExtFields()).hasSize(2);
+        
assertThat(response.getExtFields().get("messageTitle")).isEqualTo("Welcome");
+    }
+
+    private void sendHAProxyMessage(RemotingClient remotingClient) throws 
Exception {
+        Method getAndCreateChannel = 
NettyRemotingClient.class.getDeclaredMethod("getAndCreateChannel", 
String.class);
+        getAndCreateChannel.setAccessible(true);
+        NettyRemotingClient nettyRemotingClient = (NettyRemotingClient) 
remotingClient;
+        Channel channel = (Channel) 
getAndCreateChannel.invoke(nettyRemotingClient, getServerAddress());
+        HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, 
HAProxyCommand.PROXY,
+                HAProxyProxiedProtocol.TCP4, "127.0.0.1", "127.0.0.2", 8000, 
9000);
+
+        ByteBuf byteBuf = Unpooled.directBuffer();
+        Method encode = 
HAProxyMessageEncoder.class.getDeclaredMethod("encodeV2", HAProxyMessage.class, 
ByteBuf.class);
+        encode.setAccessible(true);
+        encode.invoke(HAProxyMessageEncoder.INSTANCE, message, byteBuf);
+        channel.writeAndFlush(byteBuf).sync();
+    }
+
+    private static RemotingCommand createRequest() {
+        RequestHeader requestHeader = new RequestHeader();
+        requestHeader.setCount(1);
+        requestHeader.setMessageTitle("Welcome");
+        return RemotingCommand.createRequestCommand(0, requestHeader);
+    }
+
+
+    private String getServerAddress() {
+        return "localhost:" + remotingServer.localListenPort();
+    }
+
+    private boolean isHostConnectable(String addr) {
+        try (Socket socket = new Socket()) {
+            socket.connect(NetworkUtil.string2SocketAddress(addr));
+            return true;
+        } catch (IOException ignored) {
+        }
+        return false;
+    }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java 
b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
index 3da7abf573..de7edbbfba 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
@@ -17,19 +17,6 @@
 
 package org.apache.rocketmq.remoting;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.net.Socket;
-import java.time.Duration;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -47,6 +34,20 @@ import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
 import static 
org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_AUTHSERVER;
 import static 
org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_CERTPATH;
 import static 
org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_KEYPASSWORD;
@@ -234,6 +235,7 @@ public class TlsTest {
     @Test
     public void serverAcceptsUntrustedClientCert() throws Exception {
         requestThenAssertResponse();
+//        Thread.sleep(1000000L);
     }
 
     /**


Reply via email to