This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ef39a321fc Remove redundant field SeekableStreamIndexTask.authorizerMapper (#17928)
0ef39a321fc is described below

commit 0ef39a321fcd143068f5c2040955f7647443c325
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Apr 17 02:19:58 2025 +0530

    Remove redundant field SeekableStreamIndexTask.authorizerMapper (#17928)
    
    * Remove redundant field SeekableStreamIndexTask.authorizerMapper
    
    * Fix test
---
 ...entalPublishingRabbitStreamIndexTaskRunner.java | 10 ++---
 .../rabbitstream/RabbitStreamIndexTask.java        |  4 +-
 .../IncrementalPublishingKafkaIndexTaskRunner.java |  3 --
 .../druid/indexing/kafka/KafkaIndexTask.java       |  1 -
 .../druid/indexing/kinesis/KinesisIndexTask.java   |  1 -
 .../indexing/kinesis/KinesisIndexTaskRunner.java   |  9 +----
 .../seekablestream/SeekableStreamIndexTask.java    |  5 ---
 .../SeekableStreamIndexTaskRunner.java             |  5 ++-
 .../SeekableStreamIndexTaskRunnerAuthTest.java     | 45 +++++++++++++++++-----
 .../SeekableStreamIndexTaskRunnerTest.java         | 10 ++---
 10 files changed, 51 insertions(+), 42 deletions(-)

diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/IncrementalPublishingRabbitStreamIndexTaskRunner.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/IncrementalPublishingRabbitStreamIndexTaskRunner.java
index 75663d23063..51f9fd901a0 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/IncrementalPublishingRabbitStreamIndexTaskRunner.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/IncrementalPublishingRabbitStreamIndexTaskRunner.java
@@ -35,12 +35,10 @@ import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.security.AuthorizerMapper;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -60,14 +58,14 @@ public class IncrementalPublishingRabbitStreamIndexTaskRunner
   IncrementalPublishingRabbitStreamIndexTaskRunner(
       RabbitStreamIndexTask task,
       @Nullable InputRowParser<ByteBuffer> parser,
-      AuthorizerMapper authorizerMapper,
-      LockGranularity lockGranularityToUse)
+      LockGranularity lockGranularityToUse
+  )
   {
     super(
         task,
         parser,
-        authorizerMapper,
-        lockGranularityToUse);
+        lockGranularityToUse
+    );
     this.task = task;
   }
 
diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
index 6e9fab2c60a..4fb452e0b2b 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
@@ -89,8 +89,8 @@ public class RabbitStreamIndexTask extends SeekableStreamIndexTask<String, Long,
     return new IncrementalPublishingRabbitStreamIndexTaskRunner(
         this,
         dataSchema.getParser(),
-        authorizerMapper,
-        lockGranularityToUse);
+        lockGranularityToUse
+    );
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index b16f825308f..73270c18576 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -37,7 +37,6 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.utils.CollectionUtils;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.TopicPartition;
@@ -66,14 +65,12 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
   IncrementalPublishingKafkaIndexTaskRunner(
       KafkaIndexTask task,
       @Nullable InputRowParser<ByteBuffer> parser,
-      AuthorizerMapper authorizerMapper,
       LockGranularity lockGranularityToUse
   )
   {
     super(
         task,
         parser,
-        authorizerMapper,
         lockGranularityToUse
     );
     this.task = task;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index e5ff77467cd..c7f477ebb50 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -93,7 +93,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<KafkaTopicPartition,
     return new IncrementalPublishingKafkaIndexTaskRunner(
         this,
         dataSchema.getParser(),
-        authorizerMapper,
         lockGranularityToUse
     );
   }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index bea69d96c3d..f9ca54dd996 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -106,7 +106,6 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, Ki
     return new KinesisIndexTaskRunner(
         this,
         dataSchema.getParser(),
-        authorizerMapper,
         lockGranularityToUse
     );
   }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index dfca894d281..1c6bdabe0c4 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -36,7 +36,6 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.security.AuthorizerMapper;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -59,16 +58,10 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
   KinesisIndexTaskRunner(
       KinesisIndexTask task,
       @Nullable InputRowParser<ByteBuffer> parser,
-      AuthorizerMapper authorizerMapper,
       LockGranularity lockGranularityToUse
   )
   {
-    super(
-        task,
-        parser,
-        authorizerMapper,
-        lockGranularityToUse
-    );
+    super(task, parser, lockGranularityToUse);
     this.task = task;
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 7b10c0afdcc..1abff409229 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -52,9 +52,7 @@ import org.apache.druid.segment.realtime.ChatHandler;
 import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
-import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 import javax.annotation.Nullable;
 import java.util.Map;
@@ -78,9 +76,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   // By the way, lazily init is synchronized because the runner may be needed in multiple threads.
   private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?>> runnerSupplier;
 
-  @MonotonicNonNull
-  protected AuthorizerMapper authorizerMapper;
-
   public SeekableStreamIndexTask(
       final String id,
       @Nullable final TaskResource taskResource,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index cb5214eb6d9..ae6f1d3f979 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -255,7 +255,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   public SeekableStreamIndexTaskRunner(
       final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
       @Nullable final InputRowParser<ByteBuffer> parser,
-      final AuthorizerMapper authorizerMapper,
       final LockGranularity lockGranularityToUse
   )
   {
@@ -266,7 +265,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     this.inputRowSchema = InputRowSchemas.fromDataSchema(task.getDataSchema());
     this.inputFormat = ioConfig.getInputFormat();
     this.parser = parser;
-    this.authorizerMapper = authorizerMapper;
     this.stream = ioConfig.getStartSequenceNumbers().getStream();
     this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
     this.sequences = new CopyOnWriteArrayList<>();
@@ -1452,6 +1450,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
    */
   private void authorizationCheck(final HttpServletRequest request)
   {
+    if (authorizerMapper == null) {
+      throw DruidException.defensive("Cannot authorize request since AuthorizerMapper is not initialized yet.");
+    }
     AuthorizationUtils.verifyUnrestrictedAccessToDatasource(request, task.getDataSource(), authorizerMapper);
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
index 89ee5e6ca35..1dd70b07336 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
@@ -28,13 +28,20 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthConfig;
@@ -45,6 +52,7 @@ import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.server.security.ResourceType;
 import org.easymock.EasyMock;
 import org.joda.time.Duration;
+import org.joda.time.Period;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -94,14 +102,16 @@ public class SeekableStreamIndexTaskRunnerAuthTest
           // - Datasource Read User requests Read access
           // - or, Datasource Write User requests Write access
           if (resource.getType().equals(ResourceType.DATASOURCE)) {
-            return new Access(
-                (action == Action.READ && username.equals(Users.DATASOURCE_READ))
-                || (action == Action.WRITE && username.equals(Users.DATASOURCE_WRITE))
-            );
+            if ((action == Action.READ && username.equals(Users.DATASOURCE_READ))
+                || (action == Action.WRITE && username.equals(Users.DATASOURCE_WRITE))) {
+              return Access.allow();
+            } else {
+              return Access.DENIED;
+            }
           }
 
           // Do not allow access to any other resource
-          return new Access(false);
+          return Access.DENIED;
         };
       }
     };
@@ -114,12 +124,30 @@ public class SeekableStreamIndexTaskRunnerAuthTest
                   .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList()))
                   .build();
     SeekableStreamIndexTaskTuningConfig tuningConfig = mock(SeekableStreamIndexTaskTuningConfig.class);
+    EasyMock.expect(tuningConfig.getIntermediateHandoffPeriod()).andReturn(Period.minutes(10)).anyTimes();
+    EasyMock.expect(tuningConfig.isLogParseExceptions()).andReturn(false).anyTimes();
+    EasyMock.expect(tuningConfig.getMaxParseExceptions()).andReturn(10).anyTimes();
+    EasyMock.expect(tuningConfig.getMaxSavedParseExceptions()).andReturn(10).anyTimes();
+    replay(tuningConfig);
+
     SeekableStreamIndexTaskIOConfig<String, String> ioConfig = new TestSeekableStreamIndexTaskIOConfig();
 
     // Initiliaze task and task runner
     SeekableStreamIndexTask<String, String, ByteEntity> indexTask
         = new TestSeekableStreamIndexTask("id", dataSchema, tuningConfig, ioConfig);
-    taskRunner = new TestSeekableStreamIndexTaskRunner(indexTask, authorizerMapper);
+    taskRunner = new TestSeekableStreamIndexTaskRunner(indexTask);
+
+    final ObjectMapper mapper = TestHelper.JSON_MAPPER;
+    final IndexIO indexIO = new IndexIO(mapper, ColumnConfig.DEFAULT);
+
+    TaskToolbox toolbox = new TaskToolbox.Builder()
+        .indexIO(indexIO)
+        .taskReportFileWriter(new NoopTestTaskReportFileWriter())
+        .authorizerMapper(authorizerMapper)
+        .rowIngestionMetersFactory(NoopRowIngestionMeters::new)
+        .indexMergerV9(new IndexMergerV9(mapper, indexIO, TmpFileSegmentWriteOutMediumFactory.instance(), false))
+        .build();
+    taskRunner.run(toolbox);
   }
 
   @Test
@@ -253,11 +281,10 @@ public class SeekableStreamIndexTaskRunnerAuthTest
   {
 
     private TestSeekableStreamIndexTaskRunner(
-        SeekableStreamIndexTask<String, String, ByteEntity> task,
-        AuthorizerMapper authorizerMapper
+        SeekableStreamIndexTask<String, String, ByteEntity> task
     )
     {
-      super(task, null, authorizerMapper, LockGranularity.SEGMENT);
+      super(task, null, LockGranularity.SEGMENT);
     }
 
     @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index edc58fb4414..7e021083b14 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -37,7 +37,6 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.AuthorizerMapper;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
@@ -102,7 +101,8 @@ public class SeekableStreamIndexTaskRunnerTest
     Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
     Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
 
-    TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK);
+    TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null,
+                                                                               
                LockGranularity.TIME_CHUNK);
 
     Mockito.when(row.getTimestamp()).thenReturn(now);
     Assert.assertTrue(runner.withinMinMaxRecordTime(row));
@@ -154,7 +154,8 @@ public class SeekableStreamIndexTaskRunnerTest
     Mockito.when(task.getDataSchema()).thenReturn(schema);
     Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
     Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
-    TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK);
+    TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null,
+                                                                               
                LockGranularity.TIME_CHUNK);
 
     Assert.assertTrue(runner.withinMinMaxRecordTime(row));
 
@@ -170,11 +171,10 @@ public class SeekableStreamIndexTaskRunnerTest
     public TestasbleSeekableStreamIndexTaskRunner(
         SeekableStreamIndexTask task,
         @Nullable InputRowParser parser,
-        AuthorizerMapper authorizerMapper,
         LockGranularity lockGranularityToUse
     )
     {
-      super(task, parser, authorizerMapper, lockGranularityToUse);
+      super(task, parser, lockGranularityToUse);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to