This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f08c1590f0 Add TLS support to mailboxes used in the multi-stage engine (#14476)
f08c1590f0 is described below
commit f08c1590f04e455a726c28c39cf38a2109b6ba42
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Nov 19 17:22:30 2024 +0700
Add TLS support to mailboxes used in the multi-stage engine (#14476)
---
.../MultiStageBrokerRequestHandler.java | 6 ++--
.../pinot/common/utils/grpc/GrpcQueryClient.java | 21 +++++++-------
.../pinot/core/transport/grpc/GrpcQueryServer.java | 13 ++++-----
.../apache/pinot/query/mailbox/MailboxService.java | 13 +++++++--
.../query/mailbox/channel/ChannelManager.java | 33 ++++++++++++++++++----
.../query/mailbox/channel/GrpcMailboxServer.java | 28 ++++++++++++++----
.../apache/pinot/query/runtime/QueryRunner.java | 5 ++--
.../pinot/query/service/server/QueryServer.java | 2 +-
.../apache/pinot/query/QueryServerEnclosure.java | 3 +-
.../pinot/server/worker/WorkerQueryServer.java | 2 +-
10 files changed, 89 insertions(+), 37 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 25a7561b83..ae12c0e725 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -40,6 +40,7 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.querylog.QueryLogger;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -93,10 +94,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
String hostname =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port =
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_workerManager = new WorkerManager(hostname, port, _routingManager);
- _queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port,
config), config.getProperty(
+ TlsConfig tlsConfig = config.getProperty(
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ?
TlsUtils.extractTlsConfig(config,
- CommonConstants.Broker.BROKER_TLS_PREFIX) : null);
+ CommonConstants.Broker.BROKER_TLS_PREFIX) : null;
+ _queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port,
config, tlsConfig), tlsConfig);
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}", hostname,
port, _brokerId, _brokerTimeoutMs,
_queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit());
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index 808e8d0e02..7d4a6cf487 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -62,13 +62,15 @@ public class GrpcQueryClient implements Closeable {
public GrpcQueryClient(String host, int port, GrpcConfig config) {
ManagedChannelBuilder<?> channelBuilder;
if (config.isUsePlainText()) {
- channelBuilder =
- ManagedChannelBuilder.forAddress(host,
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
- .usePlaintext();
+ channelBuilder = ManagedChannelBuilder
+ .forAddress(host, port)
+ .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
+ .usePlaintext();
} else {
- channelBuilder =
- NettyChannelBuilder.forAddress(host,
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
- .sslContext(buildSslContext(config.getTlsConfig()));
+ channelBuilder = NettyChannelBuilder
+ .forAddress(host, port)
+ .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
+ .sslContext(buildSslContext(config.getTlsConfig()));
}
// Set keep alive configs, if enabled
@@ -85,8 +87,8 @@ public class GrpcQueryClient implements Closeable {
}
public static SslContext buildSslContext(TlsConfig tlsConfig) {
- LOGGER.info("Building gRPC SSL context");
- SslContext sslContext =
CLIENT_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(),
tlsConfigHashCode -> {
+ LOGGER.info("Building gRPC client SSL context");
+ return CLIENT_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(),
tlsConfigHashCode -> {
try {
SSLFactory sslFactory =
RenewableTlsUtils.createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores(tlsConfig,
PinotInsecureMode::isPinotInInsecureMode);
@@ -101,10 +103,9 @@ public class GrpcQueryClient implements Closeable {
}
return sslContextBuilder.build();
} catch (SSLException e) {
- throw new RuntimeException("Failed to build gRPC SSL context", e);
+ throw new RuntimeException("Failed to build gRPC client SSL context",
e);
}
});
- return sslContext;
}
public Iterator<Server.ServerResponse> submit(Server.ServerRequest request) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 4d8608c5ea..728d12177f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -105,7 +105,7 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
_serverMetrics = serverMetrics;
if (tlsConfig != null) {
try {
- _server =
NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig))
+ _server =
NettyServerBuilder.forPort(port).sslContext(buildGrpcSslContext(tlsConfig))
.maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()).addService(this)
.addTransportFilter(new GrpcQueryTransportFilter()).build();
} catch (Exception e) {
@@ -119,13 +119,13 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
}
- public static SslContext buildGRpcSslContext(TlsConfig tlsConfig)
+ public static SslContext buildGrpcSslContext(TlsConfig tlsConfig)
throws IllegalArgumentException {
- LOGGER.info("Building gRPC SSL context");
+ LOGGER.info("Building gRPC server SSL context");
if (tlsConfig.getKeyStorePath() == null) {
- throw new IllegalArgumentException("Must provide key store path for secured gRpc server");
+ throw new IllegalArgumentException("Must provide key store path for secured gRPC server");
}
- SslContext sslContext =
SERVER_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(),
tlsConfigHashCode -> {
+ return SERVER_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(),
tlsConfigHashCode -> {
try {
SSLFactory sslFactory =
RenewableTlsUtils.createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores(
@@ -138,10 +138,9 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
}
return GrpcSslContexts.configure(sslContextBuilder).build();
} catch (Exception e) {
- throw new RuntimeException("Failed to build gRPC SSL context", e);
+ throw new RuntimeException("Failed to build gRPC server SSL context",
e);
}
});
- return sslContext;
}
public void start() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index c71af0386d..b3233d0011 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -23,6 +23,8 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
@@ -60,14 +62,21 @@ public class MailboxService {
private final String _hostname;
private final int _port;
private final PinotConfiguration _config;
- private final ChannelManager _channelManager = new ChannelManager();
+ private final ChannelManager _channelManager;
+ @Nullable private final TlsConfig _tlsConfig;
private GrpcMailboxServer _grpcMailboxServer;
public MailboxService(String hostname, int port, PinotConfiguration config) {
+ this(hostname, port, config, null);
+ }
+
+ public MailboxService(String hostname, int port, PinotConfiguration config,
@Nullable TlsConfig tlsConfig) {
_hostname = hostname;
_port = port;
_config = config;
+ _tlsConfig = tlsConfig;
+ _channelManager = new ChannelManager(tlsConfig);
LOGGER.info("Initialized MailboxService with hostname: {}, port: {}",
hostname, port);
}
@@ -76,7 +85,7 @@ public class MailboxService {
*/
public void start() {
LOGGER.info("Starting GrpcMailboxServer");
- _grpcMailboxServer = new GrpcMailboxServer(this, _config);
+ _grpcMailboxServer = new GrpcMailboxServer(this, _config, _tlsConfig);
_grpcMailboxServer.start();
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index 12f2b4f3f9..ec98066a08 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -20,8 +20,12 @@ package org.apache.pinot.query.mailbox.channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -33,14 +37,31 @@ import org.apache.pinot.spi.utils.CommonConstants;
*/
public class ChannelManager {
private final ConcurrentHashMap<Pair<String, Integer>, ManagedChannel>
_channelMap = new ConcurrentHashMap<>();
+ private final TlsConfig _tlsConfig;
+
+ public ChannelManager(@Nullable TlsConfig tlsConfig) {
+ _tlsConfig = tlsConfig;
+ }
public ManagedChannel getChannel(String hostname, int port) {
// TODO: Revisit parameters
- // TODO: Support TLS
- return _channelMap.computeIfAbsent(Pair.of(hostname, port),
- (k) -> ManagedChannelBuilder.forAddress(k.getLeft(), k.getRight())
- .maxInboundMessageSize(
-
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
- .usePlaintext().build());
+ if (_tlsConfig != null) {
+ return _channelMap.computeIfAbsent(Pair.of(hostname, port),
+ (k) -> NettyChannelBuilder
+ .forAddress(k.getLeft(), k.getRight())
+ .maxInboundMessageSize(
+
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
+ .sslContext(GrpcQueryClient.buildSslContext(_tlsConfig))
+ .build()
+ );
+ } else {
+ return _channelMap.computeIfAbsent(Pair.of(hostname, port),
+ (k) -> ManagedChannelBuilder
+ .forAddress(k.getLeft(), k.getRight())
+ .maxInboundMessageSize(
+
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
+ .usePlaintext()
+ .build());
+ }
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index 31eafb32f4..61d7f06171 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -20,11 +20,15 @@ package org.apache.pinot.query.mailbox.channel;
import io.grpc.Server;
import io.grpc.ServerBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
+import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -42,13 +46,27 @@ public class GrpcMailboxServer extends PinotMailboxGrpc.PinotMailboxImplBase {
private final MailboxService _mailboxService;
private final Server _server;
- public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration
config) {
+ public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration
config, @Nullable TlsConfig tlsConfig) {
_mailboxService = mailboxService;
int port = mailboxService.getPort();
- // TODO: Support TLS
- _server =
ServerBuilder.forPort(port).addService(this).maxInboundMessageSize(
-
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)).build();
+ if (tlsConfig != null) {
+ _server = NettyServerBuilder
+ .forPort(port)
+ .addService(this)
+ .sslContext(GrpcQueryServer.buildGrpcSslContext(tlsConfig))
+ .maxInboundMessageSize(config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES))
+ .build();
+ } else {
+ _server = ServerBuilder
+ .forPort(port)
+ .addService(this)
+ .maxInboundMessageSize(config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES))
+ .build();
+ }
}
public void start() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 65c8d4118d..ba6984bcf9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -37,6 +37,7 @@ import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Worker;
@@ -120,7 +121,7 @@ public class QueryRunner {
* <p>Should be called only once and before calling any other method.
*/
public void init(PinotConfiguration config, InstanceDataManager
instanceDataManager, HelixManager helixManager,
- ServerMetrics serverMetrics) {
+ ServerMetrics serverMetrics, @Nullable TlsConfig tlsConfig) {
String hostname =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
hostname =
hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
@@ -148,7 +149,7 @@ public class QueryRunner {
config,
CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_OPCHAIN_EXECUTOR,
"query-runner-on-" + port,
CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_OPCHAIN_EXECUTOR);
_opChainScheduler = new OpChainSchedulerService(_executorService);
- _mailboxService = new MailboxService(hostname, port, config);
+ _mailboxService = new MailboxService(hostname, port, config, tlsConfig);
try {
_leafQueryExecutor = new ServerQueryExecutorV1Impl();
_leafQueryExecutor.init(config.subset(CommonConstants.Server.QUERY_EXECUTOR_CONFIG_PREFIX),
instanceDataManager,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index f94710d702..f2cb5e27b3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -97,7 +97,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
_server = NettyServerBuilder
.forPort(_port)
.addService(this)
- .sslContext(GrpcQueryServer.buildGRpcSslContext(_tlsConfig))
+ .sslContext(GrpcQueryServer.buildGrpcSslContext(_tlsConfig))
.maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build();
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 437fee82fa..80b50ef970 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -77,7 +77,8 @@ public class QueryServerEnclosure {
InstanceDataManager instanceDataManager =
factory.buildInstanceDataManager();
HelixManager helixManager = mockHelixManager(factory.buildSchemaMap());
_queryRunner = new QueryRunner();
- _queryRunner.init(new PinotConfiguration(runnerConfig),
instanceDataManager, helixManager, mockServiceMetrics());
+ _queryRunner.init(new PinotConfiguration(runnerConfig),
instanceDataManager, helixManager, mockServiceMetrics(),
+ null);
}
private HelixManager mockHelixManager(Map<String, Schema> schemaMap) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 89a4f6d80f..fddde5c429 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -42,7 +42,7 @@ public class WorkerQueryServer {
_queryServicePort =
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
QueryRunner queryRunner = new QueryRunner();
- queryRunner.init(_configuration, instanceDataManager, helixManager,
serverMetrics);
+ queryRunner.init(_configuration, instanceDataManager, helixManager,
serverMetrics, tlsConfig);
_queryWorkerService = new QueryServer(_queryServicePort, queryRunner,
tlsConfig);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]