This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c6fea33d07a Propose MSE Grpc Channel Authorization (#16475)
c6fea33d07a is described below
commit c6fea33d07ae1ee844ae49d335304fa8195afdd8
Author: Dino Occhialini <[email protected]>
AuthorDate: Thu Aug 21 00:38:15 2025 -0700
Propose MSE Grpc Channel Authorization (#16475)
---
.../query/access/AuthorizationInterceptor.java | 45 +++++
.../pinot/query/access/QueryAccessControl.java | 36 ++++
.../query/access/QueryAccessControlFactory.java | 68 ++++++++
.../apache/pinot/query/mailbox/MailboxService.java | 12 +-
.../query/mailbox/channel/GrpcMailboxServer.java | 15 +-
.../pinot/query/service/server/QueryServer.java | 32 +++-
.../query/service/server/QueryServerAuthzTest.java | 183 +++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
8 files changed, 384 insertions(+), 9 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/AuthorizationInterceptor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/AuthorizationInterceptor.java
new file mode 100644
index 00000000000..448697dbbb4
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/AuthorizationInterceptor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.access;
+
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.Status;
+
+public class AuthorizationInterceptor implements ServerInterceptor {
+ private final QueryAccessControlFactory _accessControlFactory;
+
+ public AuthorizationInterceptor(QueryAccessControlFactory
accessControlFactory) {
+ _accessControlFactory = accessControlFactory;
+ }
+
+ @Override
+ public <T, R> ServerCall.Listener<T> interceptCall(ServerCall<T, R> call,
Metadata headers,
+ ServerCallHandler<T, R> next) {
+ if (!_accessControlFactory.create().hasAccess(call.getAttributes(),
headers)) {
+ call.close(Status.PERMISSION_DENIED.withDescription("MSE Access
Denied"), headers);
+ // Skip any future operations since we have closed the call
+ return new ServerCall.Listener<T>() {
+ };
+ }
+ return next.startCall(call, headers);
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControl.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControl.java
new file mode 100644
index 00000000000..69059b93a6e
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControl.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.access;
+
+import io.grpc.Attributes;
+import io.grpc.Metadata;
+
+
+public interface QueryAccessControl {
+ /**
+ * Return whether the client can call the QueryServer or GrpcMailboxServer
+ * This is similar to pinot-server AccessControl.isAuthorizedChannel but for
multi-stage grpc calls
+ * It is intended for inter-service authorization between pinot components
rather than fine-grained
+ * access control.
+ *
+ * @param attributes GRPC Attributes, potentially containing client
certificates
+ * @param metadata GRPC metadata, containing headers
+ */
+ boolean hasAccess(Attributes attributes, Metadata metadata);
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControlFactory.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControlFactory.java
new file mode 100644
index 00000000000..f33e2cc6088
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/access/QueryAccessControlFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.access;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_CHANNEL_ACCESS_CONTROL_FACTORY_CLASS;
+
+
[email protected]
[email protected]
+public abstract class QueryAccessControlFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QueryAccessControlFactory.class);
+ protected void init(PinotConfiguration configuration) {
+ }
+
+ public abstract QueryAccessControl create();
+
+ /**
+ * Build a QueryAccessControlFactory from a `PinotConfiguration`.
+ *
+ * @param configuration Populated PinotConfiguration
+ * @return Concrete QueryAccessControlFactory instance or null if there is
an error
+ */
+ @Nullable
+ public static QueryAccessControlFactory fromConfig(PinotConfiguration
configuration) {
+ String configuredClass =
configuration.getProperty(CONFIG_OF_MULTI_STAGE_CHANNEL_ACCESS_CONTROL_FACTORY_CLASS);
+ if (configuredClass == null) {
+ LOGGER.info("No configured QueryAccessControlFactory. You can set {} to
set one.",
+ CONFIG_OF_MULTI_STAGE_CHANNEL_ACCESS_CONTROL_FACTORY_CLASS);
+ return null;
+ }
+ try {
+ QueryAccessControlFactory factory =
PluginManager.get().createInstance(configuredClass);
+ LOGGER.info("Built QueryAccessControlFactory from configured class {}",
configuredClass);
+ factory.init(configuration);
+ return factory;
+ } catch (Exception ex) {
+ LOGGER.error("Caught exception attempting to load
QueryAccessControlFactory {}", configuredClass, ex);
+ // TODO: Consider raising a RunTime error to capture if a user's
provided AccessControlFactory is not able
+ // to load. Returning null implies that we should not check access
at all (failing open) but generally
+ // users who configure this value would want us to fail closed
(raising an exception)
+ return null;
+ }
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index a81c1c6f448..8e572afe478 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.query.access.QueryAccessControlFactory;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -65,15 +66,21 @@ public class MailboxService {
private final PinotConfiguration _config;
private final ChannelManager _channelManager;
@Nullable private final TlsConfig _tlsConfig;
+ @Nullable private final QueryAccessControlFactory _accessControlFactory;
private final int _maxByteStringSize;
private GrpcMailboxServer _grpcMailboxServer;
public MailboxService(String hostname, int port, PinotConfiguration config) {
- this(hostname, port, config, null);
+ this(hostname, port, config, null, null);
}
public MailboxService(String hostname, int port, PinotConfiguration config,
@Nullable TlsConfig tlsConfig) {
+ this(hostname, port, config, tlsConfig, null);
+ }
+
+ public MailboxService(String hostname, int port, PinotConfiguration config,
@Nullable TlsConfig tlsConfig, @Nullable
+ QueryAccessControlFactory accessControlFactory) {
_hostname = hostname;
_port = port;
_config = config;
@@ -83,6 +90,7 @@ public class MailboxService {
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
);
_channelManager = new ChannelManager(tlsConfig, maxInboundMessageSize);
+ _accessControlFactory = accessControlFactory;
boolean splitBlocks = config.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT,
CommonConstants.MultiStageQueryRunner.DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT);
@@ -100,7 +108,7 @@ public class MailboxService {
*/
public void start() {
LOGGER.info("Starting GrpcMailboxServer");
- _grpcMailboxServer = new GrpcMailboxServer(this, _config, _tlsConfig);
+ _grpcMailboxServer = new GrpcMailboxServer(this, _config, _tlsConfig,
_accessControlFactory);
_grpcMailboxServer.start();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index dcdbf299d4a..ff771110141 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -35,6 +35,8 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
+import org.apache.pinot.query.access.AuthorizationInterceptor;
+import org.apache.pinot.query.access.QueryAccessControlFactory;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -52,9 +54,13 @@ public class GrpcMailboxServer extends
PinotMailboxGrpc.PinotMailboxImplBase {
private final MailboxService _mailboxService;
private final Server _server;
- public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration
config, @Nullable TlsConfig tlsConfig) {
+ public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration
config, @Nullable TlsConfig tlsConfig,
+ @Nullable QueryAccessControlFactory accessControlFactory) {
_mailboxService = mailboxService;
int port = mailboxService.getPort();
+ if (accessControlFactory == null) {
+ accessControlFactory = QueryAccessControlFactory.fromConfig(config);
+ }
PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
PooledByteBufAllocatorMetric metric = bufAllocator.metric();
@@ -83,8 +89,11 @@ public class GrpcMailboxServer extends
PinotMailboxGrpc.PinotMailboxImplBase {
}
NettyServerBuilder builder = NettyServerBuilder
- .forPort(port)
- .intercept(new MailboxServerInterceptor())
+ .forPort(port).intercept(new MailboxServerInterceptor());
+ if (accessControlFactory != null) {
+ builder.intercept(new AuthorizationInterceptor(accessControlFactory));
+ }
+ builder
.addService(this)
.withOption(ChannelOption.ALLOCATOR, bufAllocator)
.withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index a3ab6471c93..69fafea0e8e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -44,6 +44,8 @@ import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.query.MseWorkerThreadContext;
+import org.apache.pinot.query.access.AuthorizationInterceptor;
+import org.apache.pinot.query.access.QueryAccessControlFactory;
import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
import org.apache.pinot.query.routing.StageMetadata;
@@ -79,6 +81,8 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
private final QueryRunner _queryRunner;
@Nullable
private final TlsConfig _tlsConfig;
+ @Nullable
+ private final QueryAccessControlFactory _accessControlFactory;
// query submission service is only used for plan submission for now.
// TODO: with complex query submission logic we should allow asynchronous
query submission return instead of
// directly return from submission response observer.
@@ -101,19 +105,35 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
@VisibleForTesting
public QueryServer(int port, QueryRunner queryRunner, @Nullable TlsConfig
tlsConfig) {
- this("unknownServer", port, queryRunner, tlsConfig, new
PinotConfiguration());
+ this("unknownServer", port, queryRunner, tlsConfig, new
PinotConfiguration(), null);
+ }
+
+ @VisibleForTesting
+ public QueryServer(int port, QueryRunner queryRunner, @Nullable TlsConfig
tlsConfig,
+ QueryAccessControlFactory accessControlFactory) {
+ this("unknownServer", port, queryRunner, tlsConfig, new
PinotConfiguration(), accessControlFactory);
}
public QueryServer(String instanceId, int port, QueryRunner queryRunner,
@Nullable TlsConfig tlsConfig) {
- this(instanceId, port, queryRunner, tlsConfig, new PinotConfiguration());
+ this(instanceId, port, queryRunner, tlsConfig, new PinotConfiguration(),
null);
}
public QueryServer(String instanceId, int port, QueryRunner queryRunner,
@Nullable TlsConfig tlsConfig,
PinotConfiguration config) {
+ this(instanceId, port, queryRunner, tlsConfig, config, null);
+ }
+
+ public QueryServer(String instanceId, int port, QueryRunner queryRunner,
@Nullable TlsConfig tlsConfig,
+ PinotConfiguration config, @Nullable QueryAccessControlFactory
accessControlFactory) {
_instanceId = instanceId;
_port = port;
_queryRunner = queryRunner;
_tlsConfig = tlsConfig;
+ if (accessControlFactory == null) {
+ _accessControlFactory = QueryAccessControlFactory.fromConfig(config);
+ } else {
+ _accessControlFactory = accessControlFactory;
+ }
ExecutorService baseExecutorService = ExecutorServiceUtils.create(config,
CommonConstants.Server.MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX,
@@ -160,9 +180,13 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
}
private <T extends ServerBuilder<T>> Server buildGrpcServer(ServerBuilder<T>
builder) {
- return builder
+ builder
// By using directExecutor, GRPC doesn't need to manage its own
thread pool
- .directExecutor()
+ .directExecutor();
+ if (_accessControlFactory != null) {
+ builder.intercept(new AuthorizationInterceptor(_accessControlFactory));
+ }
+ return builder
.addService(this)
.maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build();
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerAuthzTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerAuthzTest.java
new file mode 100644
index 00000000000..101e0d91a0a
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerAuthzTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.server;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import io.grpc.Attributes;
+import io.grpc.Deadline;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.MetadataUtils;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.access.QueryAccessControl;
+import org.apache.pinot.query.access.QueryAccessControlFactory;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
+import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
+import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertSame;
+
+
+public class QueryServerAuthzTest {
+ private static final String AUTH_HEADER = "authorization";
+ private static final String SECRET = "my-shared-secret";
+ private static final Metadata.Key<String> AUTH_METADATA_KEY =
Metadata.Key.of(AUTH_HEADER, ASCII_STRING_MARSHALLER);
+ private static class DenyAllAccessControlFactory extends
QueryAccessControlFactory {
+ public static final QueryAccessControl DENY_ALL_ACCESS = new
QueryAccessControl() {
+ @Override
+ public boolean hasAccess(Attributes attributes, Metadata metadata) {
+ String authorization = metadata.get(AUTH_METADATA_KEY);
+ return SECRET.equals(authorization);
+ }
+ };
+
+ public QueryAccessControl create() {
+ return DENY_ALL_ACCESS;
+ }
+ }
+ private static final Random RANDOM_REQUEST_ID_GEN = new Random();
+ private static final int QUERY_SERVER_COUNT = 2;
+ private static final String KEY_OF_SERVER_INSTANCE_HOST =
"pinot.query.runner.server.hostname";
+ private static final String KEY_OF_SERVER_INSTANCE_PORT =
"pinot.query.runner.server.port";
+
+ private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
+
+ private QueryEnvironment _queryEnvironment;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
+ int availablePort = QueryTestUtils.getAvailablePort();
+ QueryRunner queryRunner = mock(QueryRunner.class);
+ QueryServer queryServer = new QueryServer(availablePort, queryRunner,
null, new DenyAllAccessControlFactory());
+ queryServer.start();
+ _queryServerMap.put(availablePort, queryServer);
+ }
+
+ List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
+
+ // reducer port doesn't matter, we are testing the worker instance not
GRPC.
+ _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1,
portList.get(0), portList.get(1),
+ QueryEnvironmentTestBase.TABLE_SCHEMAS,
QueryEnvironmentTestBase.SERVER1_SEGMENTS,
+ QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ for (QueryServer worker : _queryServerMap.values()) {
+ worker.shutdown();
+ }
+ }
+
+ @Test
+ public void testAccessDeniedError()
+ throws Exception {
+ DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM
a");
+ Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+ Map<String, String> requestMetadata =
QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+ try {
+ submitRequest(queryRequest, requestMetadata, "the-wrong-password");
+ } catch (StatusRuntimeException ex) {
+ assertSame(ex.getStatus().getCode(), Status.PERMISSION_DENIED.getCode(),
+ "Expected permission denied from DenyAllQueryAccessControl");
+ return;
+ }
+ throw new RuntimeException("Expected DenyAllQueryAccessControl to raise a
StatusRuntimeException");
+ }
+
+ @Test
+ public void testAccessAllowed()
+ throws Exception {
+ DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM
a");
+ Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+ Map<String, String> requestMetadata =
QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+ submitRequest(queryRequest, requestMetadata, SECRET);
+ }
+
+ private Worker.QueryResponse submitRequest(Worker.QueryRequest queryRequest,
Map<String, String> requestMetadata,
+ String secretKey) {
+ String host = requestMetadata.get(KEY_OF_SERVER_INSTANCE_HOST);
+ int port =
Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT));
+ long timeoutMs =
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+ ManagedChannel channel = ManagedChannelBuilder.forAddress(host,
port).usePlaintext().build();
+ Metadata headers = new Metadata();
+ headers.put(AUTH_METADATA_KEY, secretKey);
+ PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub =
PinotQueryWorkerGrpc
+ .newBlockingStub(channel)
+ .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
+ Worker.QueryResponse resp =
+ stub.withDeadline(Deadline.after(timeoutMs,
TimeUnit.MILLISECONDS)).submit(queryRequest);
+ channel.shutdown();
+ return resp;
+ }
+
+ private Worker.QueryRequest getQueryRequest(DispatchableSubPlan queryPlan,
int stageId) {
+ DispatchablePlanFragment stagePlan =
queryPlan.getQueryStageMap().get(stageId);
+ Plan.PlanNode rootNode =
PlanNodeSerializer.process(stagePlan.getPlanFragment().getFragmentRoot());
+ List<Worker.WorkerMetadata> workerMetadataList =
+
QueryPlanSerDeUtils.toProtoWorkerMetadataList(stagePlan.getWorkerMetadataList());
+ ByteString customProperty =
QueryPlanSerDeUtils.toProtoProperties(stagePlan.getCustomProperties());
+
+ // this particular test set requires the request to have a single
QueryServerInstance to dispatch to
+ // as it is not testing the multi-tenancy dispatch (which is in the
QueryDispatcherTest)
+ QueryServerInstance serverInstance =
stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next();
+ Worker.StageMetadata stageMetadata =
+
Worker.StageMetadata.newBuilder().setStageId(stageId).addAllWorkerMetadata(workerMetadataList)
+ .setCustomProperty(customProperty).build();
+ Worker.StagePlan protoStagePlan =
+
Worker.StagePlan.newBuilder().setRootNode(rootNode.toByteString()).setStageMetadata(stageMetadata).build();
+
+ Map<String, String> requestMetadata = new HashMap<>();
+ // the default configurations that must exist.
+ requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
+ String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
+ String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+ // extra configurations we want to test also parsed out correctly.
+ requestMetadata.put(KEY_OF_SERVER_INSTANCE_HOST,
serverInstance.getHostname());
+ requestMetadata.put(KEY_OF_SERVER_INSTANCE_PORT,
Integer.toString(serverInstance.getQueryServicePort()));
+
+ return Worker.QueryRequest.newBuilder().addStagePlan(protoStagePlan)
+
.setMetadata(QueryPlanSerDeUtils.toProtoProperties(requestMetadata)).build();
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index cdf5f378f7e..10cf20e7ab4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -266,6 +266,8 @@ public class CommonConstants {
public static final String CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED =
"pinot.multistage.engine.tls.enabled";
public static final boolean DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED = false;
+ public static final String
CONFIG_OF_MULTI_STAGE_CHANNEL_ACCESS_CONTROL_FACTORY_CLASS =
+ "pinot.multistage.engine.channel.auth.factory.class";
// This is a "beta" config and can be changed or even removed in future
releases.
public static final String
CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]