github-code-scanning[bot] commented on code in PR #12852:
URL: https://github.com/apache/druid/pull/12852#discussion_r1099894611


##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java:
##########
@@ -243,22 +241,20 @@
     } else {
       Preconditions.checkArgument(inputFormat == null);
       ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
-          new LocalFirehoseFactory(inputDirectory, filter, null),
+          null,
+          new LocalInputSource(inputDirectory, filter),
+          createInputFormatFromParseSpec(parseSpec),
           appendToExisting,
           dropExisting
       );
-      //noinspection unchecked
       ingestionSpec = new ParallelIndexIngestionSpec(
           new DataSchema(
-              "dataSource",
-              getObjectMapper().convertValue(
-                  new StringInputRowParser(parseSpec, null),
-                  Map.class
-              ),
+              DATASOURCE,
+              parseSpec.getTimestampSpec(),

Review Comment:
   ## Dereferenced variable may be null
   
   Variable [parseSpec](1) may be null at this access, because it can originate 
from either [this](2) null argument or [this](3) null argument.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4250)



##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java:
##########
@@ -167,6 +174,87 @@
         OBJECT_MAPPER
     );
 
+    runSamplerAndCompareResponse(samplerSpec, true);
+  }
+
+  @Test
+  public void testWithInputRowParser() throws IOException
+  {
+    insertData(generateRecords(TOPIC));
+
+    ObjectMapper objectMapper = new DefaultObjectMapper();
+    TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("dim1"),
+            new StringDimensionSchema("dim1t"),
+            new StringDimensionSchema("dim2"),
+            new LongDimensionSchema("dimLong"),
+            new FloatDimensionSchema("dimFloat")
+        )
+    );
+    InputRowParser parser = new StringInputRowParser(new 
JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null), 
"UTF8");
+
+    DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        objectMapper.readValue(objectMapper.writeValueAsBytes(parser), 
Map.class),
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("met1sum", "met1"),
+            new CountAggregatorFactory("rows")
+        },
+        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, 
null),
+        null,
+        objectMapper
+    );

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DataSchema.DataSchema](1) should be avoided because it has been 
deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4238)



##########
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java:
##########
@@ -112,30 +118,93 @@
   }
 
   @Test(timeout = 10_000L)
-  public void testSample() throws Exception
+  public void testSample() throws InterruptedException
   {
-    
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();
-
-    recordSupplier.assign(ImmutableSet.of(StreamPartition.of(STREAM, 
SHARD_ID)));
-    EasyMock.expectLastCall().once();
+    KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
+        null,
+        DATA_SCHEMA,
+        null,
+        new KinesisSupervisorIOConfig(
+            STREAM,
+            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
 
-    recordSupplier.seekToEarliest(ImmutableSet.of(StreamPartition.of(STREAM, 
SHARD_ID)));
-    EasyMock.expectLastCall().once();
+    KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
+        supervisorSpec,
+        new SamplerConfig(5, null, null, null),
+        new InputSourceSampler(new DefaultObjectMapper()),
+        null
+    );
 
-    
EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(STREAM)).once();
+    runSamplerAndCompareResponse(samplerSpec, true);
+  }
 
-    recordSupplier.close();
-    EasyMock.expectLastCall().once();
+  @Test
+  public void testWithInputRowParser() throws IOException
+  {
+    ObjectMapper objectMapper = new DefaultObjectMapper();
+    TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("dim1"),
+            new StringDimensionSchema("dim1t"),
+            new StringDimensionSchema("dim2"),
+            new LongDimensionSchema("dimLong"),
+            new FloatDimensionSchema("dimFloat")
+        )
+    );
+    InputRowParser parser = new StringInputRowParser(new 
JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null), 
"UTF8");
 
-    replayAll();
+    DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        objectMapper.readValue(objectMapper.writeValueAsBytes(parser), 
Map.class),
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("met1sum", "met1"),
+            new CountAggregatorFactory("rows")
+        },
+        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, 
null),
+        null,
+        objectMapper
+    );

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DataSchema.DataSchema](1) should be avoided because it has been 
deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4239)



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.indexing.SamplerResponse;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SeekableStreamSamplerSpecTest extends EasyMockSupport
+{
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+  private static final String STREAM = "sampling";
+  private static final String SHARD_ID = "1";
+
+  private final SeekableStreamSupervisorSpec supervisorSpec = 
mock(SeekableStreamSupervisorSpec.class);
+
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  private final RecordSupplier recordSupplier = mock(RecordSupplier.class);
+
+  private static List<OrderedPartitionableRecord<String, String, ByteEntity>> 
generateRecords(String stream)
+  {
+    return ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "5",
+            jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+        ),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "6",
+            Collections.singletonList(new 
ByteEntity(StringUtils.toUtf8("unparseable")))
+        ),
+        new OrderedPartitionableRecord<>(stream, "1", "8", 
Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}"))))
+    );
+  }
+
+  @Test(timeout = 10_000L)
+  public void testSampleWithInputRowParser() throws Exception
+  {
+    final DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        OBJECT_MAPPER.convertValue(
+            new StringInputRowParser(
+                new JSONParseSpec(
+                    new TimestampSpec("timestamp", "iso", null),
+                    new DimensionsSpec(
+                        Arrays.asList(
+                            new StringDimensionSchema("dim1"),
+                            new StringDimensionSchema("dim1t"),
+                            new StringDimensionSchema("dim2"),
+                            new LongDimensionSchema("dimLong"),
+                            new FloatDimensionSchema("dimFloat")
+                        )
+                    ),
+                    new JSONPathSpec(true, ImmutableList.of()),
+                    ImmutableMap.of(),
+                    false
+                )
+            ),
+            Map.class
+        ),
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("met1sum", "met1"),
+            new CountAggregatorFactory("rows")
+        },
+        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, 
null),
+        null,
+        OBJECT_MAPPER
+    );

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DataSchema.DataSchema](1) should be avoided because it has been 
deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4244)



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java:
##########
@@ -141,30 +139,42 @@
     }
 
     @Override
-    public Stream<InputSplit<Object>> getSplits(@Nullable SplitHintSpec 
splitHintSpec)
+    public Stream<InputSplit> createSplits(InputFormat inputFormat, @Nullable 
SplitHintSpec splitHintSpec)
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable 
SplitHintSpec splitHintSpec)
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public FiniteFirehoseFactory withSplit(InputSplit split)
+    public SplittableInputSource withSplit(InputSplit split)
     {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public boolean needsFormat()
+    {
+      return false;
+    }
+
+    @Override
+    protected InputSourceReader fixedFormatReader(InputRowSchema 
inputRowSchema, @Nullable File temporaryDirectory)
+    {
+      return new SeekableStreamSamplerInputSourceReader(parser);
+    }
   }
 
-  private class SeekableStreamSamplerFirehose implements Firehose
+  private class SeekableStreamSamplerInputSourceReader implements 
InputSourceReader

Review Comment:
   ## Inner class could be static
   
   SeekableStreamSamplerInputSourceReader could be made static, since the 
enclosing instance is used only in its constructor.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4251)



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.indexing.SamplerResponse;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SeekableStreamSamplerSpecTest extends EasyMockSupport
+{
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+  private static final String STREAM = "sampling";
+  private static final String SHARD_ID = "1";
+
+  private final SeekableStreamSupervisorSpec supervisorSpec = 
mock(SeekableStreamSupervisorSpec.class);
+
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  private final RecordSupplier recordSupplier = mock(RecordSupplier.class);
+
+  private static List<OrderedPartitionableRecord<String, String, ByteEntity>> 
generateRecords(String stream)
+  {
+    return ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "5",
+            jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+        ),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "6",
+            Collections.singletonList(new 
ByteEntity(StringUtils.toUtf8("unparseable")))
+        ),
+        new OrderedPartitionableRecord<>(stream, "1", "8", 
Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}"))))
+    );
+  }
+
+  @Test(timeout = 10_000L)
+  public void testSampleWithInputRowParser() throws Exception
+  {
+    final DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        OBJECT_MAPPER.convertValue(
+            new StringInputRowParser(
+                new JSONParseSpec(
+                    new TimestampSpec("timestamp", "iso", null),
+                    new DimensionsSpec(
+                        Arrays.asList(
+                            new StringDimensionSchema("dim1"),
+                            new StringDimensionSchema("dim1t"),
+                            new StringDimensionSchema("dim2"),
+                            new LongDimensionSchema("dimLong"),
+                            new FloatDimensionSchema("dimFloat")
+                        )
+                    ),
+                    new JSONPathSpec(true, ImmutableList.of()),
+                    ImmutableMap.of(),
+                    false
+                )
+            ),
+            Map.class
+        ),
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("met1sum", "met1"),
+            new CountAggregatorFactory("rows")
+        },
+        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, 
null),
+        null,
+        OBJECT_MAPPER
+    );
+
+    final SeekableStreamSupervisorIOConfig supervisorIOConfig = new 
TestableSeekableStreamSupervisorIOConfig(
+        STREAM,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        true,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();
+    
EasyMock.expect(supervisorSpec.getDataSchema()).andReturn(dataSchema).once();

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [SeekableStreamSupervisorSpec.getDataSchema](1) should be avoided 
because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4246)



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java:
##########
@@ -291,6 +295,83 @@
           null
       );
     }
+
+    @Test
+    public void testFailToConstructWhenBothInputSourceAndParserAreSet()
+    {
+      final ObjectMapper mapper = new DefaultObjectMapper();
+      final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+          null,
+          new InlineInputSource("test"),
+          null,
+          false,
+          null
+      );
+      final ParallelIndexTuningConfig tuningConfig = new 
ParallelIndexTuningConfig(
+          null,
+          null,
+          null,
+          10,
+          1000L,
+          null,
+          null,
+          null,
+          null,
+          new HashedPartitionsSpec(null, 10, null),
+          new IndexSpec(
+              new RoaringBitmapSerdeFactory(true),
+              CompressionStrategy.UNCOMPRESSED,
+              CompressionStrategy.LZF,
+              LongEncodingStrategy.LONGS
+          ),
+          new IndexSpec(),
+          1,
+          true,
+          true,
+          10000L,
+          OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+          null,
+          10,
+          100,
+          20L,
+          new Duration(3600),
+          128,
+          null,
+          null,
+          false,
+          null,
+          null,
+          null,
+          null,
+          null
+      );
+
+      expectedException.expect(IAE.class);
+      expectedException.expectMessage("Cannot use parser and inputSource 
together. Try using inputFormat instead of parser.");
+      new ParallelIndexIngestionSpec(
+          new DataSchema(
+              "datasource",
+              mapper.convertValue(
+                  new StringInputRowParser(
+                      new JSONParseSpec(
+                          new TimestampSpec(null, null, null),
+                          DimensionsSpec.EMPTY,
+                          null,
+                          null,
+                          null
+                      )
+                  ),
+                  Map.class
+              ),
+              null,
+              null,
+              null,
+              mapper
+          ),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DataSchema.DataSchema](1) should be avoided because it has been 
deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4242)



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java:
##########
@@ -291,6 +295,83 @@
           null
       );
     }
+
+    @Test
+    public void testFailToConstructWhenBothInputSourceAndParserAreSet()
+    {
+      final ObjectMapper mapper = new DefaultObjectMapper();
+      final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+          null,
+          new InlineInputSource("test"),
+          null,
+          false,
+          null
+      );
+      final ParallelIndexTuningConfig tuningConfig = new 
ParallelIndexTuningConfig(
+          null,
+          null,
+          null,
+          10,
+          1000L,
+          null,
+          null,
+          null,
+          null,
+          new HashedPartitionsSpec(null, 10, null),
+          new IndexSpec(
+              new RoaringBitmapSerdeFactory(true),
+              CompressionStrategy.UNCOMPRESSED,
+              CompressionStrategy.LZF,
+              LongEncodingStrategy.LONGS
+          ),
+          new IndexSpec(),
+          1,
+          true,
+          true,
+          10000L,
+          OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+          null,
+          10,
+          100,
+          20L,
+          new Duration(3600),
+          128,
+          null,
+          null,
+          false,
+          null,
+          null,
+          null,
+          null,
+          null
+      );
+
+      expectedException.expect(IAE.class);
+      expectedException.expectMessage("Cannot use parser and inputSource 
together. Try using inputFormat instead of parser.");
+      new ParallelIndexIngestionSpec(
+          new DataSchema(
+              "datasource",
+              mapper.convertValue(
+                  new StringInputRowParser(
+                      new JSONParseSpec(
+                          new TimestampSpec(null, null, null),
+                          DimensionsSpec.EMPTY,
+                          null,
+                          null,
+                          null
+                      )
+                  ),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [StringInputRowParser.StringInputRowParser](1) should be avoided 
because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4243)



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.indexing.SamplerResponse;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SeekableStreamSamplerSpecTest extends EasyMockSupport
+{
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+  private static final String STREAM = "sampling";
+  private static final String SHARD_ID = "1";
+
+  private final SeekableStreamSupervisorSpec supervisorSpec = 
mock(SeekableStreamSupervisorSpec.class);
+
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  private final RecordSupplier recordSupplier = mock(RecordSupplier.class);
+
+  private static List<OrderedPartitionableRecord<String, String, ByteEntity>> 
generateRecords(String stream)
+  {
+    return ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "5",
+            jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+        ),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "6",
+            Collections.singletonList(new 
ByteEntity(StringUtils.toUtf8("unparseable")))
+        ),
+        new OrderedPartitionableRecord<>(stream, "1", "8", 
Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}"))))
+    );
+  }
+
+  @Test(timeout = 10_000L)
+  public void testSampleWithInputRowParser() throws Exception
+  {
+    final DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        OBJECT_MAPPER.convertValue(
+            new StringInputRowParser(
+                new JSONParseSpec(
+                    new TimestampSpec("timestamp", "iso", null),
+                    new DimensionsSpec(
+                        Arrays.asList(
+                            new StringDimensionSchema("dim1"),
+                            new StringDimensionSchema("dim1t"),
+                            new StringDimensionSchema("dim2"),
+                            new LongDimensionSchema("dimLong"),
+                            new FloatDimensionSchema("dimFloat")
+                        )
+                    ),
+                    new JSONPathSpec(true, ImmutableList.of()),
+                    ImmutableMap.of(),
+                    false
+                )
+            ),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [StringInputRowParser.StringInputRowParser](1) should be avoided 
because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4245)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to