github-code-scanning[bot] commented on code in PR #12852:
URL: https://github.com/apache/druid/pull/12852#discussion_r1099894611
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java:
##########
@@ -243,22 +241,20 @@
} else {
Preconditions.checkArgument(inputFormat == null);
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
- new LocalFirehoseFactory(inputDirectory, filter, null),
+ null,
+ new LocalInputSource(inputDirectory, filter),
+ createInputFormatFromParseSpec(parseSpec),
appendToExisting,
dropExisting
);
- //noinspection unchecked
ingestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
- "dataSource",
- getObjectMapper().convertValue(
- new StringInputRowParser(parseSpec, null),
- Map.class
- ),
+ DATASOURCE,
+ parseSpec.getTimestampSpec(),
Review Comment:
## Dereferenced variable may be null
Variable [parseSpec](1) may be null at this access because of [this](2) null
argument.
Variable [parseSpec](1) may be null at this access because of [this](3) null
argument.
[Show more details](https://github.com/apache/druid/security/code-scanning/4250)
##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java:
##########
@@ -167,6 +174,87 @@
OBJECT_MAPPER
);
+ runSamplerAndCompareResponse(samplerSpec, true);
+ }
+
+ @Test
+ public void testWithInputRowParser() throws IOException
+ {
+ insertData(generateRecords(TOPIC));
+
+ ObjectMapper objectMapper = new DefaultObjectMapper();
+ TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim1t"),
+ new StringDimensionSchema("dim2"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ )
+ );
+ InputRowParser parser = new StringInputRowParser(new
JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null),
"UTF8");
+
+ DataSchema dataSchema = new DataSchema(
+ "test_ds",
+ objectMapper.readValue(objectMapper.writeValueAsBytes(parser),
Map.class),
+ new AggregatorFactory[]{
+ new DoubleSumAggregatorFactory("met1sum", "met1"),
+ new CountAggregatorFactory("rows")
+ },
+ new UniformGranularitySpec(Granularities.DAY, Granularities.NONE,
null),
+ null,
+ objectMapper
+ );
Review Comment:
## Deprecated method or constructor invocation
Invoking [DataSchema.DataSchema](1) should be avoided because it has been
deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/4238)
##########
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java:
##########
@@ -112,30 +118,93 @@
}
@Test(timeout = 10_000L)
- public void testSample() throws Exception
+ public void testSample() throws InterruptedException
{
-
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();
-
- recordSupplier.assign(ImmutableSet.of(StreamPartition.of(STREAM,
SHARD_ID)));
- EasyMock.expectLastCall().once();
+ KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
+ null,
+ DATA_SCHEMA,
+ null,
+ new KinesisSupervisorIOConfig(
+ STREAM,
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ false
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
- recordSupplier.seekToEarliest(ImmutableSet.of(StreamPartition.of(STREAM,
SHARD_ID)));
- EasyMock.expectLastCall().once();
+ KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
+ supervisorSpec,
+ new SamplerConfig(5, null, null, null),
+ new InputSourceSampler(new DefaultObjectMapper()),
+ null
+ );
-
EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(STREAM)).once();
+ runSamplerAndCompareResponse(samplerSpec, true);
+ }
- recordSupplier.close();
- EasyMock.expectLastCall().once();
+ @Test
+ public void testWithInputRowParser() throws IOException
+ {
+ ObjectMapper objectMapper = new DefaultObjectMapper();
+ TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim1t"),
+ new StringDimensionSchema("dim2"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ )
+ );
+ InputRowParser parser = new StringInputRowParser(new
JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null),
"UTF8");
- replayAll();
+ DataSchema dataSchema = new DataSchema(
+ "test_ds",
+ objectMapper.readValue(objectMapper.writeValueAsBytes(parser),
Map.class),
+ new AggregatorFactory[]{
+ new DoubleSumAggregatorFactory("met1sum", "met1"),
+ new CountAggregatorFactory("rows")
+ },
+ new UniformGranularitySpec(Granularities.DAY, Granularities.NONE,
null),
+ null,
+ objectMapper
+ );
Review Comment:
## Deprecated method or constructor invocation
Invoking [DataSchema.DataSchema](1) should be avoided because it has been
deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/4239)
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.indexing.SamplerResponse;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
+import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SeekableStreamSamplerSpecTest extends EasyMockSupport
+{
+ private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+ private static final String STREAM = "sampling";
+ private static final String SHARD_ID = "1";
+
+ private final SeekableStreamSupervisorSpec supervisorSpec =
mock(SeekableStreamSupervisorSpec.class);
+
+ static {
+ NullHandling.initializeForTests();
+ }
+
+ private final RecordSupplier recordSupplier = mock(RecordSupplier.class);
+
+ private static List<OrderedPartitionableRecord<String, String, ByteEntity>>
generateRecords(String stream)
+ {
+ return ImmutableList.of(
+ new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(
+ stream,
+ "1",
+ "5",
+ jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+ ),
+ new OrderedPartitionableRecord<>(
+ stream,
+ "1",
+ "6",
+ Collections.singletonList(new
ByteEntity(StringUtils.toUtf8("unparseable")))
+ ),
+ new OrderedPartitionableRecord<>(stream, "1", "8",
Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}"))))
+ );
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSampleWithInputRowParser() throws Exception
+ {
+ final DataSchema dataSchema = new DataSchema(
+ "test_ds",
+ OBJECT_MAPPER.convertValue(
+ new StringInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim1t"),
+ new StringDimensionSchema("dim2"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ )
+ ),
+ new JSONPathSpec(true, ImmutableList.of()),
+ ImmutableMap.of(),
+ false
+ )
+ ),
+ Map.class
+ ),
+ new AggregatorFactory[]{
+ new DoubleSumAggregatorFactory("met1sum", "met1"),
+ new CountAggregatorFactory("rows")
+ },
+ new UniformGranularitySpec(Granularities.DAY, Granularities.NONE,
null),
+ null,
+ OBJECT_MAPPER
+ );
Review Comment:
## Deprecated method or constructor invocation
Invoking [DataSchema.DataSchema](1) should be avoided because it has been
deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/4244)
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java:
##########
@@ -141,30 +139,42 @@
}
@Override
- public Stream<InputSplit<Object>> getSplits(@Nullable SplitHintSpec
splitHintSpec)
+ public Stream<InputSplit> createSplits(InputFormat inputFormat, @Nullable
SplitHintSpec splitHintSpec)
{
throw new UnsupportedOperationException();
}
@Override
- public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
+ public int estimateNumSplits(InputFormat inputFormat, @Nullable
SplitHintSpec splitHintSpec)
{
throw new UnsupportedOperationException();
}
@Override
- public FiniteFirehoseFactory withSplit(InputSplit split)
+ public SplittableInputSource withSplit(InputSplit split)
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean needsFormat()
+ {
+ return false;
+ }
+
+ @Override
+ protected InputSourceReader fixedFormatReader(InputRowSchema
inputRowSchema, @Nullable File temporaryDirectory)
+ {
+ return new SeekableStreamSamplerInputSourceReader(parser);
+ }
}
- private class SeekableStreamSamplerFirehose implements Firehose
+ private class SeekableStreamSamplerInputSourceReader implements
InputSourceReader
Review Comment:
## Inner class could be static
SeekableStreamSamplerInputSourceReader could be made static, since the
enclosing instance is used only in its constructor.
[Show more details](https://github.com/apache/druid/security/code-scanning/4251)
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.indexing.SamplerResponse;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
+import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SeekableStreamSamplerSpecTest extends EasyMockSupport
+{
+ private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+ private static final String STREAM = "sampling";
+ private static final String SHARD_ID = "1";
+
+ private final SeekableStreamSupervisorSpec supervisorSpec =
mock(SeekableStreamSupervisorSpec.class);
+
+ static {
+ NullHandling.initializeForTests();
+ }
+
+ private final RecordSupplier recordSupplier = mock(RecordSupplier.class);
+
+ private static List<OrderedPartitionableRecord<String, String, ByteEntity>>
generateRecords(String stream)
+ {
+ return ImmutableList.of(
+ new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(
+ stream,
+ "1",
+ "5",
+ jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+ ),
+ new OrderedPartitionableRecord<>(
+ stream,
+ "1",
+ "6",
+ Collections.singletonList(new
ByteEntity(StringUtils.toUtf8("unparseable")))
+ ),
+ new OrderedPartitionableRecord<>(stream, "1", "8",
Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}"))))
+ );
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSampleWithInputRowParser() throws Exception
+ {
+ final DataSchema dataSchema = new DataSchema(
+ "test_ds",
+ OBJECT_MAPPER.convertValue(
+ new StringInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim1t"),
+ new StringDimensionSchema("dim2"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ )
+ ),
+ new JSONPathSpec(true, ImmutableList.of()),
+ ImmutableMap.of(),
+ false
+ )
+ ),
+ Map.class
+ ),
+ new AggregatorFactory[]{
+ new DoubleSumAggregatorFactory("met1sum", "met1"),
+ new CountAggregatorFactory("rows")
+ },
+ new UniformGranularitySpec(Granularities.DAY, Granularities.NONE,
null),
+ null,
+ OBJECT_MAPPER
+ );
+
+ final SeekableStreamSupervisorIOConfig supervisorIOConfig = new
TestableSeekableStreamSupervisorIOConfig(
+ STREAM,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();
+
EasyMock.expect(supervisorSpec.getDataSchema()).andReturn(dataSchema).once();
Review Comment:
## Deprecated method or constructor invocation
Invoking [SeekableStreamSupervisorSpec.getDataSchema](1) should be avoided
because it has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/4246)
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java:
##########
@@ -291,6 +295,83 @@
null
);
}
+
+ @Test
+ public void testFailToConstructWhenBothInputSourceAndParserAreSet()
+ {
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ null,
+ new InlineInputSource("test"),
+ null,
+ false,
+ null
+ );
+ final ParallelIndexTuningConfig tuningConfig = new
ParallelIndexTuningConfig(
+ null,
+ null,
+ null,
+ 10,
+ 1000L,
+ null,
+ null,
+ null,
+ null,
+ new HashedPartitionsSpec(null, 10, null),
+ new IndexSpec(
+ new RoaringBitmapSerdeFactory(true),
+ CompressionStrategy.UNCOMPRESSED,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ new IndexSpec(),
+ 1,
+ true,
+ true,
+ 10000L,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+ null,
+ 10,
+ 100,
+ 20L,
+ new Duration(3600),
+ 128,
+ null,
+ null,
+ false,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+ expectedException.expect(IAE.class);
+ expectedException.expectMessage("Cannot use parser and inputSource
together. Try using inputFormat instead of parser.");
+ new ParallelIndexIngestionSpec(
+ new DataSchema(
+ "datasource",
+ mapper.convertValue(
+ new StringInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec(null, null, null),
+ DimensionsSpec.EMPTY,
+ null,
+ null,
+ null
+ )
+ ),
+ Map.class
+ ),
+ null,
+ null,
+ null,
+ mapper
+ ),
Review Comment:
## Deprecated method or constructor invocation
Invoking [DataSchema.DataSchema](1) should be avoided because it has been
deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/4242)
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java:
##########
@@ -291,6 +295,83 @@
null
);
}
+
+ @Test
+ public void testFailToConstructWhenBothInputSourceAndParserAreSet()
+ {
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ null,
+ new InlineInputSource("test"),
+ null,
+ false,
+ null
+ );
+ final ParallelIndexTuningConfig tuningConfig = new
ParallelIndexTuningConfig(
+ null,
+ null,
+ null,
+ 10,
+ 1000L,
+ null,
+ null,
+ null,
+ null,
+ new HashedPartitionsSpec(null, 10, null),
+ new IndexSpec(
+ new RoaringBitmapSerdeFactory(true),
+ CompressionStrategy.UNCOMPRESSED,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ new IndexSpec(),
+ 1,
+ true,
+ true,
+ 10000L,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+ null,
+ 10,
+ 100,
+ 20L,
+ new Duration(3600),
+ 128,
+ null,
+ null,
+ false,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+ expectedException.expect(IAE.class);
+ expectedException.expectMessage("Cannot use parser and inputSource
together. Try using inputFormat instead of parser.");
+ new ParallelIndexIngestionSpec(
+ new DataSchema(
+ "datasource",
+ mapper.convertValue(
+ new StringInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec(null, null, null),
+ DimensionsSpec.EMPTY,
+ null,
+ null,
+ null
+ )
+ ),
Review Comment:
## Deprecated method or constructor invocation
Invoking [StringInputRowParser.StringInputRowParser](1) should be avoided
because it has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/4243)
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.indexing.SamplerResponse;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
+import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SeekableStreamSamplerSpecTest extends EasyMockSupport
+{
+ private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+ private static final String STREAM = "sampling";
+ private static final String SHARD_ID = "1";
+
+ private final SeekableStreamSupervisorSpec supervisorSpec =
mock(SeekableStreamSupervisorSpec.class);
+
+ static {
+ NullHandling.initializeForTests();
+ }
+
+ private final RecordSupplier recordSupplier = mock(RecordSupplier.class);
+
+ private static List<OrderedPartitionableRecord<String, String, ByteEntity>>
generateRecords(String stream)
+ {
+ return ImmutableList.of(
+ new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(
+ stream,
+ "1",
+ "5",
+ jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+ ),
+ new OrderedPartitionableRecord<>(
+ stream,
+ "1",
+ "6",
+ Collections.singletonList(new
ByteEntity(StringUtils.toUtf8("unparseable")))
+ ),
+ new OrderedPartitionableRecord<>(stream, "1", "8",
Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}"))))
+ );
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSampleWithInputRowParser() throws Exception
+ {
+ final DataSchema dataSchema = new DataSchema(
+ "test_ds",
+ OBJECT_MAPPER.convertValue(
+ new StringInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim1t"),
+ new StringDimensionSchema("dim2"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ )
+ ),
+ new JSONPathSpec(true, ImmutableList.of()),
+ ImmutableMap.of(),
+ false
+ )
+ ),
Review Comment:
## Deprecated method or constructor invocation
Invoking [StringInputRowParser.StringInputRowParser](1) should be avoided
because it has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/4245)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org