This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 90c5382aee [ISSUE #7061] Support forward HAProxyMessage for Multi 
Protocol server. (#7062)
90c5382aee is described below

commit 90c5382aee07879a80309f257f04114201ccaac6
Author: ShuangxiDing <[email protected]>
AuthorDate: Fri Jul 21 20:28:58 2023 +0800

    [ISSUE #7061] Support forward HAProxyMessage for Multi Protocol server. 
(#7062)
    
    * Support dynamic modification of grpc tls mode to improve the scalability 
of ProtocolNegotiator
    
    * Support dynamic modification of grpc tls mode to improve the scalability 
of ProtocolNegotiator
    
    * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the 
scalability of ProtocolNegotiator
    
    * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the 
scalability of ProtocolNegotiator
    
    * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the 
scalability of ProtocolNegotiator
    
    * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the 
scalability of ProtocolNegotiator
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC server.
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * 回滚netty的升级
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * add grpc-netty-codec-haproxy in bazel
    
    * add grpc-netty-codec-haproxy in bazel
    
    * Support proxy protocol for gRPC and Remoting server.
    
    * Fix Test
    
    * add grpc-netty-codec-haproxy in bazel
    
    * add ProxyProtocolTest for Remoting
    
    * Support HAProxyMessage forward for multi protocol server.
    
    ---------
    
    Co-authored-by: 徒钟 <[email protected]>
---
 .../http2proxy/HAProxyMessageForwarder.java        | 129 +++++++++++++++++++++
 .../http2proxy/Http2ProtocolProxyHandler.java      |  23 ++--
 .../http2proxy/Http2ProxyBackendHandler.java       |   2 +
 .../http2proxy/Http2ProxyFrontendHandler.java      |  28 +++--
 4 files changed, 164 insertions(+), 18 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
new file mode 100644
index 0000000000..8f139d3d9a
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
+import io.netty.handler.codec.haproxy.HAProxyTLV;
+import io.netty.util.Attribute;
+import io.netty.util.DefaultAttributeMap;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.acl.common.AclUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
+
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+
+    private static final Field FIELD_ATTRIBUTE =
+            FieldUtils.getField(DefaultAttributeMap.class, "attributes", true);
+
+    private final Channel outboundChannel;
+
+    public HAProxyMessageForwarder(final Channel outboundChannel) {
+        this.outboundChannel = outboundChannel;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+        try {
+            forwardHAProxyMessage(ctx.channel(), outboundChannel);
+            ctx.fireChannelRead(msg);
+        } catch (Exception e) {
+            log.error("Forward HAProxyMessage from Remoting to gRPC server 
error.", e);
+            throw e;
+        } finally {
+            ctx.pipeline().remove(this);
+        }
+    }
+
+    private void forwardHAProxyMessage(Channel inboundChannel, Channel 
outboundChannel) throws Exception {
+        if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+            return;
+        }
+
+        if (!(inboundChannel instanceof DefaultAttributeMap)) {
+            return;
+        }
+
+        Attribute<?>[] attributes = (Attribute<?>[]) 
FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
+        if (ArrayUtils.isEmpty(attributes)) {
+            return;
+        }
+
+        String sourceAddress = null, destinationAddress = null;
+        int sourcePort = 0, destinationPort = 0;
+        List<HAProxyTLV> haProxyTLVs = new ArrayList<>();
+
+        for (Attribute<?> attribute : attributes) {
+            String attributeKey = attribute.key().name();
+            if (!StringUtils.startsWith(attributeKey, 
HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
+                continue;
+            }
+            String attributeValue = (String) attribute.get();
+            if (StringUtils.isEmpty(attributeValue)) {
+                continue;
+            }
+            if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_ADDR) {
+                sourceAddress = attributeValue;
+            }
+            if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_PORT) {
+                sourcePort = Integer.parseInt(attributeValue);
+            }
+            if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR) {
+                destinationAddress = attributeValue;
+            }
+            if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) {
+                destinationPort = Integer.parseInt(attributeValue);
+            }
+            if (StringUtils.startsWith(attributeKey, 
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
+                String typeString = StringUtils.substringAfter(attributeKey, 
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
+                ByteBuf byteBuf = Unpooled.buffer();
+                
byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset()));
+                HAProxyTLV haProxyTLV = new 
HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf);
+                haProxyTLVs.add(haProxyTLV);
+            }
+        }
+
+        HAProxyProxiedProtocol proxiedProtocol = 
AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
+                HAProxyProxiedProtocol.TCP4;
+
+        HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, 
HAProxyCommand.PROXY,
+                proxiedProtocol, sourceAddress, destinationAddress, 
sourcePort, destinationPort, haProxyTLVs);
+        outboundChannel.writeAndFlush(message).sync();
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index 913f35c93d..c37db92af4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -24,13 +24,14 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
 import io.netty.handler.ssl.ApplicationProtocolConfig;
 import io.netty.handler.ssl.ApplicationProtocolNames;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.ssl.SslProvider;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
-import javax.net.ssl.SSLException;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -38,8 +39,11 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
 import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 
+import javax.net.ssl.SSLException;
+
 public class Http2ProtocolProxyHandler implements ProtocolHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
     private static final String LOCAL_HOST = "127.0.0.1";
@@ -101,11 +105,8 @@ public class Http2ProtocolProxyHandler implements 
ProtocolHandler {
             .handler(new ChannelInitializer<Channel>() {
                 @Override
                 protected void initChannel(Channel ch) throws Exception {
-                    if (sslContext != null) {
-                        ch.pipeline()
-                            .addLast(sslContext.newHandler(ch.alloc(), 
LOCAL_HOST, config.getGrpcServerPort()));
-                    }
-                    ch.pipeline().addLast(new 
Http2ProxyBackendHandler(inboundChannel));
+                    ch.pipeline().addLast(null, 
Http2ProxyBackendHandler.HANDLER_NAME,
+                            new Http2ProxyBackendHandler(inboundChannel));
                 }
             })
             .option(ChannelOption.AUTO_READ, false)
@@ -120,7 +121,15 @@ public class Http2ProtocolProxyHandler implements 
ProtocolHandler {
         }
 
         final Channel outboundChannel = f.channel();
+        if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+            ctx.pipeline().addLast(new 
HAProxyMessageForwarder(outboundChannel));
+            
outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE);
+        }
 
-        ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel));
+        SslHandler sslHandler = null;
+        if (sslContext != null) {
+            sslHandler = sslContext.newHandler(outboundChannel.alloc(), 
LOCAL_HOST, config.getGrpcServerPort());
+        }
+        ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel, 
sslHandler));
     }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
index 0195b0c1c6..fd5408fae3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
@@ -29,6 +29,8 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
 
+    public static final String HANDLER_NAME = "Http2ProxyBackendHandler";
+
     private final Channel inboundChannel;
 
     public Http2ProxyBackendHandler(Channel inboundChannel) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
index 87147a3226..9b37e85e53 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
@@ -19,36 +19,42 @@ package 
org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
 
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.ssl.SslHandler;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+
+    public static final String HANDLER_NAME = "SslHandler";
+
     // As we use inboundChannel.eventLoop() when building the Bootstrap this 
does not need to be volatile as
     // the outboundChannel will use the same EventLoop (and therefore Thread) 
as the inboundChannel.
     private final Channel outboundChannel;
+    private final SslHandler sslHandler;
 
-    public Http2ProxyFrontendHandler(final Channel outboundChannel) {
+    public Http2ProxyFrontendHandler(final Channel outboundChannel, final 
SslHandler sslHandler) {
         this.outboundChannel = outboundChannel;
+        this.sslHandler = sslHandler;
     }
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, Object msg) {
         if (outboundChannel.isActive()) {
-            outboundChannel.writeAndFlush(msg).addListener(new 
ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) {
-                    if (future.isSuccess()) {
-                        // was able to flush out data, start to read the next 
chunk
-                        ctx.channel().read();
-                    } else {
-                        future.channel().close();
-                    }
+            if (sslHandler != null && 
outboundChannel.pipeline().get(HANDLER_NAME) == null) {
+                
outboundChannel.pipeline().addBefore(Http2ProxyBackendHandler.HANDLER_NAME, 
HANDLER_NAME, sslHandler);
+            }
+
+            
outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future 
-> {
+                if (future.isSuccess()) {
+                    // was able to flush out data, start to read the next chunk
+                    ctx.channel().read();
+                } else {
+                    future.channel().close();
                 }
             });
         }

Reply via email to