This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new cd8fd7bba [ISSUE #6047] Support TLS permissive mode for 5.x client 
(#6048)
cd8fd7bba is described below

commit cd8fd7bbafb80ea4d7ff1ad9e3a04fa8fbd2b09f
Author: lk <[email protected]>
AuthorDate: Wed Feb 15 10:35:18 2023 +0800

    [ISSUE #6047] Support TLS permissive mode for 5.x client (#6048)
---
 .../rocketmq/proxy/grpc/GrpcServerBuilder.java     | 52 +++++++-----
 .../proxy/grpc/OptionalSSLProtocolNegotiator.java  | 94 ++++++++++++++++++++++
 .../http2proxy/Http2ProtocolProxyHandler.java      | 27 ++++---
 3 files changed, 143 insertions(+), 30 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 5e1b73505..d496bfd10 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -26,6 +26,7 @@ import 
io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
 import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
 import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import 
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
 import java.io.IOException;
@@ -36,7 +37,6 @@ import java.security.cert.CertificateException;
 import java.util.List;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import javax.net.ssl.SSLException;
 import org.apache.rocketmq.acl.AccessValidator;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.ServiceProvider;
@@ -48,6 +48,8 @@ import 
org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor;
 import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
 import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
 import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 
 public class GrpcServerBuilder {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -113,32 +115,42 @@ public class GrpcServerBuilder {
         return new GrpcServer(this.serverBuilder.build());
     }
 
-    protected void configSslContext(NettyServerBuilder serverBuilder) throws 
SSLException, CertificateException {
+    protected void configSslContext(NettyServerBuilder serverBuilder) throws 
IOException, CertificateException {
         if (null == serverBuilder) {
             return;
         }
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        boolean tlsTestModeEnable = proxyConfig.isTlsTestModeEnable();
-        if (tlsTestModeEnable) {
-            SelfSignedCertificate selfSignedCertificate = new 
SelfSignedCertificate();
-            
serverBuilder.sslContext(GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
 selfSignedCertificate.privateKey())
-                .trustManager(InsecureTrustManagerFactory.INSTANCE)
-                .clientAuth(ClientAuth.NONE)
-                .build());
-            return;
+
+        TlsMode tlsMode = TlsSystemConfig.tlsMode;
+        if (!TlsMode.DISABLED.equals(tlsMode)) {
+            SslContext sslContext = loadSslContext();
+            if (TlsMode.PERMISSIVE.equals(tlsMode)) {
+                serverBuilder.protocolNegotiator(new 
OptionalSSLProtocolNegotiator(sslContext));
+            } else {
+                serverBuilder.sslContext(sslContext);
+            }
         }
+    }
 
-        String tlsKeyPath = 
ConfigurationManager.getProxyConfig().getTlsKeyPath();
-        String tlsCertPath = 
ConfigurationManager.getProxyConfig().getTlsCertPath();
-        try (InputStream serverKeyInputStream = 
Files.newInputStream(Paths.get(tlsKeyPath));
-             InputStream serverCertificateStream = 
Files.newInputStream(Paths.get(tlsCertPath))) {
-            
serverBuilder.sslContext(GrpcSslContexts.forServer(serverCertificateStream, 
serverKeyInputStream)
+    protected SslContext loadSslContext() throws CertificateException, 
IOException {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        if (proxyConfig.isTlsTestModeEnable()) {
+            SelfSignedCertificate selfSignedCertificate = new 
SelfSignedCertificate();
+            return 
GrpcSslContexts.forServer(selfSignedCertificate.certificate(), 
selfSignedCertificate.privateKey())
                 .trustManager(InsecureTrustManagerFactory.INSTANCE)
                 .clientAuth(ClientAuth.NONE)
-                .build());
-            log.info("TLS configured OK");
-        } catch (IOException e) {
-            log.error("Failed to load Server key/certificate", e);
+                .build();
+        } else {
+            String tlsKeyPath = 
ConfigurationManager.getProxyConfig().getTlsKeyPath();
+            String tlsCertPath = 
ConfigurationManager.getProxyConfig().getTlsCertPath();
+            try (InputStream serverKeyInputStream = 
Files.newInputStream(Paths.get(tlsKeyPath));
+                 InputStream serverCertificateStream = 
Files.newInputStream(Paths.get(tlsCertPath))) {
+                SslContext res = 
GrpcSslContexts.forServer(serverCertificateStream, serverKeyInputStream)
+                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                    .clientAuth(ClientAuth.NONE)
+                    .build();
+                log.info("load TLS configured OK");
+                return res;
+            }
         }
     }
 
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
new file mode 100644
index 000000000..bf19abf85
--- /dev/null
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.grpc;
+
+import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
+import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
+import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
+import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
+import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
+import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
+import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
+import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
+import io.grpc.netty.shaded.io.netty.util.AsciiString;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator.ProtocolNegotiator {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    private final SslContext sslContext;
+    /**
+     * the length of the ssl record header (in bytes)
+     */
+    private static final int SSL_RECORD_HEADER_LENGTH = 5;
+
+    public OptionalSSLProtocolNegotiator(SslContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    public AsciiString scheme() {
+        return AsciiString.of("https");
+    }
+
+    @Override
+    public ChannelHandler newHandler(GrpcHttp2ConnectionHandler 
grpcHttp2ConnectionHandler) {
+        ChannelHandler plaintext =
+            
InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHttp2ConnectionHandler);
+        ChannelHandler ssl =
+            
InternalProtocolNegotiators.serverTls(sslContext).newHandler(grpcHttp2ConnectionHandler);
+        return new PortUnificationServerHandler(ssl, plaintext);
+    }
+
+    @Override
+    public void close() {}
+
+    public static class PortUnificationServerHandler extends 
ByteToMessageDecoder {
+        private final ChannelHandler ssl;
+        private final ChannelHandler plaintext;
+
+        public PortUnificationServerHandler(ChannelHandler ssl, ChannelHandler 
plaintext) {
+            this.ssl = ssl;
+            this.plaintext = plaintext;
+        }
+
+        @Override
+        protected void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out)
+            throws Exception {
+            try {
+                // in SslHandler.isEncrypted, it need at least 5 bytes to 
judge is encrypted or not
+                if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
+                    return;
+                }
+                if (SslHandler.isEncrypted(in)) {
+                    ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
+                } else {
+                    ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
+                }
+                
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
+                ctx.pipeline().remove(this);
+            } catch (Exception e) {
+                log.error("process protocol negotiator failed.", e);
+                throw e;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index 2ba2d3463..033877e16 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -37,6 +37,8 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 
 public class Http2ProtocolProxyHandler implements ProtocolHandler {
     private static final Logger log = 
LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@@ -55,16 +57,21 @@ public class Http2ProtocolProxyHandler implements 
ProtocolHandler {
 
     public Http2ProtocolProxyHandler() {
         try {
-            sslContext = SslContextBuilder
-                .forClient()
-                .sslProvider(SslProvider.OPENSSL)
-                .trustManager(InsecureTrustManagerFactory.INSTANCE)
-                .applicationProtocolConfig(new ApplicationProtocolConfig(
-                    ApplicationProtocolConfig.Protocol.ALPN,
-                    
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
-                    
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
-                    ApplicationProtocolNames.HTTP_2))
-                .build();
+            TlsMode tlsMode = TlsSystemConfig.tlsMode;
+            if (TlsMode.DISABLED.equals(tlsMode)) {
+                sslContext = null;
+            } else {
+                sslContext = SslContextBuilder
+                    .forClient()
+                    .sslProvider(SslProvider.OPENSSL)
+                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                    .applicationProtocolConfig(new ApplicationProtocolConfig(
+                        ApplicationProtocolConfig.Protocol.ALPN,
+                        
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+                        
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+                        ApplicationProtocolNames.HTTP_2))
+                    .build();
+            }
         } catch (SSLException e) {
             log.error("Failed to create SSLContext for 
Http2ProtocolProxyHandler", e);
             throw new RuntimeException("Failed to create SSLContext for 
Http2ProtocolProxyHandler", e);

Reply via email to