This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4ff4eea3113 Fix usage of compaction supervisor when
enableInputSourceSecurity is true (#18043)
4ff4eea3113 is described below
commit 4ff4eea311399b14b23eca5f30344076b2b2fbe8
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu May 29 10:42:27 2025 +0530
Fix usage of compaction supervisor when enableInputSourceSecurity is true
(#18043)
Changes:
- Update `CompactionSupervisorSpec.getInputSourceResources()` to return
empty since it doesn't
read from any external source (the write datasource is authorized
separately in SupervisorResource).
- Update `ScheduledBatchSupervisorSpec.getInputSourceResources()` to return
empty since it launches MSQ tasks
whose input sources are authorized in the SQL layer.
- Add utility method `AuthorizationUtils.createExternalResourceReadAction`.
- Refactor to use the above utility method where applicable.
---
.../apache/druid/indexing/kafka/KafkaIndexTask.java | 18 ++++++++++--------
.../apache/druid/indexing/kafka/KafkaSamplerSpec.java | 9 +--------
.../indexing/kafka/supervisor/KafkaSupervisorSpec.java | 10 ++--------
.../druid/indexing/kinesis/KinesisIndexTask.java | 14 ++++++--------
.../druid/indexing/kinesis/KinesisSamplerSpec.java | 9 +--------
.../kinesis/supervisor/KinesisSupervisorSpec.java | 10 ++--------
.../druid/indexing/common/task/HadoopIndexTask.java | 6 +-----
.../apache/druid/indexing/common/task/IndexTask.java | 5 +----
.../batch/parallel/ParallelIndexSupervisorTask.java | 4 +---
.../parallel/PartialDimensionCardinalityTask.java | 6 ++----
.../parallel/PartialDimensionDistributionTask.java | 6 ++----
.../batch/parallel/PartialHashSegmentGenerateTask.java | 6 ++----
.../parallel/PartialRangeSegmentGenerateTask.java | 6 ++----
.../common/task/batch/parallel/SinglePhaseSubTask.java | 5 +----
.../indexing/compact/CompactionSupervisorSpec.java | 12 ++++++++++++
.../overlord/sampler/IndexTaskSamplerSpec.java | 6 ++----
.../scheduledbatch/ScheduledBatchSupervisorSpec.java | 12 ++++++++++++
.../indexing/compact/CompactionSupervisorSpecTest.java | 11 +++++++++++
.../ScheduledBatchSupervisorSpecTest.java | 17 +++++++++++++++++
.../org/apache/druid/error/DruidExceptionMatcher.java | 8 ++++----
.../indexing/overlord/supervisor/SupervisorSpec.java | 9 +++------
.../druid/server/security/AuthorizationUtils.java | 12 ++++++++++++
.../overlord/supervisor/SupervisorSpecTest.java | 13 ++++++++++---
.../sql/calcite/external/DruidExternTableMacro.java | 8 +++-----
.../external/SchemaAwareUserDefinedTableMacro.java | 7 ++-----
25 files changed, 122 insertions(+), 107 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index c7f477ebb50..3f64a6d05e5 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -33,13 +33,10 @@ import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -48,6 +45,14 @@ public class KafkaIndexTask extends
SeekableStreamIndexTask<KafkaTopicPartition,
{
private static final String TYPE = "index_kafka";
+ /**
+ * Resources that a {@link KafkaIndexTask} is authorized to use. Includes
+ * performing a read action on external resource of type {@link KafkaIndexTaskModule#SCHEME}.
+ */
+ public static final Set<ResourceAction> INPUT_SOURCE_RESOURCES = Set.of(
+
AuthorizationUtils.createExternalResourceReadAction(KafkaIndexTaskModule.SCHEME)
+ );
+
private final ObjectMapper configMapper;
// This value can be tuned in some tests
@@ -154,10 +159,7 @@ public class KafkaIndexTask extends
SeekableStreamIndexTask<KafkaTopicPartition,
@Override
public Set<ResourceAction> getInputSourceResources()
{
- return Collections.singleton(new ResourceAction(
- new Resource(KafkaIndexTaskModule.SCHEME, ResourceType.EXTERNAL),
- Action.READ
- ));
+ return INPUT_SOURCE_RESOURCES;
}
@Override
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
index e0683fb605b..d718e35d7cd 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
@@ -30,14 +30,10 @@ import
org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -93,9 +89,6 @@ public class KafkaSamplerSpec extends
SeekableStreamSamplerSpec
@Nonnull
public Set<ResourceAction> getInputSourceResources() throws UOE
{
- return Collections.singleton(new ResourceAction(
- new Resource(KafkaIndexTaskModule.SCHEME, ResourceType.EXTERNAL),
- Action.READ
- ));
+ return KafkaIndexTask.INPUT_SOURCE_RESOURCES;
}
}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 13b3972f2ec..4962ae393cc 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
@@ -40,14 +41,10 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -109,10 +106,7 @@ public class KafkaSupervisorSpec extends
SeekableStreamSupervisorSpec
@Override
public Set<ResourceAction> getInputSourceResources()
{
- return Collections.singleton(new ResourceAction(
- new Resource(TASK_TYPE, ResourceType.EXTERNAL),
- Action.READ
- ));
+ return KafkaIndexTask.INPUT_SOURCE_RESOURCES;
}
@Override
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index f9ca54dd996..766f2958766 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -35,14 +35,11 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.utils.RuntimeInfo;
import javax.annotation.Nonnull;
-import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -50,6 +47,10 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, Ki
{
private static final String TYPE = "index_kinesis";
+ public static final Set<ResourceAction> INPUT_SOURCE_RESOURCES = Set.of(
+
AuthorizationUtils.createExternalResourceReadAction(KinesisIndexingServiceModule.SCHEME)
+ );
+
// GetRecords returns maximum 10MB per call
//
(https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
@@ -173,10 +174,7 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, Ki
@Override
public Set<ResourceAction> getInputSourceResources()
{
- return Collections.singleton(new ResourceAction(
- new Resource(KinesisIndexingServiceModule.SCHEME,
ResourceType.EXTERNAL),
- Action.READ
- ));
+ return INPUT_SOURCE_RESOURCES;
}
@Override
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
index 81f8b774f04..80751745d64 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
@@ -31,14 +31,10 @@ import
org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.Set;
public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
@@ -94,9 +90,6 @@ public class KinesisSamplerSpec extends
SeekableStreamSamplerSpec
@Override
public Set<ResourceAction> getInputSourceResources() throws UOE
{
- return Collections.singleton(new ResourceAction(
- new Resource(KinesisIndexingServiceModule.SCHEME,
ResourceType.EXTERNAL),
- Action.READ
- ));
+ return KinesisIndexTask.INPUT_SOURCE_RESOURCES;
}
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index 4aa60ea0327..ba6bb3ad851 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.kinesis.KinesisIndexTask;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -39,14 +40,10 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -127,10 +124,7 @@ public class KinesisSupervisorSpec extends
SeekableStreamSupervisorSpec
@Override
public Set<ResourceAction> getInputSourceResources()
{
- return Collections.singleton(new ResourceAction(
- new Resource(SUPERVISOR_TYPE, ResourceType.EXTERNAL),
- Action.READ
- ));
+ return KinesisIndexTask.INPUT_SOURCE_RESOURCES;
}
@Override
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index e894080a569..01696af04d0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -62,12 +62,9 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
-import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.util.ToolRunner;
@@ -87,7 +84,6 @@ import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -202,7 +198,7 @@ public class HadoopIndexTask extends HadoopTask implements
ChatHandler
@Override
public Set<ResourceAction> getInputSourceResources()
{
- return Collections.singleton(new ResourceAction(new
Resource(INPUT_SOURCE_TYPE, ResourceType.EXTERNAL), Action.READ));
+ return
Set.of(AuthorizationUtils.createExternalResourceReadAction(INPUT_SOURCE_TYPE));
}
@Override
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 530d21e5f1d..d9eb9152b54 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -93,12 +93,9 @@ import
org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -311,7 +308,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler, Pe
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
- .map(i -> new ResourceAction(new Resource(i,
ResourceType.EXTERNAL), Action.READ))
+ .map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 50378db7d69..1d134f81ddb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -82,9 +82,7 @@ import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublis
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -294,7 +292,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
- .map(i -> new ResourceAction(new Resource(i,
ResourceType.EXTERNAL), Action.READ))
+
.map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index 6a47af05adb..ff8bb94cf79 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -45,10 +45,8 @@ import
org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.partition.HashPartitioner;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -142,7 +140,7 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
- .map(i -> new ResourceAction(new Resource(i,
ResourceType.EXTERNAL), Action.READ))
+
.map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index f206c985080..151b1ac4cbf 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -51,10 +51,8 @@ import
org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
@@ -183,7 +181,7 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
- .map(i -> new ResourceAction(new Resource(i,
ResourceType.EXTERNAL), Action.READ))
+
.map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 92bba5b3862..ed8cfa75581 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -35,10 +35,8 @@ import org.apache.druid.indexing.common.task.TaskResource;
import
org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
import
org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.Interval;
@@ -138,7 +136,7 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
- .map(i -> new ResourceAction(new Resource(i,
ResourceType.EXTERNAL), Action.READ))
+
.map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 933df9ee778..be4ca7efb20 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -37,10 +37,8 @@ import org.apache.druid.indexing.common.task.TaskResource;
import
org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
import
org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.joda.time.Interval;
@@ -156,7 +154,7 @@ public class PartialRangeSegmentGenerateTask extends
PartialSegmentGenerateTask<
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
- .map(i -> new ResourceAction(new Resource(i,
ResourceType.EXTERNAL), Action.READ))
+
.map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 17ed802b86f..d021dccbbac 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -65,12 +65,9 @@ import
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResul
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
-import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
@@ -201,7 +198,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
- .map(i -> new ResourceAction(new Resource(i,
ResourceType.EXTERNAL), Action.READ))
+
.map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
index 71864c33322..dc5803596ec 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
@@ -26,11 +26,14 @@ import org.apache.druid.common.config.Configs;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.security.ResourceAction;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
public class CompactionSupervisorSpec implements SupervisorSpec
{
@@ -120,6 +123,15 @@ public class CompactionSupervisorSpec implements
SupervisorSpec
return "";
}
+ @Nonnull
+ @Override
+ public Set<ResourceAction> getInputSourceResources() throws
UnsupportedOperationException
+ {
+ // No external resource is read. The datasource being written to is
authorized
+ // separately in SupervisorResource itself
+ return Set.of();
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
index c78d28c8c82..cd27eb75bf7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
@@ -31,10 +31,8 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -103,7 +101,7 @@ public class IndexTaskSamplerSpec implements SamplerSpec
{
return inputSource.getTypes()
.stream()
- .map(i -> new ResourceAction(new Resource(i,
ResourceType.EXTERNAL), Action.READ))
+
.map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet());
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java
index a5cc072aa7a..0ceed2df8e0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java
@@ -33,10 +33,13 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.explain.ExplainAttributes;
import org.apache.druid.query.explain.ExplainPlan;
import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.server.security.ResourceAction;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
public class ScheduledBatchSupervisorSpec implements SupervisorSpec
@@ -189,6 +192,15 @@ public class ScheduledBatchSupervisorSpec implements
SupervisorSpec
return "";
}
+ @Nonnull
+ @Override
+ public Set<ResourceAction> getInputSourceResources() throws
UnsupportedOperationException
+ {
+ // Scheduled supervisor currently launches MSQ tasks for which
+ // the input sources are determined in the SQL layer.
+ return Set.of();
+ }
+
public CronSchedulerConfig getSchedulerConfig()
{
return schedulerConfig;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
index 21c2dda3cd8..b1732acabbb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
@@ -216,6 +216,17 @@ public class CompactionSupervisorSpecTest
Assert.assertEquals(activeSpec.getDataSources(),
suspendedSpec.getDataSources());
}
+ @Test
+ public void test_getInputSourceResources_returnsEmpty()
+ {
+ final CompactionSupervisorSpec supervisorSpec = new
CompactionSupervisorSpec(
+
InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
+ true,
+ scheduler
+ );
+ Assert.assertTrue(supervisorSpec.getInputSourceResources().isEmpty());
+ }
+
private void testSerde(CompactionSupervisorSpec spec)
{
try {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpecTest.java
index 36717f16d0f..5ceb571f4be 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpecTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.query.explain.ExplainAttributes;
import org.apache.druid.query.explain.ExplainPlan;
import org.apache.druid.query.http.ClientSqlQuery;
import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -225,6 +226,22 @@ public class ScheduledBatchSupervisorSpecTest
);
}
+ @Test
+ public void test_getInputSourceResources_returnsEmpty()
+ {
+ final ScheduledBatchSupervisorSpec supervisorSpec = new
ScheduledBatchSupervisorSpec(
+ query,
+ new UnixCronSchedulerConfig("* * * * *"),
+ true,
+ null,
+ null,
+ OBJECT_MAPPER,
+ scheduler,
+ brokerClient
+ );
+ Assert.assertTrue(supervisorSpec.getInputSourceResources().isEmpty());
+ }
+
private void testSerde(final ScheduledBatchSupervisorSpec spec)
{
try {
diff --git
a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
index dc8e7d73724..cdaf88785ec 100644
--- a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
+++ b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
@@ -40,12 +40,12 @@ public class DruidExceptionMatcher extends
DiagnosingMatcher<Throwable>
);
}
- public static DruidExceptionMatcher notFound()
+ public static DruidExceptionMatcher unsupported()
{
return new DruidExceptionMatcher(
- DruidException.Persona.USER,
- DruidException.Category.NOT_FOUND,
- "notFound"
+ DruidException.Persona.OPERATOR,
+ DruidException.Category.UNSUPPORTED,
+ "general"
);
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index a1a4aaaae62..9ff217d5404 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -24,8 +24,6 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.error.DruidException;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.UOE;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
@@ -88,10 +86,9 @@ public interface SupervisorSpec
@Nonnull
default Set<ResourceAction> getInputSourceResources() throws
UnsupportedOperationException
{
- throw new UOE(StringUtils.format(
- "SuperviserSpec type [%s], does not support input source based security",
- getType()
- ));
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.UNSUPPORTED)
+ .build("Supervisor type[%s] does not support input source based security", getType());
}
/**
diff --git
a/server/src/main/java/org/apache/druid/server/security/AuthorizationUtils.java
b/server/src/main/java/org/apache/druid/server/security/AuthorizationUtils.java
index 94ae93e3adb..2bdd0d586b7 100644
---
a/server/src/main/java/org/apache/druid/server/security/AuthorizationUtils.java
+++
b/server/src/main/java/org/apache/druid/server/security/AuthorizationUtils.java
@@ -487,6 +487,18 @@ public class AuthorizationUtils
}
}
+ /**
+ * Creates a {@link ResourceAction} to {@link Action#READ read} an
+ * {@link ResourceType#EXTERNAL external} resource.
+ */
+ public static ResourceAction createExternalResourceReadAction(String
resourceName)
+ {
+ return new ResourceAction(
+ new Resource(resourceName, ResourceType.EXTERNAL),
+ Action.READ
+ );
+ }
+
/**
* This method constructs a 'superuser' set of permissions composed of {@link Action#READ} and {@link Action#WRITE}
* permissions for all known {@link ResourceType#knownTypes()} for any {@link Authorizer} implementation which is
diff --git
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
index 4bbc9dbd9ae..8ba97c4d651 100644
---
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
+++
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
@@ -19,7 +19,9 @@
package org.apache.druid.indexing.overlord.supervisor;
-import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
@@ -50,7 +52,7 @@ public class SupervisorSpecTest
@Override
public String getType()
{
- return null;
+ return "abc";
}
@Override
@@ -63,6 +65,11 @@ public class SupervisorSpecTest
@Test
public void test()
{
- Assert.assertThrows(UOE.class, () -> SUPERVISOR_SPEC.getInputSourceResources());
+ MatcherAssert.assertThat(
+ Assert.assertThrows(DruidException.class, SUPERVISOR_SPEC::getInputSourceResources),
+ DruidExceptionMatcher.unsupported().expectMessageIs(
+ "Supervisor type[abc] does not support input source based security"
+ )
+ );
}
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
index d698efc1817..2b8559538e9 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
@@ -25,10 +25,8 @@ import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.NlsString;
import org.apache.druid.data.input.InputSource;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.table.DruidTable;
import javax.validation.constraints.NotNull;
@@ -58,7 +56,7 @@ public class DruidExternTableMacro extends
DruidUserDefinedTableMacro
try {
InputSource inputSource = ((DruidTableMacro)
macro).getJsonMapper().readValue(inputSourceStr, InputSource.class);
return inputSource.getTypes().stream()
- .map(inputSourceType -> new ResourceAction(new Resource(inputSourceType, ResourceType.EXTERNAL), Action.READ))
+ .map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet());
}
catch (JsonProcessingException e) {
@@ -71,7 +69,7 @@ public class DruidExternTableMacro extends
DruidUserDefinedTableMacro
private String getInputSourceArgument(final SqlCall call)
{
// this covers case where parameters are used positionally
- if (call.getOperandList().size() > 0) {
+ if (!call.getOperandList().isEmpty()) {
if (call.getOperandList().get(0) instanceof SqlCharStringLiteral) {
return ((SqlCharStringLiteral) call.getOperandList().get(0)).toValue();
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
index b5b0c392833..a861d68cd9a 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
@@ -37,10 +37,8 @@ import org.apache.calcite.sql.type.SqlOperandMetadata;
import org.apache.calcite.sql.type.SqlOperandTypeInference;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
import org.apache.druid.sql.calcite.table.ExternalTable;
@@ -169,8 +167,7 @@ public abstract class SchemaAwareUserDefinedTableMacro
if (table instanceof ExternalTable && inputSourceTypeSecurityEnabled) {
resourceActions.addAll(((ExternalTable) table)
.getInputSourceTypeSupplier().get().stream()
- .map(inputSourceType ->
- new ResourceAction(new
Resource(inputSourceType, ResourceType.EXTERNAL), Action.READ))
+
.map(AuthorizationUtils::createExternalResourceReadAction)
.collect(Collectors.toSet()));
} else {
resourceActions.addAll(base.computeResources(call,
inputSourceTypeSecurityEnabled));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]