This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4d82b307ef [ISSUE #6866] Move the judgment logic of grpc TLS mode to 
improve the scalability of ProtocolNegotiator (#6867)
4d82b307ef is described below

commit 4d82b307ef50f5cba5717d0ebafeb3cabf336873
Author: Shuangxi Ding <[email protected]>
AuthorDate: Thu Jun 8 11:46:03 2023 +0800

    [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the 
scalability of ProtocolNegotiator (#6867)
    
    [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the 
scalability of ProtocolNegotiator
    
    ---------
    
    Co-authored-by: 徒钟 <[email protected]>
---
 .../rocketmq/proxy/grpc/GrpcServerBuilder.java     | 59 +-------------
 .../proxy/grpc/OptionalSSLProtocolNegotiator.java  | 93 +++++++++++++++++-----
 2 files changed, 73 insertions(+), 79 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index faffb66961..0ca6a1fcbd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -19,21 +19,11 @@ package org.apache.rocketmq.proxy.grpc;
 import io.grpc.BindableService;
 import io.grpc.ServerInterceptor;
 import io.grpc.ServerServiceDefinition;
-import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
 import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
 import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
 import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
 import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
-import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
-import 
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
-import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.security.cert.CertificateException;
 import java.util.List;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -44,13 +34,10 @@ import org.apache.rocketmq.common.utils.ServiceProvider;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor;
 import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
 import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
 import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
-import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 
 public class GrpcServerBuilder {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -63,12 +50,7 @@ public class GrpcServerBuilder {
     protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
         serverBuilder = NettyServerBuilder.forPort(port);
 
-        try {
-            configSslContext(serverBuilder);
-        } catch (Exception e) {
-            log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
-            throw new RuntimeException("grpc tls set failed: " + 
e.getMessage());
-        }
+        serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator());
 
         // build server
         int bossLoopNum = 
ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
@@ -116,45 +98,6 @@ public class GrpcServerBuilder {
         return new GrpcServer(this.serverBuilder.build());
     }
 
-    protected void configSslContext(NettyServerBuilder serverBuilder) throws 
IOException, CertificateException {
-        if (null == serverBuilder) {
-            return;
-        }
-
-        TlsMode tlsMode = TlsSystemConfig.tlsMode;
-        if (!TlsMode.DISABLED.equals(tlsMode)) {
-            SslContext sslContext = loadSslContext();
-            if (TlsMode.PERMISSIVE.equals(tlsMode)) {
-                serverBuilder.protocolNegotiator(new 
OptionalSSLProtocolNegotiator(sslContext));
-            } else {
-                serverBuilder.sslContext(sslContext);
-            }
-        }
-    }
-
-    protected SslContext loadSslContext() throws CertificateException, 
IOException {
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        if (proxyConfig.isTlsTestModeEnable()) {
-            SelfSignedCertificate selfSignedCertificate = new 
SelfSignedCertificate();
-            return 
GrpcSslContexts.forServer(selfSignedCertificate.certificate(), 
selfSignedCertificate.privateKey())
-                .trustManager(InsecureTrustManagerFactory.INSTANCE)
-                .clientAuth(ClientAuth.NONE)
-                .build();
-        } else {
-            String tlsKeyPath = 
ConfigurationManager.getProxyConfig().getTlsKeyPath();
-            String tlsCertPath = 
ConfigurationManager.getProxyConfig().getTlsCertPath();
-            try (InputStream serverKeyInputStream = 
Files.newInputStream(Paths.get(tlsKeyPath));
-                 InputStream serverCertificateStream = 
Files.newInputStream(Paths.get(tlsCertPath))) {
-                SslContext res = 
GrpcSslContexts.forServer(serverCertificateStream, serverKeyInputStream)
-                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
-                    .clientAuth(ClientAuth.NONE)
-                    .build();
-                log.info("load TLS configured OK");
-                return res;
-            }
-        }
-    }
-
     public GrpcServerBuilder configInterceptor() {
         // grpc interceptors, including acl, logging etc.
         List<AccessValidator> accessValidators = 
ServiceProvider.load(AccessValidator.class);
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
index bf19abf855..670e1c1a21 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.proxy.grpc;
 
 import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
+import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
 import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
 import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
@@ -24,24 +25,36 @@ import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
 import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
 import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
 import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
+import 
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
 import io.grpc.netty.shaded.io.netty.util.AsciiString;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 
 public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator.ProtocolNegotiator {
-    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
-    private final SslContext sslContext;
+    protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+
     /**
      * the length of the ssl record header (in bytes)
      */
     private static final int SSL_RECORD_HEADER_LENGTH = 5;
 
-    public OptionalSSLProtocolNegotiator(SslContext sslContext) {
-        this.sslContext = sslContext;
+    private static SslContext sslContext;
+
+    public OptionalSSLProtocolNegotiator() {
+        sslContext = loadSslContext();
     }
 
     @Override
@@ -50,43 +63,81 @@ public class OptionalSSLProtocolNegotiator implements 
InternalProtocolNegotiator
     }
 
     @Override
-    public ChannelHandler newHandler(GrpcHttp2ConnectionHandler 
grpcHttp2ConnectionHandler) {
-        ChannelHandler plaintext =
-            
InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHttp2ConnectionHandler);
-        ChannelHandler ssl =
-            
InternalProtocolNegotiators.serverTls(sslContext).newHandler(grpcHttp2ConnectionHandler);
-        return new PortUnificationServerHandler(ssl, plaintext);
+    public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
+        return new PortUnificationServerHandler(grpcHandler);
     }
 
     @Override
     public void close() {}
 
+    private static SslContext loadSslContext() {
+        try {
+            ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+            if (proxyConfig.isTlsTestModeEnable()) {
+                SelfSignedCertificate selfSignedCertificate = new 
SelfSignedCertificate();
+                return 
GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
+                                selfSignedCertificate.privateKey())
+                        .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                        .clientAuth(ClientAuth.NONE)
+                        .build();
+            } else {
+                String tlsKeyPath = 
ConfigurationManager.getProxyConfig().getTlsKeyPath();
+                String tlsCertPath = 
ConfigurationManager.getProxyConfig().getTlsCertPath();
+                try (InputStream serverKeyInputStream = Files.newInputStream(
+                        Paths.get(tlsKeyPath));
+                        InputStream serverCertificateStream = 
Files.newInputStream(
+                                Paths.get(tlsCertPath))) {
+                    SslContext res = 
GrpcSslContexts.forServer(serverCertificateStream,
+                                    serverKeyInputStream)
+                            .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                            .clientAuth(ClientAuth.NONE)
+                            .build();
+                    log.info("grpc load TLS configured OK");
+                    return res;
+                }
+            }
+        } catch (Exception e) {
+            log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
+            throw new RuntimeException("grpc tls set failed: " + 
e.getMessage());
+        }
+    }
+
     public static class PortUnificationServerHandler extends 
ByteToMessageDecoder {
+
         private final ChannelHandler ssl;
         private final ChannelHandler plaintext;
 
-        public PortUnificationServerHandler(ChannelHandler ssl, ChannelHandler 
plaintext) {
-            this.ssl = ssl;
-            this.plaintext = plaintext;
+        public PortUnificationServerHandler(GrpcHttp2ConnectionHandler 
grpcHandler) {
+            this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
+                    .newHandler(grpcHandler);
+            this.plaintext = InternalProtocolNegotiators.serverPlaintext()
+                    .newHandler(grpcHandler);
         }
 
         @Override
         protected void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out)
-            throws Exception {
+                throws Exception {
             try {
-                // in SslHandler.isEncrypted, it need at least 5 bytes to 
judge is encrypted or not
-                if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
-                    return;
-                }
-                if (SslHandler.isEncrypted(in)) {
+                TlsMode tlsMode = TlsSystemConfig.tlsMode;
+                if (TlsMode.ENFORCING.equals(tlsMode)) {
                     ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
-                } else {
+                } else if (TlsMode.DISABLED.equals(tlsMode)) {
                     ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
+                } else {
+                    // in SslHandler.isEncrypted, it need at least 5 bytes to 
judge is encrypted or not
+                    if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
+                        return;
+                    }
+                    if (SslHandler.isEncrypted(in)) {
+                        ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
+                    } else {
+                        ctx.pipeline().addAfter(ctx.name(), null, 
this.plaintext);
+                    }
                 }
                 
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
                 ctx.pipeline().remove(this);
             } catch (Exception e) {
-                log.error("process protocol negotiator failed.", e);
+                log.error("process ssl protocol negotiator failed.", e);
                 throw e;
             }
         }

Reply via email to