This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 90c5382aee [ISSUE #7061] Support forward HAProxyMessage for Multi
Protocol server. (#7062)
90c5382aee is described below
commit 90c5382aee07879a80309f257f04114201ccaac6
Author: ShuangxiDing <[email protected]>
AuthorDate: Fri Jul 21 20:28:58 2023 +0800
[ISSUE #7061] Support forward HAProxyMessage for Multi Protocol server.
(#7062)
* Support dynamic modification of grpc tls mode to improve the scalability
of ProtocolNegotiator
* Support dynamic modification of grpc tls mode to improve the scalability
of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC and Remoting server.
* 回滚netty的升级 (Roll back the Netty upgrade)
* Support proxy protocol for gRPC and Remoting server.
* Support proxy protocol for gRPC and Remoting server.
* Support proxy protocol for gRPC and Remoting server.
* add grpc-netty-codec-haproxy in bazel
* add grpc-netty-codec-haproxy in bazel
* Support proxy protocol for gRPC and Remoting server.
* Fix Test
* add grpc-netty-codec-haproxy in bazel
* add ProxyProtocolTest for Remoting
* Support HAProxyMessage forward for multi protocol server.
---------
Co-authored-by: 徒钟 <[email protected]>
---
.../http2proxy/HAProxyMessageForwarder.java | 129 +++++++++++++++++++++
.../http2proxy/Http2ProtocolProxyHandler.java | 23 ++--
.../http2proxy/Http2ProxyBackendHandler.java | 2 +
.../http2proxy/Http2ProxyFrontendHandler.java | 28 +++--
4 files changed, 164 insertions(+), 18 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
new file mode 100644
index 0000000000..8f139d3d9a
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
+import io.netty.handler.codec.haproxy.HAProxyTLV;
+import io.netty.util.Attribute;
+import io.netty.util.DefaultAttributeMap;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.acl.common.AclUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
+
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
+
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+
+ private static final Field FIELD_ATTRIBUTE =
+ FieldUtils.getField(DefaultAttributeMap.class, "attributes", true);
+
+ private final Channel outboundChannel;
+
+ public HAProxyMessageForwarder(final Channel outboundChannel) {
+ this.outboundChannel = outboundChannel;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ try {
+ forwardHAProxyMessage(ctx.channel(), outboundChannel);
+ ctx.fireChannelRead(msg);
+ } catch (Exception e) {
+ log.error("Forward HAProxyMessage from Remoting to gRPC server
error.", e);
+ throw e;
+ } finally {
+ ctx.pipeline().remove(this);
+ }
+ }
+
+ private void forwardHAProxyMessage(Channel inboundChannel, Channel
outboundChannel) throws Exception {
+ if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+ return;
+ }
+
+ if (!(inboundChannel instanceof DefaultAttributeMap)) {
+ return;
+ }
+
+ Attribute<?>[] attributes = (Attribute<?>[])
FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
+ if (ArrayUtils.isEmpty(attributes)) {
+ return;
+ }
+
+ String sourceAddress = null, destinationAddress = null;
+ int sourcePort = 0, destinationPort = 0;
+ List<HAProxyTLV> haProxyTLVs = new ArrayList<>();
+
+ for (Attribute<?> attribute : attributes) {
+ String attributeKey = attribute.key().name();
+ if (!StringUtils.startsWith(attributeKey,
HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
+ continue;
+ }
+ String attributeValue = (String) attribute.get();
+ if (StringUtils.isEmpty(attributeValue)) {
+ continue;
+ }
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_ADDR) {
+ sourceAddress = attributeValue;
+ }
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_PORT) {
+ sourcePort = Integer.parseInt(attributeValue);
+ }
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR) {
+ destinationAddress = attributeValue;
+ }
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) {
+ destinationPort = Integer.parseInt(attributeValue);
+ }
+ if (StringUtils.startsWith(attributeKey,
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
+ String typeString = StringUtils.substringAfter(attributeKey,
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
+ ByteBuf byteBuf = Unpooled.buffer();
+
byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset()));
+ HAProxyTLV haProxyTLV = new
HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf);
+ haProxyTLVs.add(haProxyTLV);
+ }
+ }
+
+ HAProxyProxiedProtocol proxiedProtocol =
AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
+ HAProxyProxiedProtocol.TCP4;
+
+ HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2,
HAProxyCommand.PROXY,
+ proxiedProtocol, sourceAddress, destinationAddress,
sourcePort, destinationPort, haProxyTLVs);
+ outboundChannel.writeAndFlush(message).sync();
+ }
+}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index 913f35c93d..c37db92af4 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -24,13 +24,14 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
-import javax.net.ssl.SSLException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -38,8 +39,11 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import javax.net.ssl.SSLException;
+
public class Http2ProtocolProxyHandler implements ProtocolHandler {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
private static final String LOCAL_HOST = "127.0.0.1";
@@ -101,11 +105,8 @@ public class Http2ProtocolProxyHandler implements
ProtocolHandler {
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
- if (sslContext != null) {
- ch.pipeline()
- .addLast(sslContext.newHandler(ch.alloc(),
LOCAL_HOST, config.getGrpcServerPort()));
- }
- ch.pipeline().addLast(new
Http2ProxyBackendHandler(inboundChannel));
+ ch.pipeline().addLast(null,
Http2ProxyBackendHandler.HANDLER_NAME,
+ new Http2ProxyBackendHandler(inboundChannel));
}
})
.option(ChannelOption.AUTO_READ, false)
@@ -120,7 +121,15 @@ public class Http2ProtocolProxyHandler implements
ProtocolHandler {
}
final Channel outboundChannel = f.channel();
+ if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+ ctx.pipeline().addLast(new
HAProxyMessageForwarder(outboundChannel));
+
outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE);
+ }
- ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel));
+ SslHandler sslHandler = null;
+ if (sslContext != null) {
+ sslHandler = sslContext.newHandler(outboundChannel.alloc(),
LOCAL_HOST, config.getGrpcServerPort());
+ }
+ ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel,
sslHandler));
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
index 0195b0c1c6..fd5408fae3 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
@@ -29,6 +29,8 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+ public static final String HANDLER_NAME = "Http2ProxyBackendHandler";
+
private final Channel inboundChannel;
public Http2ProxyBackendHandler(Channel inboundChannel) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
index 87147a3226..9b37e85e53 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
@@ -19,36 +19,42 @@ package
org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.ssl.SslHandler;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+
+ public static final String HANDLER_NAME = "SslHandler";
+
// As we use inboundChannel.eventLoop() when building the Bootstrap this
does not need to be volatile as
// the outboundChannel will use the same EventLoop (and therefore Thread)
as the inboundChannel.
private final Channel outboundChannel;
+ private final SslHandler sslHandler;
- public Http2ProxyFrontendHandler(final Channel outboundChannel) {
+ public Http2ProxyFrontendHandler(final Channel outboundChannel, final
SslHandler sslHandler) {
this.outboundChannel = outboundChannel;
+ this.sslHandler = sslHandler;
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (outboundChannel.isActive()) {
- outboundChannel.writeAndFlush(msg).addListener(new
ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (future.isSuccess()) {
- // was able to flush out data, start to read the next
chunk
- ctx.channel().read();
- } else {
- future.channel().close();
- }
+ if (sslHandler != null &&
outboundChannel.pipeline().get(HANDLER_NAME) == null) {
+
outboundChannel.pipeline().addBefore(Http2ProxyBackendHandler.HANDLER_NAME,
HANDLER_NAME, sslHandler);
+ }
+
+
outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future
-> {
+ if (future.isSuccess()) {
+ // was able to flush out data, start to read the next chunk
+ ctx.channel().read();
+ } else {
+ future.channel().close();
}
});
}