This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d0426e9864 Put Netty Channel handlers and Tls context creation logic
at same place (#8163)
d0426e9864 is described below
commit d0426e9864a1ab6b7b8407603f19b7a30503e46a
Author: Liang Mingqiang <[email protected]>
AuthorDate: Mon Aug 1 17:45:42 2022 -0700
Put Netty Channel handlers and Tls context creation logic at same place
(#8163)
---
.../org/apache/pinot/common/utils/TlsUtils.java | 49 ++++++++++++
.../core/transport/ChannelHandlerFactory.java | 89 ++++++++++++++++++++++
.../apache/pinot/core/transport/QueryServer.java | 48 +++---------
.../pinot/core/transport/ServerChannels.java | 40 +++-------
4 files changed, 156 insertions(+), 70 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java
index 404d03a127..0c444c2d68 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java
@@ -19,6 +19,10 @@
package org.apache.pinot.common.utils;
import com.google.common.base.Preconditions;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
@@ -339,4 +343,49 @@ public final class TlsUtils {
return _sslSocketFactory.createSocket(host, port);
}
}
+
+ /**
+ * Builds a client side {@code SslContext} from the given {@code TlsConfig}.
+ *
+ * <p>The SSL provider is resolved with {@code SslProvider.valueOf(tlsConfig.getSslProvider())}, so the configured
+ * value must match the name of an {@code SslProvider} constant. A key manager is attached only when a key store
+ * path is configured, and a trust manager only when a trust store path is configured; when a path is absent the
+ * corresponding builder setting is left untouched.
+ *
+ * @param tlsConfig TLS config
+ * @return the client side {@code SslContext} produced by the builder
+ * @throws RuntimeException wrapping any exception thrown while building the context
+ */
+ public static SslContext buildClientContext(TlsConfig tlsConfig) {
+ SslContextBuilder sslContextBuilder =
+
SslContextBuilder.forClient().sslProvider(SslProvider.valueOf(tlsConfig.getSslProvider()));
+ if (tlsConfig.getKeyStorePath() != null) {
+ sslContextBuilder.keyManager(createKeyManagerFactory(tlsConfig));
+ }
+ if (tlsConfig.getTrustStorePath() != null) {
+ sslContextBuilder.trustManager(createTrustManagerFactory(tlsConfig));
+ }
+ try {
+ return sslContextBuilder.build();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Builds a server side {@code SslContext} from the given {@code TlsConfig}.
+ *
+ * <p>A key store path is mandatory on the server side; the key manager created from the config is passed to
+ * {@code SslContextBuilder.forServer}. The SSL provider is resolved with
+ * {@code SslProvider.valueOf(tlsConfig.getSslProvider())}, so the configured value must match the name of an
+ * {@code SslProvider} constant. A trust manager is attached only when a trust store path is configured, and
+ * client authentication is set to {@code ClientAuth.REQUIRE} only when {@code tlsConfig.isClientAuthEnabled()}
+ * returns true.
+ *
+ * @param tlsConfig TLS config
+ * @return the server side {@code SslContext} produced by the builder
+ * @throws IllegalArgumentException if the config has no key store path
+ * @throws RuntimeException wrapping any exception thrown while building the context
+ */
+ public static SslContext buildServerContext(TlsConfig tlsConfig) {
+ if (tlsConfig.getKeyStorePath() == null) {
+ throw new IllegalArgumentException("Must provide key store path for
secured server");
+ }
+ SslContextBuilder sslContextBuilder =
SslContextBuilder.forServer(createKeyManagerFactory(tlsConfig))
+ .sslProvider(SslProvider.valueOf(tlsConfig.getSslProvider()));
+ if (tlsConfig.getTrustStorePath() != null) {
+ sslContextBuilder.trustManager(createTrustManagerFactory(tlsConfig));
+ }
+ if (tlsConfig.isClientAuthEnabled()) {
+ sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+ }
+ try {
+ return sslContextBuilder.build();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java
new file mode 100644
index 0000000000..c62c266af5
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.pinot.server.access.AccessControl;
+
+
+/**
+ * The {@code ChannelHandlerFactory} is a static factory for the Netty {@code ChannelHandler}s used on the query
+ * transport path: length-field framing, TLS, and the broker side / server side message handlers.
+ *
+ * <p>The class has a private constructor and exposes only static members. The {@link #SSL} constant holds the
+ * string {@code "ssl"}, intended as the pipeline name for the TLS handler.
+ */
+public class ChannelHandlerFactory {
+
+ public static final String SSL = "ssl";
+
+ private ChannelHandlerFactory() {
+ }
+
+ /**
+ * The {@code getLengthFieldBasedFrameDecoder} returns a decoder ChannelHandler that splits the received
+ * ByteBuffers dynamically by the value of the length field in the message.
+ *
+ * <p>The decoder is configured with a length field of {@code Integer.BYTES} bytes at offset 0, no length
+ * adjustment, a maximum frame length of {@code Integer.MAX_VALUE}, and strips the {@code Integer.BYTES} length
+ * prefix from each decoded frame.
+ *
+ * @return a new {@code LengthFieldBasedFrameDecoder} instance
+ */
+ public static ChannelHandler getLengthFieldBasedFrameDecoder() {
+ return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
Integer.BYTES, 0, Integer.BYTES);
+ }
+
+ /**
+ * The {@code getLengthFieldPrepender} returns an encoder ChannelHandler that prepends the length of the message
+ * as a field of {@code Integer.BYTES} bytes.
+ *
+ * @return a new {@code LengthFieldPrepender} instance
+ */
+ public static ChannelHandler getLengthFieldPrepender() {
+ return new LengthFieldPrepender(Integer.BYTES);
+ }
+
+ /**
+ * The {@code getClientTlsHandler} returns a client side TLS handler that encrypts and decrypts everything.
+ *
+ * <p>Each call builds a new client {@code SslContext} through {@code TlsUtils.buildClientContext} and creates
+ * the handler with the allocator of the given channel.
+ *
+ * @param tlsConfig TLS config used to build the client context
+ * @param ch channel whose allocator is passed to the new handler
+ * @return the handler created by the freshly built client context
+ */
+ public static ChannelHandler getClientTlsHandler(TlsConfig tlsConfig,
SocketChannel ch) {
+ return TlsUtils.buildClientContext(tlsConfig).newHandler(ch.alloc());
+ }
+
+ /**
+ * The {@code getServerTlsHandler} returns a server side TLS handler that encrypts and decrypts everything.
+ *
+ * <p>Each call builds a new server {@code SslContext} through {@code TlsUtils.buildServerContext} and creates
+ * the handler with the allocator of the given channel.
+ *
+ * @param tlsConfig TLS config used to build the server context
+ * @param ch channel whose allocator is passed to the new handler
+ * @return the handler created by the freshly built server context
+ */
+ public static ChannelHandler getServerTlsHandler(TlsConfig tlsConfig,
SocketChannel ch) {
+ return TlsUtils.buildServerContext(tlsConfig).newHandler(ch.alloc());
+ }
+
+ /**
+ * The {@code getDataTableHandler} returns a {@code DataTableHandler} Netty inbound handler on Pinot Broker side
+ * to handle the serialized data table responses sent from Pinot Server.
+ *
+ * @param queryRouter query router passed to the handler
+ * @param serverRoutingInstance server routing instance passed to the handler
+ * @param brokerMetrics broker metrics passed to the handler
+ * @return a new {@code DataTableHandler} instance
+ */
+ public static ChannelHandler getDataTableHandler(QueryRouter queryRouter,
ServerRoutingInstance serverRoutingInstance,
+ BrokerMetrics brokerMetrics) {
+ return new DataTableHandler(queryRouter, serverRoutingInstance,
brokerMetrics);
+ }
+
+ /**
+ * The {@code getInstanceRequestHandler} returns an {@code InstanceRequestHandler} Netty inbound handler on Pinot
+ * Server side to handle the serialized instance requests sent from Pinot Broker.
+ *
+ * @param queryScheduler query scheduler passed to the handler
+ * @param serverMetrics server metrics passed to the handler
+ * @param accessControl access control passed to the handler
+ * @return a new {@code InstanceRequestHandler} instance
+ */
+ public static ChannelHandler getInstanceRequestHandler(QueryScheduler
queryScheduler, ServerMetrics serverMetrics,
+ AccessControl accessControl) {
+ return new InstanceRequestHandler(queryScheduler, serverMetrics,
accessControl);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index d0acbc6000..beb64e58f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -33,16 +33,10 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.handler.ssl.ClientAuth;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.util.OsCheck;
import org.apache.pinot.server.access.AccessControl;
@@ -68,7 +62,6 @@ public class QueryServer {
private final Class<? extends ServerSocketChannel> _channelClass;
private Channel _channel;
-
/**
* Create an unsecured server instance
*
@@ -89,11 +82,10 @@ public class QueryServer {
* @param serverMetrics server metrics
* @param nettyConfig configurations for netty library
* @param tlsConfig TLS/SSL config
- * @param accessControlFactory access control factory for netty channel
+ * @param accessControl access control for netty channel
*/
public QueryServer(int port, QueryScheduler queryScheduler, ServerMetrics
serverMetrics, NettyConfig nettyConfig,
- TlsConfig tlsConfig,
- AccessControl accessControl) {
+ TlsConfig tlsConfig, AccessControl accessControl) {
_port = port;
_queryScheduler = queryScheduler;
_serverMetrics = serverMetrics;
@@ -141,13 +133,15 @@ public class QueryServer {
@Override
protected void initChannel(SocketChannel ch) {
if (_tlsConfig != null) {
- attachSSLHandler(ch);
+ // Add SSL handler first to encrypt and decrypt everything.
+ ch.pipeline()
+ .addLast(ChannelHandlerFactory.SSL,
ChannelHandlerFactory.getServerTlsHandler(_tlsConfig, ch));
}
- ch.pipeline()
- .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
0, Integer.BYTES, 0, Integer.BYTES),
- new LengthFieldPrepender(Integer.BYTES),
- new InstanceRequestHandler(_queryScheduler,
_serverMetrics, _accessControl));
+
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldBasedFrameDecoder());
+
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldPrepender());
+ ch.pipeline().addLast(
+
ChannelHandlerFactory.getInstanceRequestHandler(_queryScheduler,
_serverMetrics, _accessControl));
}
}).bind(_port).sync().channel();
} catch (Exception e) {
@@ -158,30 +152,6 @@ public class QueryServer {
}
}
- private void attachSSLHandler(SocketChannel ch) {
- try {
- if (_tlsConfig.getKeyStorePath() == null) {
- throw new IllegalArgumentException("Must provide key store path for
secured server");
- }
-
- SslContextBuilder sslContextBuilder = SslContextBuilder
- .forServer(TlsUtils.createKeyManagerFactory(_tlsConfig))
- .sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider()));
-
- if (_tlsConfig.getTrustStorePath() != null) {
-
sslContextBuilder.trustManager(TlsUtils.createTrustManagerFactory(_tlsConfig));
- }
-
- if (_tlsConfig.isClientAuthEnabled()) {
- sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
- }
-
- ch.pipeline().addLast("ssl",
sslContextBuilder.build().newHandler(ch.alloc()));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
public void shutDown() {
try {
_channel.close().sync();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index aeeda17819..869e497486 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -33,10 +33,6 @@ import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -50,7 +46,6 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.core.util.OsCheck;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -162,38 +157,21 @@ public class ServerChannels {
@Override
protected void initChannel(SocketChannel ch) {
if (_tlsConfig != null) {
- attachSSLHandler(ch);
+ // Add SSL handler first to encrypt and decrypt everything.
+ ch.pipeline().addLast(
+ ChannelHandlerFactory.SSL,
ChannelHandlerFactory.getClientTlsHandler(_tlsConfig, ch));
}
- ch.pipeline()
- .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
0, Integer.BYTES, 0, Integer.BYTES),
- new LengthFieldPrepender(Integer.BYTES),
- // NOTE: data table de-serialization happens inside this
handler
- // Revisit if this becomes a bottleneck
- new DataTableHandler(_queryRouter,
_serverRoutingInstance, _brokerMetrics));
+
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldBasedFrameDecoder());
+
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldPrepender());
+ // NOTE: data table de-serialization happens inside this handler
+ // Revisit if this becomes a bottleneck
+ ch.pipeline().addLast(
+ ChannelHandlerFactory.getDataTableHandler(_queryRouter,
_serverRoutingInstance, _brokerMetrics));
}
});
}
- void attachSSLHandler(SocketChannel ch) {
- try {
- SslContextBuilder sslContextBuilder =
-
SslContextBuilder.forClient().sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider()));
-
- if (_tlsConfig.getKeyStorePath() != null) {
-
sslContextBuilder.keyManager(TlsUtils.createKeyManagerFactory(_tlsConfig));
- }
-
- if (_tlsConfig.getTrustStorePath() != null) {
-
sslContextBuilder.trustManager(TlsUtils.createTrustManagerFactory(_tlsConfig));
- }
-
- ch.pipeline().addLast("ssl",
sslContextBuilder.build().newHandler(ch.alloc()));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
void sendRequest(String rawTableName, AsyncQueryResponse
asyncQueryResponse,
ServerRoutingInstance serverRoutingInstance, byte[] requestBytes, long
timeoutMs)
throws InterruptedException, TimeoutException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]