This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ff4eea3113 Fix usage of compaction supervisor when 
enableInputSourceSecurity is true (#18043)
4ff4eea3113 is described below

commit 4ff4eea311399b14b23eca5f30344076b2b2fbe8
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu May 29 10:42:27 2025 +0530

    Fix usage of compaction supervisor when enableInputSourceSecurity is true 
(#18043)
    
    Changes:
    - Update `CompactionSupervisorSpec.getInputSourceResources()` to return 
empty since it doesn't
    read from any external source (the write datasource is authorized 
separately in SupervisorResource).
    - Update ScheduledBatchSupervisorSpec.getInputSourceResources() to return 
since it launches MSQ tasks
    whose input sources are authorized in the SQL layer.
    - Add utility method `AuthorizationUtils.createExternalResourceReadAction`.
    - Refactor to use the above utility method where applicable.
---
 .../apache/druid/indexing/kafka/KafkaIndexTask.java    | 18 ++++++++++--------
 .../apache/druid/indexing/kafka/KafkaSamplerSpec.java  |  9 +--------
 .../indexing/kafka/supervisor/KafkaSupervisorSpec.java | 10 ++--------
 .../druid/indexing/kinesis/KinesisIndexTask.java       | 14 ++++++--------
 .../druid/indexing/kinesis/KinesisSamplerSpec.java     |  9 +--------
 .../kinesis/supervisor/KinesisSupervisorSpec.java      | 10 ++--------
 .../druid/indexing/common/task/HadoopIndexTask.java    |  6 +-----
 .../apache/druid/indexing/common/task/IndexTask.java   |  5 +----
 .../batch/parallel/ParallelIndexSupervisorTask.java    |  4 +---
 .../parallel/PartialDimensionCardinalityTask.java      |  6 ++----
 .../parallel/PartialDimensionDistributionTask.java     |  6 ++----
 .../batch/parallel/PartialHashSegmentGenerateTask.java |  6 ++----
 .../parallel/PartialRangeSegmentGenerateTask.java      |  6 ++----
 .../common/task/batch/parallel/SinglePhaseSubTask.java |  5 +----
 .../indexing/compact/CompactionSupervisorSpec.java     | 12 ++++++++++++
 .../overlord/sampler/IndexTaskSamplerSpec.java         |  6 ++----
 .../scheduledbatch/ScheduledBatchSupervisorSpec.java   | 12 ++++++++++++
 .../indexing/compact/CompactionSupervisorSpecTest.java | 11 +++++++++++
 .../ScheduledBatchSupervisorSpecTest.java              | 17 +++++++++++++++++
 .../org/apache/druid/error/DruidExceptionMatcher.java  |  8 ++++----
 .../indexing/overlord/supervisor/SupervisorSpec.java   |  9 +++------
 .../druid/server/security/AuthorizationUtils.java      | 12 ++++++++++++
 .../overlord/supervisor/SupervisorSpecTest.java        | 13 ++++++++++---
 .../sql/calcite/external/DruidExternTableMacro.java    |  8 +++-----
 .../external/SchemaAwareUserDefinedTableMacro.java     |  7 ++-----
 25 files changed, 122 insertions(+), 107 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index c7f477ebb50..3f64a6d05e5 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -33,13 +33,10 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 
 import javax.annotation.Nonnull;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -48,6 +45,14 @@ public class KafkaIndexTask extends 
SeekableStreamIndexTask<KafkaTopicPartition,
 {
   private static final String TYPE = "index_kafka";
 
+  /**
+   * Resources that a {@link KafkaIndexTask} is authorized to use. Includes
+   * performing a read action on external resource of type
+   */
+  public static final Set<ResourceAction> INPUT_SOURCE_RESOURCES = Set.of(
+      
AuthorizationUtils.createExternalResourceReadAction(KafkaIndexTaskModule.SCHEME)
+  );
+
   private final ObjectMapper configMapper;
 
   // This value can be tuned in some tests
@@ -154,10 +159,7 @@ public class KafkaIndexTask extends 
SeekableStreamIndexTask<KafkaTopicPartition,
   @Override
   public Set<ResourceAction> getInputSourceResources()
   {
-    return Collections.singleton(new ResourceAction(
-        new Resource(KafkaIndexTaskModule.SCHEME, ResourceType.EXTERNAL),
-        Action.READ
-    ));
+    return INPUT_SOURCE_RESOURCES;
   }
 
   @Override
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
index e0683fb605b..d718e35d7cd 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
@@ -30,14 +30,10 @@ import 
org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
 import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -93,9 +89,6 @@ public class KafkaSamplerSpec extends 
SeekableStreamSamplerSpec
   @Nonnull
   public Set<ResourceAction> getInputSourceResources() throws UOE
   {
-    return Collections.singleton(new ResourceAction(
-        new Resource(KafkaIndexTaskModule.SCHEME, ResourceType.EXTERNAL),
-        Action.READ
-    ));
+    return KafkaIndexTask.INPUT_SOURCE_RESOURCES;
   }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 13b3972f2ec..4962ae393cc 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.kafka.KafkaIndexTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
@@ -40,14 +41,10 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
@@ -109,10 +106,7 @@ public class KafkaSupervisorSpec extends 
SeekableStreamSupervisorSpec
   @Override
   public Set<ResourceAction> getInputSourceResources()
   {
-    return Collections.singleton(new ResourceAction(
-        new Resource(TASK_TYPE, ResourceType.EXTERNAL),
-        Action.READ
-    ));
+    return KafkaIndexTask.INPUT_SOURCE_RESOURCES;
   }
 
   @Override
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index f9ca54dd996..766f2958766 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -35,14 +35,11 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.utils.RuntimeInfo;
 
 import javax.annotation.Nonnull;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
@@ -50,6 +47,10 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String, Ki
 {
   private static final String TYPE = "index_kinesis";
 
+  public static final Set<ResourceAction> INPUT_SOURCE_RESOURCES = Set.of(
+      
AuthorizationUtils.createExternalResourceReadAction(KinesisIndexingServiceModule.SCHEME)
+  );
+
   // GetRecords returns maximum 10MB per call
   // 
(https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
   private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
@@ -173,10 +174,7 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String, Ki
   @Override
   public Set<ResourceAction> getInputSourceResources()
   {
-    return Collections.singleton(new ResourceAction(
-        new Resource(KinesisIndexingServiceModule.SCHEME, 
ResourceType.EXTERNAL),
-        Action.READ
-    ));
+    return INPUT_SOURCE_RESOURCES;
   }
 
   @Override
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
index 81f8b774f04..80751745d64 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
@@ -31,14 +31,10 @@ import 
org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
 import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import java.util.Collections;
 import java.util.Set;
 
 public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
@@ -94,9 +90,6 @@ public class KinesisSamplerSpec extends 
SeekableStreamSamplerSpec
   @Override
   public Set<ResourceAction> getInputSourceResources() throws UOE
   {
-    return Collections.singleton(new ResourceAction(
-        new Resource(KinesisIndexingServiceModule.SCHEME, 
ResourceType.EXTERNAL),
-        Action.READ
-    ));
+    return KinesisIndexTask.INPUT_SOURCE_RESOURCES;
   }
 }
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index 4aa60ea0327..ba6bb3ad851 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.name.Named;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.kinesis.KinesisIndexTask;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
 import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -39,14 +40,10 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
@@ -127,10 +124,7 @@ public class KinesisSupervisorSpec extends 
SeekableStreamSupervisorSpec
   @Override
   public Set<ResourceAction> getInputSourceResources()
   {
-    return Collections.singleton(new ResourceAction(
-        new Resource(SUPERVISOR_TYPE, ResourceType.EXTERNAL),
-        Action.READ
-    ));
+    return KinesisIndexTask.INPUT_SOURCE_RESOURCES;
   }
 
   @Override
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index e894080a569..01696af04d0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -62,12 +62,9 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.realtime.ChatHandler;
 import org.apache.druid.segment.realtime.ChatHandlerProvider;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.util.ToolRunner;
@@ -87,7 +84,6 @@ import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -202,7 +198,7 @@ public class HadoopIndexTask extends HadoopTask implements 
ChatHandler
   @Override
   public Set<ResourceAction> getInputSourceResources()
   {
-    return Collections.singleton(new ResourceAction(new 
Resource(INPUT_SOURCE_TYPE, ResourceType.EXTERNAL), Action.READ));
+    return 
Set.of(AuthorizationUtils.createExternalResourceReadAction(INPUT_SOURCE_TYPE));
   }
 
   @Override
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 530d21e5f1d..d9eb9152b54 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -93,12 +93,9 @@ import 
org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import 
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
 import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -311,7 +308,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler, Pe
     return getIngestionSchema().getIOConfig().getInputSource() != null ?
            getIngestionSchema().getIOConfig().getInputSource().getTypes()
                .stream()
-               .map(i -> new ResourceAction(new Resource(i, 
ResourceType.EXTERNAL), Action.READ))
+               .map(AuthorizationUtils::createExternalResourceReadAction)
                .collect(Collectors.toSet()) :
            ImmutableSet.of();
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 50378db7d69..1d134f81ddb 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -82,9 +82,7 @@ import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublis
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.BuildingShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -294,7 +292,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask
     return getIngestionSchema().getIOConfig().getInputSource() != null ?
            getIngestionSchema().getIOConfig().getInputSource().getTypes()
                                .stream()
-                               .map(i -> new ResourceAction(new Resource(i, 
ResourceType.EXTERNAL), Action.READ))
+                               
.map(AuthorizationUtils::createExternalResourceReadAction)
                                .collect(Collectors.toSet()) :
            ImmutableSet.of();
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index 6a47af05adb..ff8bb94cf79 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -45,10 +45,8 @@ import 
org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.partition.HashPartitioner;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -142,7 +140,7 @@ public class PartialDimensionCardinalityTask extends 
PerfectRollupWorkerTask
     return getIngestionSchema().getIOConfig().getInputSource() != null ?
            getIngestionSchema().getIOConfig().getInputSource().getTypes()
                                .stream()
-                               .map(i -> new ResourceAction(new Resource(i, 
ResourceType.EXTERNAL), Action.READ))
+                               
.map(AuthorizationUtils::createExternalResourceReadAction)
                                .collect(Collectors.toSet()) :
            ImmutableSet.of();
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index f206c985080..151b1ac4cbf 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -51,10 +51,8 @@ import 
org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
@@ -183,7 +181,7 @@ public class PartialDimensionDistributionTask extends 
PerfectRollupWorkerTask
     return getIngestionSchema().getIOConfig().getInputSource() != null ?
            getIngestionSchema().getIOConfig().getInputSource().getTypes()
                                .stream()
-                               .map(i -> new ResourceAction(new Resource(i, 
ResourceType.EXTERNAL), Action.READ))
+                               
.map(AuthorizationUtils::createExternalResourceReadAction)
                                .collect(Collectors.toSet()) :
            ImmutableSet.of();
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 92bba5b3862..ed8cfa75581 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -35,10 +35,8 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import 
org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
 import 
org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
 import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.joda.time.Interval;
@@ -138,7 +136,7 @@ public class PartialHashSegmentGenerateTask extends 
PartialSegmentGenerateTask<G
     return getIngestionSchema().getIOConfig().getInputSource() != null ?
            getIngestionSchema().getIOConfig().getInputSource().getTypes()
                                .stream()
-                               .map(i -> new ResourceAction(new Resource(i, 
ResourceType.EXTERNAL), Action.READ))
+                               
.map(AuthorizationUtils::createExternalResourceReadAction)
                                .collect(Collectors.toSet()) :
            ImmutableSet.of();
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 933df9ee778..be4ca7efb20 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -37,10 +37,8 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import 
org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
 import 
org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
 import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 import org.joda.time.Interval;
@@ -156,7 +154,7 @@ public class PartialRangeSegmentGenerateTask extends 
PartialSegmentGenerateTask<
     return getIngestionSchema().getIOConfig().getInputSource() != null ?
            getIngestionSchema().getIOConfig().getInputSource().getTypes()
                                .stream()
-                               .map(i -> new ResourceAction(new Resource(i, 
ResourceType.EXTERNAL), Action.READ))
+                               
.map(AuthorizationUtils::createExternalResourceReadAction)
                                .collect(Collectors.toSet()) :
            ImmutableSet.of();
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 17ed802b86f..d021dccbbac 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -65,12 +65,9 @@ import 
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResul
 import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
 import 
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
@@ -201,7 +198,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
     return getIngestionSchema().getIOConfig().getInputSource() != null ?
            getIngestionSchema().getIOConfig().getInputSource().getTypes()
                                .stream()
-                               .map(i -> new ResourceAction(new Resource(i, 
ResourceType.EXTERNAL), Action.READ))
+                               
.map(AuthorizationUtils::createExternalResourceReadAction)
                                .collect(Collectors.toSet()) :
            ImmutableSet.of();
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
index 71864c33322..dc5803596ec 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
@@ -26,11 +26,14 @@ import org.apache.druid.common.config.Configs;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.security.ResourceAction;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 public class CompactionSupervisorSpec implements SupervisorSpec
 {
@@ -120,6 +123,15 @@ public class CompactionSupervisorSpec implements 
SupervisorSpec
     return "";
   }
 
+  @Nonnull
+  @Override
+  public Set<ResourceAction> getInputSourceResources() throws 
UnsupportedOperationException
+  {
+    // No external resource is read. The datasource being written to is 
authorized
+    // separately in SupervisorResource itself
+    return Set.of();
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
index c78d28c8c82..cd27eb75bf7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
@@ -31,10 +31,8 @@ import org.apache.druid.data.input.InputSource;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -103,7 +101,7 @@ public class IndexTaskSamplerSpec implements SamplerSpec
   {
     return inputSource.getTypes()
                       .stream()
-                      .map(i -> new ResourceAction(new Resource(i, 
ResourceType.EXTERNAL), Action.READ))
+                      
.map(AuthorizationUtils::createExternalResourceReadAction)
                       .collect(Collectors.toSet());
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java
index a5cc072aa7a..0ceed2df8e0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java
@@ -33,10 +33,13 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.explain.ExplainAttributes;
 import org.apache.druid.query.explain.ExplainPlan;
 import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.server.security.ResourceAction;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 
 public class ScheduledBatchSupervisorSpec implements SupervisorSpec
@@ -189,6 +192,15 @@ public class ScheduledBatchSupervisorSpec implements 
SupervisorSpec
     return "";
   }
 
+  @Nonnull
+  @Override
+  public Set<ResourceAction> getInputSourceResources() throws 
UnsupportedOperationException
+  {
+    // Scheduled supervisor currently launches MSQ tasks for which
+    // the input sources are determined in the SQL layer.
+    return Set.of();
+  }
+
   public CronSchedulerConfig getSchedulerConfig()
   {
     return schedulerConfig;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
index 21c2dda3cd8..b1732acabbb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
@@ -216,6 +216,17 @@ public class CompactionSupervisorSpecTest
     Assert.assertEquals(activeSpec.getDataSources(), 
suspendedSpec.getDataSources());
   }
 
+  @Test
+  public void test_getInputSourceResources_returnsEmpty()
+  {
+    final CompactionSupervisorSpec supervisorSpec = new 
CompactionSupervisorSpec(
+        
InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
+        true,
+        scheduler
+    );
+    Assert.assertTrue(supervisorSpec.getInputSourceResources().isEmpty());
+  }
+
   private void testSerde(CompactionSupervisorSpec spec)
   {
     try {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpecTest.java
index 36717f16d0f..5ceb571f4be 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpecTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.query.explain.ExplainAttributes;
 import org.apache.druid.query.explain.ExplainPlan;
 import org.apache.druid.query.http.ClientSqlQuery;
 import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -225,6 +226,22 @@ public class ScheduledBatchSupervisorSpecTest
     );
   }
 
+  @Test
+  public void test_getInputSourceResources_returnsEmpty()
+  {
+    final ScheduledBatchSupervisorSpec supervisorSpec = new 
ScheduledBatchSupervisorSpec(
+        query,
+        new UnixCronSchedulerConfig("* * * * *"),
+        true,
+        null,
+        null,
+        OBJECT_MAPPER,
+        scheduler,
+        brokerClient
+    );
+    Assert.assertTrue(supervisorSpec.getInputSourceResources().isEmpty());
+  }
+
   private void testSerde(final ScheduledBatchSupervisorSpec spec)
   {
     try {
diff --git 
a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java 
b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
index dc8e7d73724..cdaf88785ec 100644
--- a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
+++ b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
@@ -40,12 +40,12 @@ public class DruidExceptionMatcher extends 
DiagnosingMatcher<Throwable>
     );
   }
 
-  public static DruidExceptionMatcher notFound()
+  public static DruidExceptionMatcher unsupported()
   {
     return new DruidExceptionMatcher(
-        DruidException.Persona.USER,
-        DruidException.Category.NOT_FOUND,
-        "notFound"
+        DruidException.Persona.OPERATOR,
+        DruidException.Category.UNSUPPORTED,
+        "general"
     );
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index a1a4aaaae62..9ff217d5404 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -24,8 +24,6 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.druid.error.DruidException;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.server.security.ResourceAction;
 
 import javax.annotation.Nonnull;
@@ -88,10 +86,9 @@ public interface SupervisorSpec
   @Nonnull
   default Set<ResourceAction> getInputSourceResources() throws 
UnsupportedOperationException
   {
-    throw new UOE(StringUtils.format(
-        "SuperviserSpec type [%s], does not support input source based 
security",
-        getType()
-    ));
+    throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                        .ofCategory(DruidException.Category.UNSUPPORTED)
+                        .build("Supervisor type[%s] does not support input 
source based security", getType());
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/server/security/AuthorizationUtils.java 
b/server/src/main/java/org/apache/druid/server/security/AuthorizationUtils.java
index 94ae93e3adb..2bdd0d586b7 100644
--- 
a/server/src/main/java/org/apache/druid/server/security/AuthorizationUtils.java
+++ 
b/server/src/main/java/org/apache/druid/server/security/AuthorizationUtils.java
@@ -487,6 +487,18 @@ public class AuthorizationUtils
     }
   }
 
+  /**
+   * Creates a {@link ResourceAction} to {@link Action#READ read} an
+   * {@link ResourceType#EXTERNAL external} resource.
+   */
+  public static ResourceAction createExternalResourceReadAction(String 
resourceName)
+  {
+    return new ResourceAction(
+        new Resource(resourceName, ResourceType.EXTERNAL),
+        Action.READ
+    );
+  }
+
   /**
    * This method constructs a 'superuser' set of permissions composed of 
{@link Action#READ} and {@link Action#WRITE}
    * permissions for all known {@link ResourceType#knownTypes()} for any 
{@link Authorizer} implementation which is
diff --git 
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
index 4bbc9dbd9ae..8ba97c4d651 100644
--- 
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
+++ 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
@@ -19,7 +19,9 @@
 
 package org.apache.druid.indexing.overlord.supervisor;
 
-import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.hamcrest.MatcherAssert;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -50,7 +52,7 @@ public class SupervisorSpecTest
     @Override
     public String getType()
     {
-      return null;
+      return "abc";
     }
 
     @Override
@@ -63,6 +65,11 @@ public class SupervisorSpecTest
   @Test
   public void test()
   {
-    Assert.assertThrows(UOE.class, () -> 
SUPERVISOR_SPEC.getInputSourceResources());
+    MatcherAssert.assertThat(
+        Assert.assertThrows(DruidException.class, 
SUPERVISOR_SPEC::getInputSourceResources),
+        DruidExceptionMatcher.unsupported().expectMessageIs(
+            "Supervisor type[abc] does not support input source based security"
+        )
+    );
   }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
index d698efc1817..2b8559538e9 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
@@ -25,10 +25,8 @@ import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.util.NlsString;
 import org.apache.druid.data.input.InputSource;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.calcite.table.DruidTable;
 
 import javax.validation.constraints.NotNull;
@@ -58,7 +56,7 @@ public class DruidExternTableMacro extends 
DruidUserDefinedTableMacro
     try {
       InputSource inputSource = ((DruidTableMacro) 
macro).getJsonMapper().readValue(inputSourceStr, InputSource.class);
       return inputSource.getTypes().stream()
-          .map(inputSourceType -> new ResourceAction(new 
Resource(inputSourceType, ResourceType.EXTERNAL), Action.READ))
+          .map(AuthorizationUtils::createExternalResourceReadAction)
           .collect(Collectors.toSet());
     }
     catch (JsonProcessingException e) {
@@ -71,7 +69,7 @@ public class DruidExternTableMacro extends 
DruidUserDefinedTableMacro
   private String getInputSourceArgument(final SqlCall call)
   {
     // this covers case where parameters are used positionally
-    if (call.getOperandList().size() > 0) {
+    if (!call.getOperandList().isEmpty()) {
       if (call.getOperandList().get(0) instanceof SqlCharStringLiteral) {
         return ((SqlCharStringLiteral) call.getOperandList().get(0)).toValue();
       }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
index b5b0c392833..a861d68cd9a 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
@@ -37,10 +37,8 @@ import org.apache.calcite.sql.type.SqlOperandMetadata;
 import org.apache.calcite.sql.type.SqlOperandTypeInference;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
 import org.apache.druid.sql.calcite.table.ExternalTable;
 
@@ -169,8 +167,7 @@ public abstract class SchemaAwareUserDefinedTableMacro
       if (table instanceof ExternalTable && inputSourceTypeSecurityEnabled) {
         resourceActions.addAll(((ExternalTable) table)
                                    .getInputSourceTypeSupplier().get().stream()
-                                   .map(inputSourceType ->
-                                 new ResourceAction(new 
Resource(inputSourceType, ResourceType.EXTERNAL), Action.READ))
+                                   
.map(AuthorizationUtils::createExternalResourceReadAction)
                                    .collect(Collectors.toSet()));
       } else {
         resourceActions.addAll(base.computeResources(call, 
inputSourceTypeSecurityEnabled));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to