This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 00fc42b8be [ISSUE #6957] Support Proxy Protocol for gRPC and Remoting
Server (#6958)
00fc42b8be is described below
commit 00fc42b8be848fc3f5c550cbab007b92f128dc38
Author: ShuangxiDing <[email protected]>
AuthorDate: Tue Jul 4 18:02:16 2023 +0800
[ISSUE #6957] Support Proxy Protocol for gRPC and Remoting Server (#6958)
---
WORKSPACE | 1 +
.../rocketmq/common/constant/HAProxyConstants.java | 28 +++++
pom.xml | 5 +
proxy/BUILD.bazel | 2 +
proxy/pom.xml | 4 +
.../rocketmq/proxy/grpc/GrpcServerBuilder.java | 2 +-
...tor.java => ProxyAndTlsProtocolNegotiator.java} | 139 ++++++++++++++++++---
.../proxy/grpc/constant/AttributeKeys.java | 44 +++++++
.../proxy/grpc/interceptor/HeaderInterceptor.java | 32 ++++-
.../remoting/MultiProtocolRemotingServer.java | 5 +-
.../rocketmq/remoting/common/RemotingHelper.java | 42 +++++--
.../rocketmq/remoting/netty/AttributeKeys.java | 45 +++++++
.../remoting/netty/NettyRemotingServer.java | 129 ++++++++++++++++---
.../rocketmq/remoting/ProxyProtocolTest.java | 116 +++++++++++++++++
.../java/org/apache/rocketmq/remoting/TlsTest.java | 28 +++--
15 files changed, 563 insertions(+), 59 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index fbb694efee..e3a8f37dc1 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -104,6 +104,7 @@ maven_install(
"software.amazon.awssdk:s3:2.20.29",
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
"com.adobe.testing:s3mock-junit4:2.11.0",
+ "io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
],
fetch_sources = True,
repositories = [
diff --git
a/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
new file mode 100644
index 0000000000..c1ae0cca18
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.constant;
+
+/**
+ * Names under which PROXY protocol (HAProxy) header fields are published.
+ * <p>
+ * Every name is built from {@link #PROXY_PROTOCOL_PREFIX}. In this change the
+ * names are used to create the gRPC and Netty attribute keys, and the gRPC
+ * header interceptor forwards every attribute whose key starts with the prefix.
+ */
+public class HAProxyConstants {
+
+ // Common prefix of every name declared below.
+ public static final String PROXY_PROTOCOL_PREFIX = "proxy_protocol_";
+ // Source (client) address and port carried by the PROXY protocol header.
+ public static final String PROXY_PROTOCOL_ADDR = PROXY_PROTOCOL_PREFIX +
"addr";
+ public static final String PROXY_PROTOCOL_PORT = PROXY_PROTOCOL_PREFIX +
"port";
+ // Destination (server) address and port carried by the PROXY protocol header.
+ public static final String PROXY_PROTOCOL_SERVER_ADDR =
PROXY_PROTOCOL_PREFIX + "server_addr";
+ public static final String PROXY_PROTOCOL_SERVER_PORT =
PROXY_PROTOCOL_PREFIX + "server_port";
+ // Prefix for TLV entries; callers append the TLV type byte as two hex digits.
+ public static final String PROXY_PROTOCOL_TLV_PREFIX =
PROXY_PROTOCOL_PREFIX + "tlv_0x";
+}
diff --git a/pom.xml b/pom.xml
index a3b4746026..12bc2dbd5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -888,6 +888,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.github.aliyunmq</groupId>
+ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
+ <version>1.0.0</version>
+ </dependency>
<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel
index fcb85e46fb..b4f3c16e22 100644
--- a/proxy/BUILD.bazel
+++ b/proxy/BUILD.bazel
@@ -46,6 +46,7 @@ java_library(
"@maven//:io_grpc_grpc_services",
"@maven//:io_grpc_grpc_stub",
"@maven//:io_netty_netty_all",
+ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
"@maven//:io_openmessaging_storage_dledger",
"@maven//:io_opentelemetry_opentelemetry_api",
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
@@ -94,6 +95,7 @@ java_library(
"@maven//:io_grpc_grpc_netty_shaded",
"@maven//:io_grpc_grpc_stub",
"@maven//:io_netty_netty_all",
+ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
diff --git a/proxy/pom.xml b/proxy/pom.xml
index f14155737b..3fbea107ab 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -75,6 +75,10 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.github.aliyunmq</groupId>
+ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 0ca6a1fcbd..437b9216b1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -50,7 +50,7 @@ public class GrpcServerBuilder {
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
serverBuilder = NettyServerBuilder.forPort(port);
- serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator());
+ serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
// build server
int bossLoopNum =
ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
similarity index 51%
rename from
proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
rename to
proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index 670e1c1a21..ceb9becc0c 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -16,36 +16,53 @@
*/
package org.apache.rocketmq.proxy.grpc;
+import io.grpc.Attributes;
import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
+import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent;
import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
+import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
+import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionResult;
+import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState;
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage;
+import
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
import
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import io.grpc.netty.shaded.io.netty.util.AsciiString;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
+import io.grpc.netty.shaded.io.netty.util.CharsetUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
-public class OptionalSSLProtocolNegotiator implements
InternalProtocolNegotiator.ProtocolNegotiator {
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator.ProtocolNegotiator {
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ private static final String HA_PROXY_DECODER = "HAProxyDecoder";
+ private static final String HA_PROXY_HANDLER = "HAProxyHandler";
+ private static final String TLS_MODE_HANDLER = "TlsModeHandler";
/**
* the length of the ssl record header (in bytes)
*/
@@ -53,7 +70,7 @@ public class OptionalSSLProtocolNegotiator implements
InternalProtocolNegotiator
private static SslContext sslContext;
- public OptionalSSLProtocolNegotiator() {
+ public ProxyAndTlsProtocolNegotiator() {
sslContext = loadSslContext();
}
@@ -64,11 +81,12 @@ public class OptionalSSLProtocolNegotiator implements
InternalProtocolNegotiator
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
- return new PortUnificationServerHandler(grpcHandler);
+ return new ProxyAndTlsProtocolHandler(grpcHandler);
}
@Override
- public void close() {}
+ public void close() {
+ }
private static SslContext loadSslContext() {
try {
@@ -85,8 +103,8 @@ public class OptionalSSLProtocolNegotiator implements
InternalProtocolNegotiator
String tlsCertPath =
ConfigurationManager.getProxyConfig().getTlsCertPath();
try (InputStream serverKeyInputStream = Files.newInputStream(
Paths.get(tlsKeyPath));
- InputStream serverCertificateStream =
Files.newInputStream(
- Paths.get(tlsCertPath))) {
+ InputStream serverCertificateStream =
Files.newInputStream(
+ Paths.get(tlsCertPath))) {
SslContext res =
GrpcSslContexts.forServer(serverCertificateStream,
serverKeyInputStream)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
@@ -102,12 +120,95 @@ public class OptionalSSLProtocolNegotiator implements
InternalProtocolNegotiator
}
}
- public static class PortUnificationServerHandler extends
ByteToMessageDecoder {
+ private static class ProxyAndTlsProtocolHandler extends
ByteToMessageDecoder {
+
+ private final GrpcHttp2ConnectionHandler grpcHandler;
+
+ public ProxyAndTlsProtocolHandler(GrpcHttp2ConnectionHandler
grpcHandler) {
+ this.grpcHandler = grpcHandler;
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) {
+ try {
+ ProtocolDetectionResult<HAProxyProtocolVersion> ha =
HAProxyMessageDecoder.detectProtocol(
+ in);
+ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+ return;
+ }
+ if (ha.state() == ProtocolDetectionState.DETECTED) {
+ ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new
HAProxyMessageDecoder())
+ .addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new
HAProxyMessageHandler())
+ .addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new
TlsModeHandler(grpcHandler));
+ } else {
+ ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new
TlsModeHandler(grpcHandler));
+ }
+
+
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
+ ctx.pipeline().remove(this);
+ } catch (Exception e) {
+ log.error("process proxy protocol negotiator failed.", e);
+ throw e;
+ }
+ }
+ }
+
+    /**
+     * Converts a decoded {@link HAProxyMessage} into gRPC transport attributes.
+     * <p>
+     * The attributes are attached to a {@link ProtocolNegotiationEvent}, which is fired
+     * down the pipeline so that the {@code TlsModeHandler} (which stores any
+     * {@link ProtocolNegotiationEvent} it receives) can forward it later. The handler
+     * removes itself after the first message it sees.
+     */
+    private static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
+
+        private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault();
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+            if (msg instanceof HAProxyMessage) {
+                HAProxyMessage haProxyMessage = (HAProxyMessage) msg;
+                try {
+                    replaceEventWithMessage(haProxyMessage);
+                } finally {
+                    // The message is consumed here and never forwarded, so this handler
+                    // owns it: release it (and its TLV buffers) to avoid a buffer leak.
+                    haProxyMessage.release();
+                }
+                ctx.fireUserEventTriggered(pne);
+            } else {
+                super.channelRead(ctx, msg);
+            }
+            ctx.pipeline().remove(this);
+        }
+
+        /**
+         * Rebuilds {@link #pne} with attributes taken from the PROXY protocol header.
+         * <p>
+         * The definition of key refers to the implementation of nginx
+         * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a>
+         *
+         * @param msg decoded PROXY protocol header; blank addresses and non-positive
+         *            ports are skipped, each TLV is stored under
+         *            {@code PROXY_PROTOCOL_TLV_PREFIX + <two hex digits of the type byte>}
+         */
+        private void replaceEventWithMessage(HAProxyMessage msg) {
+            Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
+            if (StringUtils.isNotBlank(msg.sourceAddress())) {
+                builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
+            }
+            if (msg.sourcePort() > 0) {
+                builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
+            }
+            if (StringUtils.isNotBlank(msg.destinationAddress())) {
+                builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
+            }
+            if (msg.destinationPort() > 0) {
+                builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
+            }
+            if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+                msg.tlvs().forEach(tlv -> {
+                    Attributes.Key<String> key = AttributeKeys.valueOf(
+                        HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
+                    // The TLV content is copied into a String here, so it stays valid
+                    // after the message has been released.
+                    String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+                    builder.set(key, value);
+                });
+            }
+            pne = InternalProtocolNegotiationEvent
+                .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
+        }
+    }
+
+ private static class TlsModeHandler extends ByteToMessageDecoder {
+
+ private ProtocolNegotiationEvent pne =
InternalProtocolNegotiationEvent.getDefault();
private final ChannelHandler ssl;
private final ChannelHandler plaintext;
- public PortUnificationServerHandler(GrpcHttp2ConnectionHandler
grpcHandler) {
+ public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) {
this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
.newHandler(grpcHandler);
this.plaintext = InternalProtocolNegotiators.serverPlaintext()
@@ -115,8 +216,7 @@ public class OptionalSSLProtocolNegotiator implements
InternalProtocolNegotiator
}
@Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out)
- throws Exception {
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) {
try {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
if (TlsMode.ENFORCING.equals(tlsMode)) {
@@ -134,12 +234,21 @@ public class OptionalSSLProtocolNegotiator implements
InternalProtocolNegotiator
ctx.pipeline().addAfter(ctx.name(), null,
this.plaintext);
}
}
-
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
+ ctx.fireUserEventTriggered(pne);
ctx.pipeline().remove(this);
} catch (Exception e) {
log.error("process ssl protocol negotiator failed.", e);
throw e;
}
}
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
+ if (evt instanceof ProtocolNegotiationEvent) {
+ pne = (ProtocolNegotiationEvent) evt;
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
}
}
\ No newline at end of file
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
new file mode 100644
index 0000000000..096a5ba3d3
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.grpc.constant;
+
+import io.grpc.Attributes;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * gRPC {@link Attributes.Key}s for the values taken from a PROXY protocol header.
+ * <p>
+ * The four fixed keys cover the source and destination address/port. Keys for other
+ * names are created on demand through {@link #valueOf(String)} and cached by name.
+ */
+public class AttributeKeys {
+
+    public static final Attributes.Key<String> PROXY_PROTOCOL_ADDR =
+        Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+
+    public static final Attributes.Key<String> PROXY_PROTOCOL_PORT =
+        Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_PORT);
+
+    public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_ADDR =
+        Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
+
+    public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_PORT =
+        Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
+
+    /** Cache of dynamically created keys, indexed by the name they were created with. */
+    private static final Map<String, Attributes.Key<String>> ATTRIBUTES_KEY_MAP = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the cached key for {@code name}, creating and caching it on first use.
+     *
+     * @param name name of the key
+     * @return the key associated with {@code name}; repeated calls return the same instance
+     */
+    public static Attributes.Key<String> valueOf(String name) {
+        return ATTRIBUTES_KEY_MAP.computeIfAbsent(name, Attributes.Key::create);
+    }
+}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
index 1cbb003610..13893e5eda 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
@@ -18,11 +18,16 @@
package org.apache.rocketmq.proxy.grpc.interceptor;
import com.google.common.net.HostAndPort;
+import io.grpc.Attributes;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
+
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -33,13 +38,27 @@ public class HeaderInterceptor implements ServerInterceptor
{
Metadata headers,
ServerCallHandler<R, W> next
) {
- SocketAddress remoteSocketAddress =
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
- String remoteAddress = parseSocketAddress(remoteSocketAddress);
+ String remoteAddress = getProxyProtocolAddress(call.getAttributes());
+ if (StringUtils.isBlank(remoteAddress)) {
+ SocketAddress remoteSocketAddress =
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+ remoteAddress = parseSocketAddress(remoteSocketAddress);
+ }
headers.put(InterceptorConstants.REMOTE_ADDRESS, remoteAddress);
SocketAddress localSocketAddress =
call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
String localAddress = parseSocketAddress(localSocketAddress);
headers.put(InterceptorConstants.LOCAL_ADDRESS, localAddress);
+
+ for (Attributes.Key<?> key : call.getAttributes().keys()) {
+ if (!StringUtils.startsWith(key.toString(),
HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
+ continue;
+ }
+ Metadata.Key<String> headerKey
+ = Metadata.Key.of(key.toString(),
Metadata.ASCII_STRING_MARSHALLER);
+ String headerValue = String.valueOf(call.getAttributes().get(key));
+ headers.put(headerKey, headerValue);
+ }
+
return next.startCall(call, headers);
}
@@ -55,4 +74,13 @@ public class HeaderInterceptor implements ServerInterceptor {
return "";
}
+
+    /**
+     * Builds the client address announced through the PROXY protocol.
+     *
+     * @param attributes transport attributes of the current call
+     * @return {@code "<addr>:<port>"}, or {@code null} when either value is missing or blank
+     */
+    private String getProxyProtocolAddress(Attributes attributes) {
+        String host = attributes.get(AttributeKeys.PROXY_PROTOCOL_ADDR);
+        String port = attributes.get(AttributeKeys.PROXY_PROTOCOL_PORT);
+        if (StringUtils.isBlank(host)) {
+            return null;
+        }
+        if (StringUtils.isBlank(port)) {
+            return null;
+        }
+        return host + ":" + port;
+    }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 1142132b78..858b1f0227 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -20,8 +20,6 @@ package org.apache.rocketmq.proxy.remoting;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
-import java.io.IOException;
-import java.security.cert.CertificateException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -36,6 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import java.io.IOException;
+import java.security.cert.CertificateException;
+
/**
* support remoting and http2 protocol at one port
*/
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 75e25a83a1..d0750b678f 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -21,14 +21,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -43,6 +37,15 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+
public class RemotingHelper {
public static final String DEFAULT_CHARSET = "UTF-8";
public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
@@ -50,6 +53,9 @@ public class RemotingHelper {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
private static final AttributeKey<String> REMOTE_ADDR_KEY =
AttributeKey.valueOf("RemoteAddr");
+ private static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+ private static final AttributeKey<String> PROXY_PROTOCOL_PORT =
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
+
public static final AttributeKey<String> CLIENT_ID_KEY =
AttributeKey.valueOf("ClientId");
public static final AttributeKey<Integer> VERSION_KEY =
AttributeKey.valueOf("Version");
@@ -203,12 +209,16 @@ public class RemotingHelper {
if (null == channel) {
return "";
}
+ String addr = getProxyProtocolAddress(channel);
+ if (StringUtils.isNotBlank(addr)) {
+ return addr;
+ }
Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
if (att == null) {
// mocked in unit test
return parseChannelRemoteAddr0(channel);
}
- String addr = att.get();
+ addr = att.get();
if (addr == null) {
addr = parseChannelRemoteAddr0(channel);
att.set(addr);
@@ -216,6 +226,18 @@ public class RemotingHelper {
return addr;
}
+    /**
+     * Builds the client address announced through the PROXY protocol, if any.
+     *
+     * @param channel channel whose attributes are inspected; must not be null
+     * @return {@code "<addr>:<port>"}, or {@code null} when the channel carries no
+     *         PROXY protocol address or when the address or port is blank
+     */
+    private static String getProxyProtocolAddress(Channel channel) {
+        if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
+            return null;
+        }
+        String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel);
+        String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel);
+        // A blank port is treated like a missing one so the caller falls back to the
+        // socket's remote address instead of returning a malformed "addr:" string.
+        if (StringUtils.isBlank(proxyProtocolAddr) || StringUtils.isBlank(proxyProtocolPort)) {
+            return null;
+        }
+        return proxyProtocolAddr + ":" + proxyProtocolPort;
+    }
+
private static String parseChannelRemoteAddr0(final Channel channel) {
SocketAddress remote = channel.remoteAddress();
final String addr = remote != null ? remote.toString() : "";
@@ -255,7 +277,7 @@ public class RemotingHelper {
return "";
}
- public static int parseSocketAddressPort(SocketAddress socketAddress) {
+ public static Integer parseSocketAddressPort(SocketAddress socketAddress) {
if (socketAddress instanceof InetSocketAddress) {
return ((InetSocketAddress) socketAddress).getPort();
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
new file mode 100644
index 0000000000..4e69ab82d4
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+
+import io.netty.util.AttributeKey;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Netty channel {@link AttributeKey}s for the values taken from a PROXY protocol header.
+ * <p>
+ * The four fixed keys cover the source and destination address/port. Keys for other
+ * names are obtained through {@link #valueOf(String)} and cached by name.
+ */
+public class AttributeKeys {
+
+    public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
+        AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+
+    public static final AttributeKey<String> PROXY_PROTOCOL_PORT =
+        AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
+
+    public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_ADDR =
+        AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
+
+    public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_PORT =
+        AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
+
+    /** Cache of dynamically requested keys, indexed by name. */
+    private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the key for {@code name}, looking it up through Netty on first use.
+     *
+     * @param name name of the key
+     * @return the key associated with {@code name}
+     */
+    public static AttributeKey<String> valueOf(String name) {
+        // The mapping function must be Netty's AttributeKey.valueOf. Referring to this
+        // method itself (AttributeKeys::valueOf) re-enters computeIfAbsent for the same
+        // key and never produces a value.
+        return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKey::valueOf);
+    }
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 9f39d672e7..94ffd8d07a 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -36,27 +37,25 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.ProtocolDetectionResult;
+import io.netty.handler.codec.ProtocolDetectionState;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.AttributeKey;
+import io.netty.util.CharsetUtil;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.cert.CertificateException;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -71,6 +70,19 @@ import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
@SuppressWarnings("NullableProblems")
public class NettyRemotingServer extends NettyRemotingAbstract implements
RemotingServer {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
@@ -96,6 +108,9 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract>
remotingServerTable = new ConcurrentHashMap<>();
public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+ public static final String HA_PROXY_DECODER = "HAProxyDecoder";
+ public static final String HA_PROXY_HANDLER = "HAProxyHandler";
+ public static final String TLS_MODE_HANDLER = "TlsModeHandler";
public static final String TLS_HANDLER_NAME = "sslHandler";
public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
@@ -387,7 +402,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
private void prepareSharableHandlers() {
- handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
+ handshakeHandler = new HandshakeHandler();
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
@@ -437,11 +452,51 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
+ /**
+ * Inspects the bytes it receives for an HAProxy PROXY protocol header, installs the
+ * matching follow-up handlers right after itself in the pipeline, removes itself and
+ * then forwards the same buffer to the newly installed handlers.
+ * <p>
+ * A single {@link TlsModeHandler} instance is created in the constructor and added to
+ * every pipeline this handler processes.
+ */
@ChannelHandler.Sharable
public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf>
{
+ // Follow-up handler added after this one (or after the HAProxy handlers) in channelRead0.
+ private final TlsModeHandler tlsModeHandler;
+
+ public HandshakeHandler() {
+ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
+ }
+
+ /**
+ * Detects whether {@code in} starts with a PROXY protocol header and rewires the pipeline.
+ *
+ * @param ctx context of this handler; its name is used as the insertion point
+ * @param in bytes read from the channel
+ */
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
+ try {
+ ProtocolDetectionResult<HAProxyProtocolVersion> ha =
HAProxyMessageDecoder.detectProtocol(in);
+ // NOTE(review): on NEEDS_MORE_DATA this returns without forwarding or buffering "in".
+ // SimpleChannelInboundHandler presumably releases the buffer after channelRead0
+ // (see the retain() below), so a header split across reads looks like it is lost -
+ // confirm, and consider an accumulating decoder if so.
+ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+ return;
+ }
+ if (ha.state() == ProtocolDetectionState.DETECTED) {
+ // Resulting order: this -> HAProxyDecoder -> HAProxyHandler -> TlsModeHandler.
+ ctx.pipeline().addAfter(defaultEventExecutorGroup,
ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
+ .addAfter(defaultEventExecutorGroup,
HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
+ .addAfter(defaultEventExecutorGroup,
HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
+ } else {
+ // No PROXY header detected: only the TLS mode handler is installed.
+ ctx.pipeline().addAfter(defaultEventExecutorGroup,
ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
+ }
+
+ try {
+ // Remove this handler; a failure to find it is logged and otherwise ignored.
+ ctx.pipeline().remove(this);
+ } catch (NoSuchElementException e) {
+ log.error("Error while removing HandshakeHandler", e);
+ }
+
+ // Hand over this message to the next handler. retain() presumably balances the
+ // release performed by SimpleChannelInboundHandler after this method returns - confirm.
+ ctx.fireChannelRead(in.retain());
+ } catch (Exception e) {
+ // Log for diagnosis, then rethrow unchanged.
+ log.error("process proxy protocol negotiator failed.", e);
+ throw e;
+ }
+ }
+ }
+
+ @ChannelHandler.Sharable
+ public class TlsModeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
private final TlsMode tlsMode;
private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
- HandshakeHandler(TlsMode tlsMode) {
+ TlsModeHandler(TlsMode tlsMode) {
this.tlsMode = tlsMode;
}
@@ -461,7 +516,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
- .addAfter(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
+ .addAfter(defaultEventExecutorGroup,
TLS_MODE_HANDLER, TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup,
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline
to establish SSL connection");
} else {
@@ -483,7 +538,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
// Remove this handler
ctx.pipeline().remove(this);
} catch (NoSuchElementException e) {
- log.error("Error while removing HandshakeHandler", e);
+ log.error("Error while removing TlsModeHandler", e);
}
// Hand over this message to the next .
@@ -706,4 +761,46 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
return NettyRemotingServer.this.getCallbackExecutor();
}
}
+
+    /**
+     * One-shot inbound handler that copies the contents of a decoded {@link HAProxyMessage}
+     * into channel attributes and then removes itself from the pipeline.
+     * <p>
+     * A {@link HAProxyMessage} is consumed here (it is not forwarded); any other message is
+     * passed unchanged to the next handler. In both cases the handler removes itself after
+     * the first message it sees.
+     */
+    public static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
+
+        /**
+         * Handles the first message that reaches this handler.
+         *
+         * @param ctx context of this handler
+         * @param msg the inbound message
+         * @throws Exception if copying the attributes or forwarding the message fails
+         */
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+            try {
+                if (msg instanceof HAProxyMessage) {
+                    try {
+                        fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
+                    } finally {
+                        // The message stops here, so this handler is its last owner and has to
+                        // release it; otherwise the buffers backing the TLV contents leak.
+                        // release(Object) is a no-op for messages that are not reference counted.
+                        io.netty.util.ReferenceCountUtil.release(msg);
+                    }
+                } else {
+                    super.channelRead(ctx, msg);
+                }
+            } finally {
+                // Always detach, even when handling failed, so later reads bypass this handler.
+                ctx.pipeline().remove(this);
+            }
+        }
+
+        /**
+         * Stores the addresses, ports and TLVs of {@code msg} as attributes of {@code channel}.
+         * Blank addresses, non-positive ports and an empty TLV list are skipped.
+         * <p>
+         * The definition of key refers to the implementation of nginx
+         * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a>
+         *
+         * @param msg     decoded PROXY protocol message to read from
+         * @param channel channel whose attributes are populated
+         */
+        private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) {
+            if (StringUtils.isNotBlank(msg.sourceAddress())) {
+                channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
+            }
+            if (msg.sourcePort() > 0) {
+                channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
+            }
+            if (StringUtils.isNotBlank(msg.destinationAddress())) {
+                channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
+            }
+            if (msg.destinationPort() > 0) {
+                channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
+            }
+            if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+                msg.tlvs().forEach(tlv -> {
+                    // Key is the TLV prefix followed by the type byte as two lowercase hex digits.
+                    AttributeKey<String> key = AttributeKeys.valueOf(
+                        HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
+                    String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+                    channel.attr(key).set(value);
+                });
+            }
+        }
+    }
}
diff --git
a/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
new file mode 100644
index 0000000000..c39fd2132b
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.Socket;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Verifies that the remoting server still answers a regular request on a connection whose
+ * first bytes are an HAProxy PROXY protocol (v2) header.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ProxyProtocolTest {
+
+    private RemotingServer remotingServer;
+    private RemotingClient remotingClient;
+
+    /** Starts a plain-text server/client pair and waits until the server port accepts connections. */
+    @Before
+    public void setUp() throws Exception {
+        NettyClientConfig clientConfig = new NettyClientConfig();
+        clientConfig.setUseTLS(false);
+
+        remotingServer = RemotingServerTest.createRemotingServer();
+        remotingClient = RemotingServerTest.createRemotingClient(clientConfig);
+
+        await().pollDelay(Duration.ofMillis(10))
+            .pollInterval(Duration.ofMillis(10))
+            .atMost(20, TimeUnit.SECONDS).until(() -> isHostConnectable(getServerAddress()));
+    }
+
+    /**
+     * Shuts down the client and server created in {@link #setUp()} so that their threads and
+     * the listen port do not leak into other tests. Null checks cover a partially failed setUp.
+     */
+    @After
+    public void tearDown() {
+        if (remotingClient != null) {
+            remotingClient.shutdown();
+        }
+        if (remotingServer != null) {
+            remotingServer.shutdown();
+        }
+    }
+
+    @Test
+    public void testProxyProtocol() throws Exception {
+        sendHAProxyMessage(remotingClient);
+        requestThenAssertResponse(remotingClient);
+    }
+
+    /** Sends one synchronous request and checks the language and ext fields of the response. */
+    private void requestThenAssertResponse(RemotingClient remotingClient) throws Exception {
+        RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 10000 * 3);
+        assertNotNull(response);
+        assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
+        assertThat(response.getExtFields()).hasSize(2);
+        assertThat(response.getExtFields().get("messageTitle")).isEqualTo("Welcome");
+    }
+
+    /**
+     * Writes a PROXY protocol v2 header on the client's channel to the server.
+     * Reflection is used because both {@code getAndCreateChannel} and {@code encodeV2}
+     * are looked up as declared (non-public) methods.
+     */
+    private void sendHAProxyMessage(RemotingClient remotingClient) throws Exception {
+        Method getAndCreateChannel = NettyRemotingClient.class.getDeclaredMethod("getAndCreateChannel", String.class);
+        getAndCreateChannel.setAccessible(true);
+        NettyRemotingClient nettyRemotingClient = (NettyRemotingClient) remotingClient;
+        Channel channel = (Channel) getAndCreateChannel.invoke(nettyRemotingClient, getServerAddress());
+        HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY,
+            HAProxyProxiedProtocol.TCP4, "127.0.0.1", "127.0.0.2", 8000, 9000);
+
+        ByteBuf byteBuf = Unpooled.directBuffer();
+        Method encode = HAProxyMessageEncoder.class.getDeclaredMethod("encodeV2", HAProxyMessage.class, ByteBuf.class);
+        encode.setAccessible(true);
+        encode.invoke(HAProxyMessageEncoder.INSTANCE, message, byteBuf);
+        channel.writeAndFlush(byteBuf).sync();
+    }
+
+    private static RemotingCommand createRequest() {
+        RequestHeader requestHeader = new RequestHeader();
+        requestHeader.setCount(1);
+        requestHeader.setMessageTitle("Welcome");
+        return RemotingCommand.createRequestCommand(0, requestHeader);
+    }
+
+    private String getServerAddress() {
+        return "localhost:" + remotingServer.localListenPort();
+    }
+
+    /** Returns true when a TCP connection to {@code addr} can be opened. */
+    private boolean isHostConnectable(String addr) {
+        try (Socket socket = new Socket()) {
+            socket.connect(NetworkUtil.string2SocketAddress(addr));
+            return true;
+        } catch (IOException ignored) {
+            // Not reachable yet; the caller polls until it is.
+        }
+        return false;
+    }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
index 3da7abf573..de7edbbfba 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
@@ -17,19 +17,6 @@
package org.apache.rocketmq.remoting;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.net.Socket;
-import java.time.Duration;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -47,6 +34,20 @@ import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import static
org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_AUTHSERVER;
import static
org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_CERTPATH;
import static
org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_KEYPASSWORD;
@@ -234,6 +235,7 @@ public class TlsTest {
@Test
public void serverAcceptsUntrustedClientCert() throws Exception {
    requestThenAssertResponse();
}
/**