This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0ef39a321fc Remove redundant field
SeekableStreamIndexTask.authorizerMapper (#17928)
0ef39a321fc is described below
commit 0ef39a321fcd143068f5c2040955f7647443c325
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Apr 17 02:19:58 2025 +0530
Remove redundant field SeekableStreamIndexTask.authorizerMapper (#17928)
* Remove redundant field SeekableStreamIndexTask.authorizerMapper
* Fix test
---
...entalPublishingRabbitStreamIndexTaskRunner.java | 10 ++---
.../rabbitstream/RabbitStreamIndexTask.java | 4 +-
.../IncrementalPublishingKafkaIndexTaskRunner.java | 3 --
.../druid/indexing/kafka/KafkaIndexTask.java | 1 -
.../druid/indexing/kinesis/KinesisIndexTask.java | 1 -
.../indexing/kinesis/KinesisIndexTaskRunner.java | 9 +----
.../seekablestream/SeekableStreamIndexTask.java | 5 ---
.../SeekableStreamIndexTaskRunner.java | 5 ++-
.../SeekableStreamIndexTaskRunnerAuthTest.java | 45 +++++++++++++++++-----
.../SeekableStreamIndexTaskRunnerTest.java | 10 ++---
10 files changed, 51 insertions(+), 42 deletions(-)
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/IncrementalPublishingRabbitStreamIndexTaskRunner.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/IncrementalPublishingRabbitStreamIndexTaskRunner.java
index 75663d23063..51f9fd901a0 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/IncrementalPublishingRabbitStreamIndexTaskRunner.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/IncrementalPublishingRabbitStreamIndexTaskRunner.java
@@ -35,12 +35,10 @@ import
org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.security.AuthorizerMapper;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -60,14 +58,14 @@ public class
IncrementalPublishingRabbitStreamIndexTaskRunner
IncrementalPublishingRabbitStreamIndexTaskRunner(
RabbitStreamIndexTask task,
@Nullable InputRowParser<ByteBuffer> parser,
- AuthorizerMapper authorizerMapper,
- LockGranularity lockGranularityToUse)
+ LockGranularity lockGranularityToUse
+ )
{
super(
task,
parser,
- authorizerMapper,
- lockGranularityToUse);
+ lockGranularityToUse
+ );
this.task = task;
}
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
index 6e9fab2c60a..4fb452e0b2b 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
@@ -89,8 +89,8 @@ public class RabbitStreamIndexTask extends
SeekableStreamIndexTask<String, Long,
return new IncrementalPublishingRabbitStreamIndexTaskRunner(
this,
dataSchema.getParser(),
- authorizerMapper,
- lockGranularityToUse);
+ lockGranularityToUse
+ );
}
@Override
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index b16f825308f..73270c18576 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -37,7 +37,6 @@ import
org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
@@ -66,14 +65,12 @@ public class IncrementalPublishingKafkaIndexTaskRunner
extends SeekableStreamInd
IncrementalPublishingKafkaIndexTaskRunner(
KafkaIndexTask task,
@Nullable InputRowParser<ByteBuffer> parser,
- AuthorizerMapper authorizerMapper,
LockGranularity lockGranularityToUse
)
{
super(
task,
parser,
- authorizerMapper,
lockGranularityToUse
);
this.task = task;
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index e5ff77467cd..c7f477ebb50 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -93,7 +93,6 @@ public class KafkaIndexTask extends
SeekableStreamIndexTask<KafkaTopicPartition,
return new IncrementalPublishingKafkaIndexTaskRunner(
this,
dataSchema.getParser(),
- authorizerMapper,
lockGranularityToUse
);
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index bea69d96c3d..f9ca54dd996 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -106,7 +106,6 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, Ki
return new KinesisIndexTaskRunner(
this,
dataSchema.getParser(),
- authorizerMapper,
lockGranularityToUse
);
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index dfca894d281..1c6bdabe0c4 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -36,7 +36,6 @@ import
org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.security.AuthorizerMapper;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -59,16 +58,10 @@ public class KinesisIndexTaskRunner extends
SeekableStreamIndexTaskRunner<String
KinesisIndexTaskRunner(
KinesisIndexTask task,
@Nullable InputRowParser<ByteBuffer> parser,
- AuthorizerMapper authorizerMapper,
LockGranularity lockGranularityToUse
)
{
- super(
- task,
- parser,
- authorizerMapper,
- lockGranularityToUse
- );
+ super(task, parser, lockGranularityToUse);
this.task = task;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 7b10c0afdcc..1abff409229 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -52,9 +52,7 @@ import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
-import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.Nullable;
import java.util.Map;
@@ -78,9 +76,6 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
// By the way, lazily init is synchronized because the runner may be needed
in multiple threads.
private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType,
SequenceOffsetType, ?>> runnerSupplier;
- @MonotonicNonNull
- protected AuthorizerMapper authorizerMapper;
-
public SeekableStreamIndexTask(
final String id,
@Nullable final TaskResource taskResource,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index cb5214eb6d9..ae6f1d3f979 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -255,7 +255,6 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
public SeekableStreamIndexTaskRunner(
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType,
RecordType> task,
@Nullable final InputRowParser<ByteBuffer> parser,
- final AuthorizerMapper authorizerMapper,
final LockGranularity lockGranularityToUse
)
{
@@ -266,7 +265,6 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
this.inputRowSchema = InputRowSchemas.fromDataSchema(task.getDataSchema());
this.inputFormat = ioConfig.getInputFormat();
this.parser = parser;
- this.authorizerMapper = authorizerMapper;
this.stream = ioConfig.getStartSequenceNumbers().getStream();
this.endOffsets = new
ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
this.sequences = new CopyOnWriteArrayList<>();
@@ -1452,6 +1450,9 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
*/
private void authorizationCheck(final HttpServletRequest request)
{
+ if (authorizerMapper == null) {
+ throw DruidException.defensive("Cannot authorize request since
AuthorizerMapper is not initialized yet.");
+ }
AuthorizationUtils.verifyUnrestrictedAccessToDatasource(request,
task.getDataSource(), authorizerMapper);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
index 89ee5e6ca35..1dd70b07336 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
@@ -28,13 +28,20 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
@@ -45,6 +52,7 @@ import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceType;
import org.easymock.EasyMock;
import org.joda.time.Duration;
+import org.joda.time.Period;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -94,14 +102,16 @@ public class SeekableStreamIndexTaskRunnerAuthTest
// - Datasource Read User requests Read access
// - or, Datasource Write User requests Write access
if (resource.getType().equals(ResourceType.DATASOURCE)) {
- return new Access(
- (action == Action.READ &&
username.equals(Users.DATASOURCE_READ))
- || (action == Action.WRITE &&
username.equals(Users.DATASOURCE_WRITE))
- );
+ if ((action == Action.READ &&
username.equals(Users.DATASOURCE_READ))
+ || (action == Action.WRITE &&
username.equals(Users.DATASOURCE_WRITE))) {
+ return Access.allow();
+ } else {
+ return Access.DENIED;
+ }
}
// Do not allow access to any other resource
- return new Access(false);
+ return Access.DENIED;
};
}
};
@@ -114,12 +124,30 @@ public class SeekableStreamIndexTaskRunnerAuthTest
.withGranularity(new ArbitraryGranularitySpec(new
AllGranularity(), Collections.emptyList()))
.build();
SeekableStreamIndexTaskTuningConfig tuningConfig =
mock(SeekableStreamIndexTaskTuningConfig.class);
+
EasyMock.expect(tuningConfig.getIntermediateHandoffPeriod()).andReturn(Period.minutes(10)).anyTimes();
+
EasyMock.expect(tuningConfig.isLogParseExceptions()).andReturn(false).anyTimes();
+
EasyMock.expect(tuningConfig.getMaxParseExceptions()).andReturn(10).anyTimes();
+
EasyMock.expect(tuningConfig.getMaxSavedParseExceptions()).andReturn(10).anyTimes();
+ replay(tuningConfig);
+
SeekableStreamIndexTaskIOConfig<String, String> ioConfig = new
TestSeekableStreamIndexTaskIOConfig();
// Initiliaze task and task runner
SeekableStreamIndexTask<String, String, ByteEntity> indexTask
= new TestSeekableStreamIndexTask("id", dataSchema, tuningConfig,
ioConfig);
- taskRunner = new TestSeekableStreamIndexTaskRunner(indexTask,
authorizerMapper);
+ taskRunner = new TestSeekableStreamIndexTaskRunner(indexTask);
+
+ final ObjectMapper mapper = TestHelper.JSON_MAPPER;
+ final IndexIO indexIO = new IndexIO(mapper, ColumnConfig.DEFAULT);
+
+ TaskToolbox toolbox = new TaskToolbox.Builder()
+ .indexIO(indexIO)
+ .taskReportFileWriter(new NoopTestTaskReportFileWriter())
+ .authorizerMapper(authorizerMapper)
+ .rowIngestionMetersFactory(NoopRowIngestionMeters::new)
+ .indexMergerV9(new IndexMergerV9(mapper, indexIO,
TmpFileSegmentWriteOutMediumFactory.instance(), false))
+ .build();
+ taskRunner.run(toolbox);
}
@Test
@@ -253,11 +281,10 @@ public class SeekableStreamIndexTaskRunnerAuthTest
{
private TestSeekableStreamIndexTaskRunner(
- SeekableStreamIndexTask<String, String, ByteEntity> task,
- AuthorizerMapper authorizerMapper
+ SeekableStreamIndexTask<String, String, ByteEntity> task
)
{
- super(task, null, authorizerMapper, LockGranularity.SEGMENT);
+ super(task, null, LockGranularity.SEGMENT);
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index edc58fb4414..7e021083b14 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -37,7 +37,6 @@ import
org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.security.AuthorizerMapper;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@@ -102,7 +101,8 @@ public class SeekableStreamIndexTaskRunnerTest
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
- TestasbleSeekableStreamIndexTaskRunner runner = new
TestasbleSeekableStreamIndexTaskRunner(task, null, null,
LockGranularity.TIME_CHUNK);
+ TestasbleSeekableStreamIndexTaskRunner runner = new
TestasbleSeekableStreamIndexTaskRunner(task, null,
+
LockGranularity.TIME_CHUNK);
Mockito.when(row.getTimestamp()).thenReturn(now);
Assert.assertTrue(runner.withinMinMaxRecordTime(row));
@@ -154,7 +154,8 @@ public class SeekableStreamIndexTaskRunnerTest
Mockito.when(task.getDataSchema()).thenReturn(schema);
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
- TestasbleSeekableStreamIndexTaskRunner runner = new
TestasbleSeekableStreamIndexTaskRunner(task, null, null,
LockGranularity.TIME_CHUNK);
+ TestasbleSeekableStreamIndexTaskRunner runner = new
TestasbleSeekableStreamIndexTaskRunner(task, null,
+
LockGranularity.TIME_CHUNK);
Assert.assertTrue(runner.withinMinMaxRecordTime(row));
@@ -170,11 +171,10 @@ public class SeekableStreamIndexTaskRunnerTest
public TestasbleSeekableStreamIndexTaskRunner(
SeekableStreamIndexTask task,
@Nullable InputRowParser parser,
- AuthorizerMapper authorizerMapper,
LockGranularity lockGranularityToUse
)
{
- super(task, parser, authorizerMapper, lockGranularityToUse);
+ super(task, parser, lockGranularityToUse);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]