This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f08c1590f0 Add TLS support to mailboxes used in the multi-stage engine (#14476)
f08c1590f0 is described below

commit f08c1590f04e455a726c28c39cf38a2109b6ba42
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Nov 19 17:22:30 2024 +0700

    Add TLS support to mailboxes used in the multi-stage engine (#14476)
---
 .../MultiStageBrokerRequestHandler.java            |  6 ++--
 .../pinot/common/utils/grpc/GrpcQueryClient.java   | 21 +++++++-------
 .../pinot/core/transport/grpc/GrpcQueryServer.java | 13 ++++-----
 .../apache/pinot/query/mailbox/MailboxService.java | 13 +++++++--
 .../query/mailbox/channel/ChannelManager.java      | 33 ++++++++++++++++++----
 .../query/mailbox/channel/GrpcMailboxServer.java   | 28 ++++++++++++++----
 .../apache/pinot/query/runtime/QueryRunner.java    |  5 ++--
 .../pinot/query/service/server/QueryServer.java    |  2 +-
 .../apache/pinot/query/QueryServerEnclosure.java   |  3 +-
 .../pinot/server/worker/WorkerQueryServer.java     |  2 +-
 10 files changed, 89 insertions(+), 37 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 25a7561b83..ae12c0e725 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -40,6 +40,7 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.querylog.QueryLogger;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -93,10 +94,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     String hostname = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     int port = 
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
     _workerManager = new WorkerManager(hostname, port, _routingManager);
-    _queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, 
config), config.getProperty(
+    TlsConfig tlsConfig = config.getProperty(
         CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
         CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? 
TlsUtils.extractTlsConfig(config,
-        CommonConstants.Broker.BROKER_TLS_PREFIX) : null);
+        CommonConstants.Broker.BROKER_TLS_PREFIX) : null;
+    _queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, 
config, tlsConfig), tlsConfig);
     LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: 
{} with broker id: {}, timeout: {}ms, "
             + "query log max length: {}, query log max rate: {}", hostname, 
port, _brokerId, _brokerTimeoutMs,
         _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit());
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index 808e8d0e02..7d4a6cf487 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -62,13 +62,15 @@ public class GrpcQueryClient implements Closeable {
   public GrpcQueryClient(String host, int port, GrpcConfig config) {
     ManagedChannelBuilder<?> channelBuilder;
     if (config.isUsePlainText()) {
-      channelBuilder =
-          ManagedChannelBuilder.forAddress(host, 
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
-              .usePlaintext();
+      channelBuilder = ManagedChannelBuilder
+          .forAddress(host, port)
+          .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
+          .usePlaintext();
     } else {
-      channelBuilder =
-          NettyChannelBuilder.forAddress(host, 
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
-              .sslContext(buildSslContext(config.getTlsConfig()));
+      channelBuilder = NettyChannelBuilder
+          .forAddress(host, port)
+          .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
+          .sslContext(buildSslContext(config.getTlsConfig()));
     }
 
     // Set keep alive configs, if enabled
@@ -85,8 +87,8 @@ public class GrpcQueryClient implements Closeable {
   }
 
   public static SslContext buildSslContext(TlsConfig tlsConfig) {
-    LOGGER.info("Building gRPC SSL context");
-    SslContext sslContext = 
CLIENT_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(), 
tlsConfigHashCode -> {
+    LOGGER.info("Building gRPC client SSL context");
+    return CLIENT_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(), 
tlsConfigHashCode -> {
       try {
         SSLFactory sslFactory = 
RenewableTlsUtils.createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores(tlsConfig,
             PinotInsecureMode::isPinotInInsecureMode);
@@ -101,10 +103,9 @@ public class GrpcQueryClient implements Closeable {
         }
         return sslContextBuilder.build();
       } catch (SSLException e) {
-        throw new RuntimeException("Failed to build gRPC SSL context", e);
+        throw new RuntimeException("Failed to build gRPC client SSL context", 
e);
       }
     });
-    return sslContext;
   }
 
   public Iterator<Server.ServerResponse> submit(Server.ServerRequest request) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 4d8608c5ea..728d12177f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -105,7 +105,7 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
     _serverMetrics = serverMetrics;
     if (tlsConfig != null) {
       try {
-        _server = 
NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig))
+        _server = 
NettyServerBuilder.forPort(port).sslContext(buildGrpcSslContext(tlsConfig))
             
.maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()).addService(this)
             .addTransportFilter(new GrpcQueryTransportFilter()).build();
       } catch (Exception e) {
@@ -119,13 +119,13 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
         ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
   }
 
-  public static SslContext buildGRpcSslContext(TlsConfig tlsConfig)
+  public static SslContext buildGrpcSslContext(TlsConfig tlsConfig)
       throws IllegalArgumentException {
-    LOGGER.info("Building gRPC SSL context");
+    LOGGER.info("Building gRPC server SSL context");
     if (tlsConfig.getKeyStorePath() == null) {
-      throw new IllegalArgumentException("Must provide key store path for 
secured gRpc server");
+      throw new IllegalArgumentException("Must provide key store path for 
secured gRPC server");
     }
-    SslContext sslContext = 
SERVER_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(), 
tlsConfigHashCode -> {
+    return SERVER_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(), 
tlsConfigHashCode -> {
       try {
         SSLFactory sslFactory =
             
RenewableTlsUtils.createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores(
@@ -138,10 +138,9 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
         }
         return GrpcSslContexts.configure(sslContextBuilder).build();
       } catch (Exception e) {
-        throw new RuntimeException("Failed to build gRPC SSL context", e);
+        throw new RuntimeException("Failed to build gRPC server SSL context", 
e);
       }
     });
-    return sslContext;
   }
 
   public void start() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index c71af0386d..b3233d0011 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -23,6 +23,8 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
@@ -60,14 +62,21 @@ public class MailboxService {
   private final String _hostname;
   private final int _port;
   private final PinotConfiguration _config;
-  private final ChannelManager _channelManager = new ChannelManager();
+  private final ChannelManager _channelManager;
+  @Nullable private final TlsConfig _tlsConfig;
 
   private GrpcMailboxServer _grpcMailboxServer;
 
   public MailboxService(String hostname, int port, PinotConfiguration config) {
+    this(hostname, port, config, null);
+  }
+
+  public MailboxService(String hostname, int port, PinotConfiguration config, 
@Nullable TlsConfig tlsConfig) {
     _hostname = hostname;
     _port = port;
     _config = config;
+    _tlsConfig = tlsConfig;
+    _channelManager = new ChannelManager(tlsConfig);
     LOGGER.info("Initialized MailboxService with hostname: {}, port: {}", 
hostname, port);
   }
 
@@ -76,7 +85,7 @@ public class MailboxService {
    */
   public void start() {
     LOGGER.info("Starting GrpcMailboxServer");
-    _grpcMailboxServer = new GrpcMailboxServer(this, _config);
+    _grpcMailboxServer = new GrpcMailboxServer(this, _config, _tlsConfig);
     _grpcMailboxServer.start();
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index 12f2b4f3f9..ec98066a08 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -20,8 +20,12 @@ package org.apache.pinot.query.mailbox.channel;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
@@ -33,14 +37,31 @@ import org.apache.pinot.spi.utils.CommonConstants;
  */
 public class ChannelManager {
   private final ConcurrentHashMap<Pair<String, Integer>, ManagedChannel> 
_channelMap = new ConcurrentHashMap<>();
+  private final TlsConfig _tlsConfig;
+
+  public ChannelManager(@Nullable TlsConfig tlsConfig) {
+    _tlsConfig = tlsConfig;
+  }
 
   public ManagedChannel getChannel(String hostname, int port) {
     // TODO: Revisit parameters
-    // TODO: Support TLS
-    return _channelMap.computeIfAbsent(Pair.of(hostname, port),
-        (k) -> ManagedChannelBuilder.forAddress(k.getLeft(), k.getRight())
-            .maxInboundMessageSize(
-                
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
-            .usePlaintext().build());
+    if (_tlsConfig != null) {
+      return _channelMap.computeIfAbsent(Pair.of(hostname, port),
+          (k) -> NettyChannelBuilder
+              .forAddress(k.getLeft(), k.getRight())
+              .maxInboundMessageSize(
+                  
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
+              .sslContext(GrpcQueryClient.buildSslContext(_tlsConfig))
+              .build()
+      );
+    } else {
+      return _channelMap.computeIfAbsent(Pair.of(hostname, port),
+          (k) -> ManagedChannelBuilder
+              .forAddress(k.getLeft(), k.getRight())
+              .maxInboundMessageSize(
+                  
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
+              .usePlaintext()
+              .build());
+    }
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index 31eafb32f4..61d7f06171 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -20,11 +20,15 @@ package org.apache.pinot.query.mailbox.channel;
 
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
+import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -42,13 +46,27 @@ public class GrpcMailboxServer extends PinotMailboxGrpc.PinotMailboxImplBase {
   private final MailboxService _mailboxService;
   private final Server _server;
 
-  public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration 
config) {
+  public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration 
config, @Nullable TlsConfig tlsConfig) {
     _mailboxService = mailboxService;
     int port = mailboxService.getPort();
-    // TODO: Support TLS
-    _server = 
ServerBuilder.forPort(port).addService(this).maxInboundMessageSize(
-        
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-            
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)).build();
+    if (tlsConfig != null) {
+      _server = NettyServerBuilder
+          .forPort(port)
+          .addService(this)
+          .sslContext(GrpcQueryServer.buildGrpcSslContext(tlsConfig))
+          .maxInboundMessageSize(config.getProperty(
+              
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+              
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES))
+          .build();
+    } else {
+      _server = ServerBuilder
+          .forPort(port)
+          .addService(this)
+          .maxInboundMessageSize(config.getProperty(
+              
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+              
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES))
+          .build();
+    }
   }
 
   public void start() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 65c8d4118d..ba6984bcf9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -37,6 +37,7 @@ import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Worker;
@@ -120,7 +121,7 @@ public class QueryRunner {
    * <p>Should be called only once and before calling any other method.
    */
   public void init(PinotConfiguration config, InstanceDataManager 
instanceDataManager, HelixManager helixManager,
-      ServerMetrics serverMetrics) {
+      ServerMetrics serverMetrics, @Nullable TlsConfig tlsConfig) {
     String hostname = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
       hostname = 
hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
@@ -148,7 +149,7 @@ public class QueryRunner {
         config, 
CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_OPCHAIN_EXECUTOR, 
"query-runner-on-" + port,
         CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_OPCHAIN_EXECUTOR);
     _opChainScheduler = new OpChainSchedulerService(_executorService);
-    _mailboxService = new MailboxService(hostname, port, config);
+    _mailboxService = new MailboxService(hostname, port, config, tlsConfig);
     try {
       _leafQueryExecutor = new ServerQueryExecutorV1Impl();
       
_leafQueryExecutor.init(config.subset(CommonConstants.Server.QUERY_EXECUTOR_CONFIG_PREFIX),
 instanceDataManager,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index f94710d702..f2cb5e27b3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -97,7 +97,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
           _server = NettyServerBuilder
               .forPort(_port)
               .addService(this)
-              .sslContext(GrpcQueryServer.buildGRpcSslContext(_tlsConfig))
+              .sslContext(GrpcQueryServer.buildGrpcSslContext(_tlsConfig))
               .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
               .build();
         }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 437fee82fa..80b50ef970 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -77,7 +77,8 @@ public class QueryServerEnclosure {
     InstanceDataManager instanceDataManager = 
factory.buildInstanceDataManager();
     HelixManager helixManager = mockHelixManager(factory.buildSchemaMap());
     _queryRunner = new QueryRunner();
-    _queryRunner.init(new PinotConfiguration(runnerConfig), 
instanceDataManager, helixManager, mockServiceMetrics());
+    _queryRunner.init(new PinotConfiguration(runnerConfig), 
instanceDataManager, helixManager, mockServiceMetrics(),
+        null);
   }
 
   private HelixManager mockHelixManager(Map<String, Schema> schemaMap) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 89a4f6d80f..fddde5c429 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -42,7 +42,7 @@ public class WorkerQueryServer {
     _queryServicePort = 
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
         CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     QueryRunner queryRunner = new QueryRunner();
-    queryRunner.init(_configuration, instanceDataManager, helixManager, 
serverMetrics);
+    queryRunner.init(_configuration, instanceDataManager, helixManager, 
serverMetrics, tlsConfig);
     _queryWorkerService = new QueryServer(_queryServicePort, queryRunner, 
tlsConfig);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to