This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c6fea33d07a Propose MSE Grpc Channel Authorization (#16475)
c6fea33d07a is described below

commit c6fea33d07ae1ee844ae49d335304fa8195afdd8
Author: Dino Occhialini <[email protected]>
AuthorDate: Thu Aug 21 00:38:15 2025 -0700

    Propose MSE Grpc Channel Authorization (#16475)
---
 .../query/access/AuthorizationInterceptor.java     |  45 +++++
 .../pinot/query/access/QueryAccessControl.java     |  36 ++++
 .../query/access/QueryAccessControlFactory.java    |  68 ++++++++
 .../apache/pinot/query/mailbox/MailboxService.java |  12 +-
 .../query/mailbox/channel/GrpcMailboxServer.java   |  15 +-
 .../pinot/query/service/server/QueryServer.java    |  32 +++-
 .../query/service/server/QueryServerAuthzTest.java | 183 +++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 +
 8 files changed, 384 insertions(+), 9 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/AuthorizationInterceptor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/AuthorizationInterceptor.java
new file mode 100644
index 00000000000..448697dbbb4
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/AuthorizationInterceptor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.access;
+
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.Status;
+
+public class AuthorizationInterceptor implements ServerInterceptor {
+  private final QueryAccessControlFactory _accessControlFactory;
+
+  public AuthorizationInterceptor(QueryAccessControlFactory 
accessControlFactory) {
+    _accessControlFactory = accessControlFactory;
+  }
+
+  @Override
+  public <T, R> ServerCall.Listener<T> interceptCall(ServerCall<T, R> call, 
Metadata headers,
+      ServerCallHandler<T, R> next) {
+    if (!_accessControlFactory.create().hasAccess(call.getAttributes(), 
headers)) {
+      call.close(Status.PERMISSION_DENIED.withDescription("MSE Access 
Denied"), headers);
+      // Skip any future operations since we have closed the call
+      return new ServerCall.Listener<T>() {
+      };
+    }
+    return next.startCall(call, headers);
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControl.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControl.java
new file mode 100644
index 00000000000..69059b93a6e
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControl.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.access;
+
+import io.grpc.Attributes;
+import io.grpc.Metadata;
+
+
+public interface QueryAccessControl {
+  /**
+   * Return whether the client can call the QueryServer or GrpcMailboxServer
+   * This is similar to pinot-server AccessControl.isAuthorizedChannel but for 
multi-stage grpc calls
+   * It is intended for inter-service authorization between pinot components 
rather than fine-grained
+   * access control.
+   *
+   * @param attributes GRPC Attributes, potentially containing client 
certificates
+   * @param metadata GRPC metadata, containing headers
+   */
+  boolean hasAccess(Attributes attributes, Metadata metadata);
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControlFactory.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControlFactory.java
new file mode 100644
index 00000000000..f33e2cc6088
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControlFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.access;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_CHANNEL_ACCESS_CONTROL_FACTORY_CLASS;
+
+
[email protected]
[email protected]
+public abstract class QueryAccessControlFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryAccessControlFactory.class);
+  protected void init(PinotConfiguration configuration) {
+  }
+
+  public abstract QueryAccessControl create();
+
+  /**
+   * Build a QueryAccessControlFactory from a `PinotConfiguration`.
+   *
+   * @param configuration Populated PinotConfiguration
+   * @return Concrete QueryAccessControlFactory instance or null if there is 
an error
+   */
+  @Nullable
+  public static QueryAccessControlFactory fromConfig(PinotConfiguration 
configuration) {
+    String configuredClass = 
configuration.getProperty(CONFIG_OF_MULTI_STAGE_CHANNEL_ACCESS_CONTROL_FACTORY_CLASS);
+    if (configuredClass == null) {
+      LOGGER.info("No configured QueryAccessControlFactory. You can set {} to 
set one.",
+          CONFIG_OF_MULTI_STAGE_CHANNEL_ACCESS_CONTROL_FACTORY_CLASS);
+      return null;
+    }
+    try {
+      QueryAccessControlFactory factory = 
PluginManager.get().createInstance(configuredClass);
+      LOGGER.info("Built QueryAccessControlFactory from configured class {}", 
configuredClass);
+      factory.init(configuration);
+      return factory;
+    } catch (Exception ex) {
+      LOGGER.error("Caught exception attempting to load 
QueryAccessControlFactory {}", configuredClass, ex);
+      // TODO: Consider raising a RunTime error to capture if a user's 
provided AccessControlFactory is not able
+      //     to load. Returning null implies that we should not check access 
at all (failing open) but generally
+      //     users who configure this value would want us to fail closed 
(raising an exception)
+      return null;
+    }
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index a81c1c6f448..8e572afe478 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.query.access.QueryAccessControlFactory;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -65,15 +66,21 @@ public class MailboxService {
   private final PinotConfiguration _config;
   private final ChannelManager _channelManager;
   @Nullable private final TlsConfig _tlsConfig;
+  @Nullable private final QueryAccessControlFactory _accessControlFactory;
   private final int _maxByteStringSize;
 
   private GrpcMailboxServer _grpcMailboxServer;
 
   public MailboxService(String hostname, int port, PinotConfiguration config) {
-    this(hostname, port, config, null);
+    this(hostname, port, config, null, null);
   }
 
   public MailboxService(String hostname, int port, PinotConfiguration config, 
@Nullable TlsConfig tlsConfig) {
+    this(hostname, port, config, tlsConfig, null);
+  }
+
+  public MailboxService(String hostname, int port, PinotConfiguration config, 
@Nullable TlsConfig tlsConfig, @Nullable
+  QueryAccessControlFactory accessControlFactory) {
     _hostname = hostname;
     _port = port;
     _config = config;
@@ -83,6 +90,7 @@ public class MailboxService {
         
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
     );
     _channelManager = new ChannelManager(tlsConfig, maxInboundMessageSize);
+    _accessControlFactory = accessControlFactory;
     boolean splitBlocks = config.getProperty(
         
CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT,
         
CommonConstants.MultiStageQueryRunner.DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT);
@@ -100,7 +108,7 @@ public class MailboxService {
    */
   public void start() {
     LOGGER.info("Starting GrpcMailboxServer");
-    _grpcMailboxServer = new GrpcMailboxServer(this, _config, _tlsConfig);
+    _grpcMailboxServer = new GrpcMailboxServer(this, _config, _tlsConfig, 
_accessControlFactory);
     _grpcMailboxServer.start();
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index dcdbf299d4a..ff771110141 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -35,6 +35,8 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
+import org.apache.pinot.query.access.AuthorizationInterceptor;
+import org.apache.pinot.query.access.QueryAccessControlFactory;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -52,9 +54,13 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
   private final MailboxService _mailboxService;
   private final Server _server;
 
-  public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration 
config, @Nullable TlsConfig tlsConfig) {
+  public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration 
config, @Nullable TlsConfig tlsConfig,
+      @Nullable QueryAccessControlFactory accessControlFactory) {
     _mailboxService = mailboxService;
     int port = mailboxService.getPort();
+    if (accessControlFactory == null) {
+      accessControlFactory = QueryAccessControlFactory.fromConfig(config);
+    }
 
     PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
     PooledByteBufAllocatorMetric metric = bufAllocator.metric();
@@ -83,8 +89,11 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
     }
 
     NettyServerBuilder builder = NettyServerBuilder
-        .forPort(port)
-        .intercept(new MailboxServerInterceptor())
+        .forPort(port).intercept(new MailboxServerInterceptor());
+    if (accessControlFactory != null) {
+      builder.intercept(new AuthorizationInterceptor(accessControlFactory));
+    }
+    builder
         .addService(this)
         .withOption(ChannelOption.ALLOCATOR, bufAllocator)
         .withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index a3ab6471c93..69fafea0e8e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -44,6 +44,8 @@ import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.query.MseWorkerThreadContext;
+import org.apache.pinot.query.access.AuthorizationInterceptor;
+import org.apache.pinot.query.access.QueryAccessControlFactory;
 import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
 import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
 import org.apache.pinot.query.routing.StageMetadata;
@@ -79,6 +81,8 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   private final QueryRunner _queryRunner;
   @Nullable
   private final TlsConfig _tlsConfig;
+  @Nullable
+  private final QueryAccessControlFactory _accessControlFactory;
   // query submission service is only used for plan submission for now.
   // TODO: with complex query submission logic we should allow asynchronous 
query submission return instead of
   //   directly return from submission response observer.
@@ -101,19 +105,35 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
   @VisibleForTesting
   public QueryServer(int port, QueryRunner queryRunner, @Nullable TlsConfig 
tlsConfig) {
-    this("unknownServer", port, queryRunner, tlsConfig, new 
PinotConfiguration());
+    this("unknownServer", port, queryRunner, tlsConfig, new 
PinotConfiguration(), null);
+  }
+
+  @VisibleForTesting
+  public QueryServer(int port, QueryRunner queryRunner, @Nullable TlsConfig 
tlsConfig,
+      QueryAccessControlFactory accessControlFactory) {
+    this("unknownServer", port, queryRunner, tlsConfig, new 
PinotConfiguration(), accessControlFactory);
   }
 
   public QueryServer(String instanceId, int port, QueryRunner queryRunner, 
@Nullable TlsConfig tlsConfig) {
-    this(instanceId, port, queryRunner, tlsConfig, new PinotConfiguration());
+    this(instanceId, port, queryRunner, tlsConfig, new PinotConfiguration(), 
null);
   }
 
   public QueryServer(String instanceId, int port, QueryRunner queryRunner, 
@Nullable TlsConfig tlsConfig,
       PinotConfiguration config) {
+    this(instanceId, port, queryRunner, tlsConfig, config, null);
+  }
+
+  public QueryServer(String instanceId, int port, QueryRunner queryRunner, 
@Nullable TlsConfig tlsConfig,
+      PinotConfiguration config, @Nullable QueryAccessControlFactory 
accessControlFactory) {
     _instanceId = instanceId;
     _port = port;
     _queryRunner = queryRunner;
     _tlsConfig = tlsConfig;
+    if (accessControlFactory == null) {
+      _accessControlFactory = QueryAccessControlFactory.fromConfig(config);
+    } else {
+      _accessControlFactory = accessControlFactory;
+    }
 
     ExecutorService baseExecutorService = ExecutorServiceUtils.create(config,
         CommonConstants.Server.MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX,
@@ -160,9 +180,13 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   }
 
   private <T extends ServerBuilder<T>> Server buildGrpcServer(ServerBuilder<T> 
builder) {
-    return builder
+    builder
          // By using directExecutor, GRPC doesn't need to manage its own 
thread pool
-        .directExecutor()
+        .directExecutor();
+    if (_accessControlFactory != null) {
+      builder.intercept(new AuthorizationInterceptor(_accessControlFactory));
+    }
+    return builder
         .addService(this)
         .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
         .build();
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerAuthzTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerAuthzTest.java
new file mode 100644
index 00000000000..101e0d91a0a
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerAuthzTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.server;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import io.grpc.Attributes;
+import io.grpc.Deadline;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.MetadataUtils;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.access.QueryAccessControl;
+import org.apache.pinot.query.access.QueryAccessControlFactory;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
+import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
+import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertSame;
+
+
+public class QueryServerAuthzTest {
+  private static final String AUTH_HEADER = "authorization";
+  private static final String SECRET = "my-shared-secret";
+  private static final Metadata.Key<String> AUTH_METADATA_KEY = 
Metadata.Key.of(AUTH_HEADER, ASCII_STRING_MARSHALLER);
+  private static class DenyAllAccessControlFactory extends 
QueryAccessControlFactory {
+    public static final QueryAccessControl DENY_ALL_ACCESS = new 
QueryAccessControl() {
+      @Override
+      public boolean hasAccess(Attributes attributes, Metadata metadata) {
+        String authorization = metadata.get(AUTH_METADATA_KEY);
+        return SECRET.equals(authorization);
+      }
+    };
+
+    public QueryAccessControl create() {
+      return DENY_ALL_ACCESS;
+    }
+  }
+  private static final Random RANDOM_REQUEST_ID_GEN = new Random();
+  private static final int QUERY_SERVER_COUNT = 2;
+  private static final String KEY_OF_SERVER_INSTANCE_HOST = 
"pinot.query.runner.server.hostname";
+  private static final String KEY_OF_SERVER_INSTANCE_PORT = 
"pinot.query.runner.server.port";
+
+  private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
+
+  private QueryEnvironment _queryEnvironment;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
+      int availablePort = QueryTestUtils.getAvailablePort();
+      QueryRunner queryRunner = mock(QueryRunner.class);
+      QueryServer queryServer = new QueryServer(availablePort, queryRunner, 
null, new DenyAllAccessControlFactory());
+      queryServer.start();
+      _queryServerMap.put(availablePort, queryServer);
+    }
+
+    List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
+
+    // reducer port doesn't matter, we are testing the worker instance not 
GRPC.
+    _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, 
portList.get(0), portList.get(1),
+        QueryEnvironmentTestBase.TABLE_SCHEMAS, 
QueryEnvironmentTestBase.SERVER1_SEGMENTS,
+        QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (QueryServer worker : _queryServerMap.values()) {
+      worker.shutdown();
+    }
+  }
+
+  @Test
+  public void testAccessDeniedError()
+      throws Exception {
+    DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM 
a");
+    Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+    Map<String, String> requestMetadata = 
QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+    try {
+      submitRequest(queryRequest, requestMetadata, "the-wrong-password");
+    } catch (StatusRuntimeException ex) {
+      assertSame(ex.getStatus().getCode(), Status.PERMISSION_DENIED.getCode(),
+          "Expected permission denied from DenyAllQueryAccessControl");
+      return;
+    }
+    throw new RuntimeException("Expected DenyAllQueryAccessControl to raise a 
StatusRuntimeException");
+  }
+
+  @Test
+  public void testAccessAllowed()
+      throws Exception {
+    DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM 
a");
+    Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+    Map<String, String> requestMetadata = 
QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+    submitRequest(queryRequest, requestMetadata, SECRET);
+  }
+
+  private Worker.QueryResponse submitRequest(Worker.QueryRequest queryRequest, 
Map<String, String> requestMetadata,
+      String secretKey) {
+    String host = requestMetadata.get(KEY_OF_SERVER_INSTANCE_HOST);
+    int port = 
Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT));
+    long timeoutMs = 
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+    ManagedChannel channel = ManagedChannelBuilder.forAddress(host, 
port).usePlaintext().build();
+    Metadata headers = new Metadata();
+    headers.put(AUTH_METADATA_KEY, secretKey);
+    PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub = 
PinotQueryWorkerGrpc
+        .newBlockingStub(channel)
+        .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
+    Worker.QueryResponse resp =
+        stub.withDeadline(Deadline.after(timeoutMs, 
TimeUnit.MILLISECONDS)).submit(queryRequest);
+    channel.shutdown();
+    return resp;
+  }
+
+  private Worker.QueryRequest getQueryRequest(DispatchableSubPlan queryPlan, 
int stageId) {
+    DispatchablePlanFragment stagePlan = 
queryPlan.getQueryStageMap().get(stageId);
+    Plan.PlanNode rootNode = 
PlanNodeSerializer.process(stagePlan.getPlanFragment().getFragmentRoot());
+    List<Worker.WorkerMetadata> workerMetadataList =
+        
QueryPlanSerDeUtils.toProtoWorkerMetadataList(stagePlan.getWorkerMetadataList());
+    ByteString customProperty = 
QueryPlanSerDeUtils.toProtoProperties(stagePlan.getCustomProperties());
+
+    // this particular test set requires the request to have a single 
QueryServerInstance to dispatch to
+    // as it is not testing the multi-tenancy dispatch (which is in the 
QueryDispatcherTest)
+    QueryServerInstance serverInstance = 
stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next();
+    Worker.StageMetadata stageMetadata =
+        
Worker.StageMetadata.newBuilder().setStageId(stageId).addAllWorkerMetadata(workerMetadataList)
+            .setCustomProperty(customProperty).build();
+    Worker.StagePlan protoStagePlan =
+        
Worker.StagePlan.newBuilder().setRootNode(rootNode.toByteString()).setStageMetadata(stageMetadata).build();
+
+    Map<String, String> requestMetadata = new HashMap<>();
+    // the default configurations that must exist.
+    requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
+        String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+    
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
+        String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+    // extra configurations we want to test also parsed out correctly.
+    requestMetadata.put(KEY_OF_SERVER_INSTANCE_HOST, 
serverInstance.getHostname());
+    requestMetadata.put(KEY_OF_SERVER_INSTANCE_PORT, 
Integer.toString(serverInstance.getQueryServicePort()));
+
+    return Worker.QueryRequest.newBuilder().addStagePlan(protoStagePlan)
+        
.setMetadata(QueryPlanSerDeUtils.toProtoProperties(requestMetadata)).build();
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index cdf5f378f7e..10cf20e7ab4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -266,6 +266,8 @@ public class CommonConstants {
 
     public static final String CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED = 
"pinot.multistage.engine.tls.enabled";
     public static final boolean DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED = false;
+    public static final String 
CONFIG_OF_MULTI_STAGE_CHANNEL_ACCESS_CONTROL_FACTORY_CLASS =
+        "pinot.multistage.engine.channel.auth.factory.class";
 
     // This is a "beta" config and can be changed or even removed in future 
releases.
     public static final String 
CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to