This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4d82b307ef [ISSUE #6866] Move the judgment logic of grpc TLS mode to
improve the scalability of ProtocolNegotiator (#6867)
4d82b307ef is described below
commit 4d82b307ef50f5cba5717d0ebafeb3cabf336873
Author: Shuangxi Ding <[email protected]>
AuthorDate: Thu Jun 8 11:46:03 2023 +0800
[ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator (#6867)
[ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the
scalability of ProtocolNegotiator
---------
Co-authored-by: 徒钟 <[email protected]>
---
.../rocketmq/proxy/grpc/GrpcServerBuilder.java | 59 +-------------
.../proxy/grpc/OptionalSSLProtocolNegotiator.java | 93 +++++++++++++++++-----
2 files changed, 73 insertions(+), 79 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index faffb66961..0ca6a1fcbd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -19,21 +19,11 @@ package org.apache.rocketmq.proxy.grpc;
import io.grpc.BindableService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
-import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
-import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
-import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
-import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -44,13 +34,10 @@ import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
-import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
public class GrpcServerBuilder {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -63,12 +50,7 @@ public class GrpcServerBuilder {
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
serverBuilder = NettyServerBuilder.forPort(port);
- try {
- configSslContext(serverBuilder);
- } catch (Exception e) {
- log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
- throw new RuntimeException("grpc tls set failed: " +
e.getMessage());
- }
+ serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator());
// build server
int bossLoopNum =
ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
@@ -116,45 +98,6 @@ public class GrpcServerBuilder {
return new GrpcServer(this.serverBuilder.build());
}
- protected void configSslContext(NettyServerBuilder serverBuilder) throws
IOException, CertificateException {
- if (null == serverBuilder) {
- return;
- }
-
- TlsMode tlsMode = TlsSystemConfig.tlsMode;
- if (!TlsMode.DISABLED.equals(tlsMode)) {
- SslContext sslContext = loadSslContext();
- if (TlsMode.PERMISSIVE.equals(tlsMode)) {
- serverBuilder.protocolNegotiator(new
OptionalSSLProtocolNegotiator(sslContext));
- } else {
- serverBuilder.sslContext(sslContext);
- }
- }
- }
-
- protected SslContext loadSslContext() throws CertificateException,
IOException {
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- if (proxyConfig.isTlsTestModeEnable()) {
- SelfSignedCertificate selfSignedCertificate = new
SelfSignedCertificate();
- return
GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
selfSignedCertificate.privateKey())
- .trustManager(InsecureTrustManagerFactory.INSTANCE)
- .clientAuth(ClientAuth.NONE)
- .build();
- } else {
- String tlsKeyPath =
ConfigurationManager.getProxyConfig().getTlsKeyPath();
- String tlsCertPath =
ConfigurationManager.getProxyConfig().getTlsCertPath();
- try (InputStream serverKeyInputStream =
Files.newInputStream(Paths.get(tlsKeyPath));
- InputStream serverCertificateStream =
Files.newInputStream(Paths.get(tlsCertPath))) {
- SslContext res =
GrpcSslContexts.forServer(serverCertificateStream, serverKeyInputStream)
- .trustManager(InsecureTrustManagerFactory.INSTANCE)
- .clientAuth(ClientAuth.NONE)
- .build();
- log.info("load TLS configured OK");
- return res;
- }
- }
- }
-
public GrpcServerBuilder configInterceptor() {
// grpc interceptors, including acl, logging etc.
List<AccessValidator> accessValidators =
ServiceProvider.load(AccessValidator.class);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
index bf19abf855..670e1c1a21 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.proxy.grpc;
import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
+import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
@@ -24,24 +25,36 @@ import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
+import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import io.grpc.netty.shaded.io.netty.util.AsciiString;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
public class OptionalSSLProtocolNegotiator implements
InternalProtocolNegotiator.ProtocolNegotiator {
- private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- private final SslContext sslContext;
+ protected static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+
/**
* the length of the ssl record header (in bytes)
*/
private static final int SSL_RECORD_HEADER_LENGTH = 5;
- public OptionalSSLProtocolNegotiator(SslContext sslContext) {
- this.sslContext = sslContext;
+ private static SslContext sslContext;
+
+ public OptionalSSLProtocolNegotiator() {
+ sslContext = loadSslContext();
}
@Override
@@ -50,43 +63,81 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
}
@Override
- public ChannelHandler newHandler(GrpcHttp2ConnectionHandler
grpcHttp2ConnectionHandler) {
- ChannelHandler plaintext =
-
InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHttp2ConnectionHandler);
- ChannelHandler ssl =
-
InternalProtocolNegotiators.serverTls(sslContext).newHandler(grpcHttp2ConnectionHandler);
- return new PortUnificationServerHandler(ssl, plaintext);
+ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
+ return new PortUnificationServerHandler(grpcHandler);
}
@Override
public void close() {}
+ private static SslContext loadSslContext() {
+ try {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ if (proxyConfig.isTlsTestModeEnable()) {
+ SelfSignedCertificate selfSignedCertificate = new
SelfSignedCertificate();
+ return
GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
+ selfSignedCertificate.privateKey())
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .clientAuth(ClientAuth.NONE)
+ .build();
+ } else {
+ String tlsKeyPath =
ConfigurationManager.getProxyConfig().getTlsKeyPath();
+ String tlsCertPath =
ConfigurationManager.getProxyConfig().getTlsCertPath();
+ try (InputStream serverKeyInputStream = Files.newInputStream(
+ Paths.get(tlsKeyPath));
+ InputStream serverCertificateStream =
Files.newInputStream(
+ Paths.get(tlsCertPath))) {
+ SslContext res =
GrpcSslContexts.forServer(serverCertificateStream,
+ serverKeyInputStream)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .clientAuth(ClientAuth.NONE)
+ .build();
+ log.info("grpc load TLS configured OK");
+ return res;
+ }
+ }
+ } catch (Exception e) {
+ log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
+ throw new RuntimeException("grpc tls set failed: " +
e.getMessage());
+ }
+ }
+
public static class PortUnificationServerHandler extends
ByteToMessageDecoder {
+
private final ChannelHandler ssl;
private final ChannelHandler plaintext;
- public PortUnificationServerHandler(ChannelHandler ssl, ChannelHandler
plaintext) {
- this.ssl = ssl;
- this.plaintext = plaintext;
+ public PortUnificationServerHandler(GrpcHttp2ConnectionHandler
grpcHandler) {
+ this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
+ .newHandler(grpcHandler);
+ this.plaintext = InternalProtocolNegotiators.serverPlaintext()
+ .newHandler(grpcHandler);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out)
- throws Exception {
+ throws Exception {
try {
- // in SslHandler.isEncrypted, it need at least 5 bytes to
judge is encrypted or not
- if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
- return;
- }
- if (SslHandler.isEncrypted(in)) {
+ TlsMode tlsMode = TlsSystemConfig.tlsMode;
+ if (TlsMode.ENFORCING.equals(tlsMode)) {
ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
- } else {
+ } else if (TlsMode.DISABLED.equals(tlsMode)) {
ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
+ } else {
+ // in SslHandler.isEncrypted, it need at least 5 bytes to
judge is encrypted or not
+ if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
+ return;
+ }
+ if (SslHandler.isEncrypted(in)) {
+ ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
+ } else {
+ ctx.pipeline().addAfter(ctx.name(), null,
this.plaintext);
+ }
}
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
ctx.pipeline().remove(this);
} catch (Exception e) {
- log.error("process protocol negotiator failed.", e);
+ log.error("process ssl protocol negotiator failed.", e);
throw e;
}
}