github-code-scanning[bot] commented on code in PR #13803:
URL: https://github.com/apache/druid/pull/13803#discussion_r1106168713
##########
processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java:
##########
@@ -349,9 +450,73 @@
if (fields.indexOf(field) >= 0) {
BaseColumn col = getColumnHolder(field).getColumn();
return col.makeColumnValueSelector(readableOffset);
- } else {
- return NilColumnValueSelector.instance();
}
+ if (!path.isEmpty() && path.get(path.size() - 1) instanceof
NestedPathArrayElement) {
+ final NestedPathPart lastPath = path.get(path.size() - 1);
+ final String arrayField = getField(path.subList(0, path.size() - 1));
+ if (fields.indexOf(arrayField) >= 0) {
+ final int elementNumber = ((NestedPathArrayElement)
lastPath).getIndex();
+ DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>)
getColumnHolder(arrayField).getColumn();
+ ColumnValueSelector arraySelector =
col.makeColumnValueSelector(readableOffset);
+ return new ColumnValueSelector<Object>()
+ {
+          // Numeric-selector contract: the row is "null" iff the extracted
+          // array element is not a Number (covers missing elements and non-numeric values).
+          @Override
+          public boolean isNull()
+          {
+            Object o = getObject();
+            return !(o instanceof Number);
+          }
+
+          // Non-numeric elements coerce to the default 0L (callers should consult isNull()).
+          @Override
+          public long getLong()
+          {
+            Object o = getObject();
+            return o instanceof Number ? ((Number) o).longValue() : 0L;
+          }
+
+          // Non-numeric elements coerce to the default 0f (callers should consult isNull()).
+          @Override
+          public float getFloat()
+          {
+            Object o = getObject();
+            return o instanceof Number ? ((Number) o).floatValue() : 0f;
+          }
+
+          // Non-numeric elements coerce to the default 0.0 (callers should consult isNull()).
+          @Override
+          public double getDouble()
+          {
+            Object o = getObject();
+            return o instanceof Number ? ((Number) o).doubleValue() : 0.0;
+          }
+
+          // Delegate shape inspection to the wrapped dictionary-encoded array selector.
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            arraySelector.inspectRuntimeShape(inspector);
+          }
+
+ @Nullable
+ @Override
+ public Object getObject()
+ {
+ Object o = arraySelector.getObject();
+ if (o instanceof Object[]) {
+ Object[] array = (Object[]) o;
+ if (elementNumber < array.length) {
+ return array[elementNumber];
Review Comment:
## Improper validation of user-provided array index
This index depends on a [user-provided value](1) which can cause an
ArrayIndexOutOfBoundsException.
This index depends on a [user-provided value](2) which can cause an
ArrayIndexOutOfBoundsException.
This index depends on a [user-provided value](3) which can cause an
ArrayIndexOutOfBoundsException.
This index depends on a [user-provided value](4) which can cause an
ArrayIndexOutOfBoundsException.
This index depends on a [user-provided value](5) which can cause an
ArrayIndexOutOfBoundsException.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4268)
##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIntArrayIndexedWriter.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Comparator;
+
+public class FrontCodedIntArrayIndexedWriter implements DictionaryWriter<int[]>
+{
+ private static final int MAX_LOG_BUFFER_SIZE = 26;
+
+  /**
+   * Lexicographic, null-first ordering for int arrays: a null array sorts before any non-null
+   * array; otherwise the first differing element decides, and a shorter array sorts before a
+   * longer array it prefixes.
+   */
+  public static final Comparator<int[]> ARRAY_COMPARATOR = (a, b) -> {
+    //noinspection ArrayEquality
+    if (a == b) {
+      return 0;
+    }
+    // nulls sort first
+    if (a == null) {
+      return -1;
+    }
+    if (b == null) {
+      return 1;
+    }
+    final int limit = Math.min(a.length, b.length);
+    for (int i = 0; i < limit; i++) {
+      final int cmp = Integer.compare(a[i], b[i]);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+    // shared prefix: shorter array sorts first
+    return Integer.compare(a.length, b.length);
+  };
+
+  private final SegmentWriteOutMedium segmentWriteOutMedium;
+  // number of values per front-coded bucket (power of two, 1..128)
+  private final int bucketSize;
+  private final ByteOrder byteOrder;
+  // in-memory staging area for the current, not-yet-flushed bucket of values
+  private final int[][] bucketBuffer;
+  private final ByteBuffer getOffsetBuffer;
+  // shift amount equivalent to dividing by bucketSize (valid since bucketSize is a power of two)
+  private final int div;
+
+  // most recently written value; used to enforce the sorted-and-unique write contract
+  @Nullable
+  private int[] prevObject = null;
+  @Nullable
+  private WriteOutBytes headerOut = null;
+  @Nullable
+  private WriteOutBytes valuesOut = null;
+  // count of non-null values written so far (write() returns early for nulls)
+  private int numWritten = 0;
+  // reusable scratch buffer; grown as needed when a bucket does not fit (see write())
+  private ByteBuffer scratch;
+  private int logScratchSize = 10;
+  private boolean isClosed = false;
+  private boolean hasNulls = false;
+
+  /**
+   * @param segmentWriteOutMedium medium used to allocate the header and value write-out buffers
+   * @param byteOrder             byte order for all buffers produced by this writer
+   * @param bucketSize            values per front-coded bucket; must be a power of two in [1, 128]
+   */
+  public FrontCodedIntArrayIndexedWriter(
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      ByteOrder byteOrder,
+      int bucketSize
+  )
+  {
+    // bitCount == 1 means power of two; the < 1 check additionally rejects Integer.MIN_VALUE,
+    // whose bit count is also 1
+    if (Integer.bitCount(bucketSize) != 1 || bucketSize < 1 || bucketSize > 128) {
+      throw new IAE("bucketSize must be a power of two (from 1 up to 128) but was[%,d]", bucketSize);
+    }
+    this.segmentWriteOutMedium = segmentWriteOutMedium;
+    this.scratch = ByteBuffer.allocate(1 << logScratchSize).order(byteOrder);
+    this.bucketSize = bucketSize;
+    this.byteOrder = byteOrder;
+    this.bucketBuffer = new int[bucketSize][];
+    this.getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder);
+    // shift equivalent of dividing by bucketSize, used for bucket-number computation
+    this.div = Integer.numberOfTrailingZeros(bucketSize);
+  }
+
+  // Allocates the header and value write-out buffers; must be called before any write().
+  @Override
+  public void open() throws IOException
+  {
+    headerOut = segmentWriteOutMedium.makeWriteOutBytes();
+    valuesOut = segmentWriteOutMedium.makeWriteOutBytes();
+  }
+
+  /**
+   * Appends a value to the dictionary. Values must arrive in strictly ascending
+   * {@link #ARRAY_COMPARATOR} order with no duplicates. A null value is recorded via the
+   * hasNulls flag only and occupies no bucket slot. When the in-memory bucket fills, it is
+   * front-coded into valuesOut and its end offset is appended to headerOut.
+   */
+  @Override
+  public void write(@Nullable int[] value) throws IOException
+  {
+
+    // enforce the sorted-and-unique contract against the previously written value
+    if (prevObject != null && ARRAY_COMPARATOR.compare(prevObject, value) >= 0) {
+      throw new ISE(
+          "Values must be sorted and unique. Element [%s] with value [%s] is before or equivalent to [%s]",
+          numWritten,
+          value == null ? null : Arrays.toString(value),
+          Arrays.toString(prevObject)
+      );
+    }
+
+    // nulls are tracked with a flag only; numWritten and prevObject are deliberately untouched
+    if (value == null) {
+      hasNulls = true;
+      return;
+    }
+
+    // if the bucket buffer is full, write the bucket
+    if (numWritten > 0 && (numWritten % bucketSize) == 0) {
+      resetScratch();
+      int written;
+      // write the bucket, growing scratch buffer as necessary
+      // (writeBucket signals insufficient scratch capacity by returning a negative value)
+      do {
+        written = writeBucket(scratch, bucketBuffer, bucketSize);
+        if (written < 0) {
+          growScratch();
+        }
+      } while (written < 0);
+      scratch.flip();
+      Channels.writeFully(valuesOut, scratch);
+
+      resetScratch();
+      // write end offset for current value
+      scratch.putInt((int) valuesOut.size());
+      scratch.flip();
+      Channels.writeFully(headerOut, scratch);
+    }
+
+    // stage the value in the current bucket slot
+    bucketBuffer[numWritten % bucketSize] = value;
+
+    ++numWritten;
+    prevObject = value;
+  }
+
+
+  /**
+   * Total number of bytes {@link #writeTo} will emit: three single-byte fields (version,
+   * bucket size, has-null marker), two VByte-encoded ints (value count and payload size),
+   * followed by the header and value sections themselves.
+   */
+  @Override
+  public long getSerializedSize() throws IOException
+  {
+    if (!isClosed) {
+      flush();
+    }
+    final int payloadSize = Ints.checkedCast(headerOut.size() + valuesOut.size());
+    // version byte + bucketSize byte + has-null byte
+    long total = 3L * Byte.BYTES;
+    total += VByte.computeIntSize(numWritten);
+    total += VByte.computeIntSize(payloadSize);
+    total += payloadSize;
+    return total;
+  }
+
+  /**
+   * Serializes the dictionary to the channel: a 3-byte preamble (version 0, bucket size,
+   * has-null marker), VByte-encoded value count and payload size, then the bucket-offset
+   * header followed by the front-coded value buckets. The byte layout must stay in sync
+   * with {@link #getSerializedSize()}.
+   */
+  @Override
+  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    if (!isClosed) {
+      flush();
+    }
+    resetScratch();
+    // version 0
+    scratch.put((byte) 0);
+    scratch.put((byte) bucketSize);
+    scratch.put(hasNulls ? NullHandling.IS_NULL_BYTE : NullHandling.IS_NOT_NULL_BYTE);
+    VByte.writeInt(scratch, numWritten);
+    VByte.writeInt(scratch, Ints.checkedCast(headerOut.size() + valuesOut.size()));
+    scratch.flip();
+    Channels.writeFully(channel, scratch);
+    // header (bucket end offsets) precedes the value buckets
+    headerOut.writeTo(channel);
+    valuesOut.writeTo(channel);
+  }
+
+  // write() rejects out-of-order values, so the dictionary is sorted by construction.
+  @Override
+  public boolean isSorted()
+  {
+    return true;
+  }
+
+ @Nullable
+ @Override
+ public int[] get(int index) throws IOException
+ {
+ if (index == 0 && hasNulls) {
+ return null;
+ }
+ final int adjustedIndex = hasNulls ? index - 1 : index;
+ final int relativeIndex = adjustedIndex % bucketSize;
+ // check for current page
+ if (adjustedIndex >= numWritten - bucketSize) {
+ return bucketBuffer[relativeIndex];
+ } else {
+ final int bucket = adjustedIndex >> div;
+ long startOffset;
+ if (bucket == 0) {
+ startOffset = 0;
+ } else {
+ startOffset = getBucketOffset(bucket - 1);
+ }
+ long endOffset = getBucketOffset(bucket);
+ int bucketSize = Ints.checkedCast(endOffset - startOffset);
Review Comment:
## Possible confusion of local and field
Confusing name: method [get](1) also refers to field [bucketSize](2)
(without qualifying it with 'this').
[Show more
details](https://github.com/apache/druid/security/code-scanning/4267)
##########
processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java:
##########
@@ -19,432 +19,444 @@
package org.apache.druid.query;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.ResourceInputSource;
+import org.apache.druid.data.input.impl.DelimitedInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.guava.nary.TrinaryFn;
import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.SegmentId;
import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayInputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.SequenceInputStream;
-import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.BiFunction;
public class NestedDataTestUtils
{
- public static final String SIMPLE_DATA_FILE = "simple-nested-test-data.json";
- public static final String SIMPLE_PARSER_FILE =
"simple-nested-test-data-parser.json";
- public static final String SIMPLE_DATA_TSV_FILE =
"simple-nested-test-data.tsv";
- public static final String SIMPLE_PARSER_TSV_FILE =
"simple-nested-test-data-tsv-parser.json";
- public static final String SIMPLE_PARSER_TSV_TRANSFORM_FILE =
"simple-nested-test-data-tsv-transform.json";
- public static final String SIMPLE_AGG_FILE =
"simple-nested-test-data-aggs.json";
-
- public static final String TYPES_DATA_FILE = "types-test-data.json";
- public static final String TYPES_PARSER_FILE = "types-test-data-parser.json";
-
- public static final String NUMERIC_DATA_FILE =
"numeric-nested-test-data.json";
- public static final String NUMERIC_PARSER_FILE =
"numeric-nested-test-data-parser.json";
+ public static final String SIMPLE_DATA_FILE = "nested-simple-test-data.json";
+ public static final String SIMPLE_DATA_TSV_FILE =
"nested-simple-test-data.tsv";
+ public static final String NUMERIC_DATA_FILE =
"nested-numeric-test-data.json";
+ public static final String TYPES_DATA_FILE = "nested-types-test-data.json";
+ public static final String ARRAY_TYPES_DATA_FILE =
"nested-array-test-data.json";
public static final ObjectMapper JSON_MAPPER;
+ public static final TimestampSpec TIMESTAMP_SPEC = new
TimestampSpec("timestamp", null, null);
+
+ public static final DimensionsSpec AUTO_DISCOVERY =
+ DimensionsSpec.builder()
+ .setUseNestedColumnIndexerForSchemaDiscovery(true)
+ .build();
+
+ public static final DimensionsSpec TSV_SCHEMA =
+ DimensionsSpec.builder()
+ .setDimensions(
+ Arrays.asList(
+ new StringDimensionSchema("dim"),
+ new NestedDataDimensionSchema("nest_json"),
+ new NestedDataDimensionSchema("nester_json"),
+ new NestedDataDimensionSchema("variant_json"),
+ new NestedDataDimensionSchema("list_json")
+ )
+ )
+ .build();
+ public static final InputRowSchema AUTO_SCHEMA = new InputRowSchema(
+ TIMESTAMP_SPEC,
+ AUTO_DISCOVERY,
+ null
+ );
+
+ public static final InputRowSchema SIMPLE_DATA_TSV_SCHEMA = new
InputRowSchema(
+ TIMESTAMP_SPEC,
+ TSV_SCHEMA,
+ null
+ );
+
+ public static DelimitedInputFormat SIMPLE_DATA_TSV_INPUT_FORMAT =
DelimitedInputFormat.ofColumns(
+ "timestamp",
+ "dim",
+ "nest",
+ "nester",
+ "variant",
+ "list"
+ );
+ public static final TransformSpec SIMPLE_DATA_TSV_TRANSFORM = new
TransformSpec(
+ null,
+ Arrays.asList(
+ new ExpressionTransform("nest_json", "parse_json(nest)",
TestExprMacroTable.INSTANCE),
+ new ExpressionTransform("nester_json", "parse_json(nester)",
TestExprMacroTable.INSTANCE),
+ new ExpressionTransform("variant_json", "parse_json(variant)",
TestExprMacroTable.INSTANCE),
+ new ExpressionTransform("list_json", "parse_json(list)",
TestExprMacroTable.INSTANCE)
+ )
+ );
+
+ public static final AggregatorFactory[] COUNT = new AggregatorFactory[]{
+ new CountAggregatorFactory("count")
+ };
+
static {
JSON_MAPPER = TestHelper.makeJsonMapper();
JSON_MAPPER.registerModules(NestedDataModule.getJacksonModulesList());
}
- public static List<Segment> createSegments(
- AggregationTestHelper helper,
+ public static List<Segment> createSimpleSegmentsTsv(
TemporaryFolder tempFolder,
- Closer closer,
- Granularity granularity,
- boolean rollup,
- int maxRowCount
- ) throws Exception
+ Closer closer
+ )
+ throws Exception
{
- return createSegments(
- helper,
+ return createSimpleNestedTestDataTsvSegments(
tempFolder,
closer,
- SIMPLE_DATA_FILE,
- SIMPLE_PARSER_FILE,
- SIMPLE_AGG_FILE,
- granularity,
- rollup,
- maxRowCount
+ Granularities.NONE,
+ true
);
}
- public static List<Segment> createTsvSegments(
- AggregationTestHelper helper,
+ public static List<Segment> createSimpleNestedTestDataTsvSegments(
TemporaryFolder tempFolder,
Closer closer,
Granularity granularity,
- boolean rollup,
- int maxRowCount
+ boolean rollup
) throws Exception
{
return createSegments(
- helper,
tempFolder,
closer,
SIMPLE_DATA_TSV_FILE,
- SIMPLE_PARSER_TSV_FILE,
- SIMPLE_PARSER_TSV_TRANSFORM_FILE,
- SIMPLE_AGG_FILE,
+ SIMPLE_DATA_TSV_INPUT_FORMAT,
+ TIMESTAMP_SPEC,
+ SIMPLE_DATA_TSV_SCHEMA.getDimensionsSpec(),
+ SIMPLE_DATA_TSV_TRANSFORM,
+ COUNT,
granularity,
- rollup,
- maxRowCount
+ rollup
);
}
- public static Segment createIncrementalIndex(
- Granularity granularity,
- boolean rollup,
- boolean deserializeComplexMetrics,
- int maxRowCount
- )
- throws Exception
+ public static Segment
createSimpleNestedTestDataIncrementalIndex(TemporaryFolder tempFolder) throws
Exception
{
- return createIncrementalIndex(
+ return createIncrementalIndexForJsonInput(
+ tempFolder,
SIMPLE_DATA_FILE,
- SIMPLE_PARSER_FILE,
- SIMPLE_AGG_FILE,
- granularity,
- rollup,
- deserializeComplexMetrics,
- maxRowCount
+ Granularities.NONE,
+ true,
+ 1000
);
}
- public static List<Segment> createSegments(
- AggregationTestHelper helper,
+ public static List<Segment> createSimpleNestedTestDataSegments(
TemporaryFolder tempFolder,
- Closer closer,
- String inputFileName,
- String parserJsonFileName,
- String aggJsonFileName,
- Granularity granularity,
- boolean rollup,
- int maxRowCount
- ) throws Exception
+ Closer closer
+ )
+ throws Exception
{
- File segmentDir = tempFolder.newFolder();
- File inputFile = readFileFromClasspath(inputFileName);
- FileInputStream inputDataStream = new FileInputStream(inputFile);
- String parserJson = readFileFromClasspathAsString(parserJsonFileName);
- String aggJson = readFileFromClasspathAsString(aggJsonFileName);
-
- helper.createIndex(
- inputDataStream,
- parserJson,
- aggJson,
- segmentDir,
- 0,
- granularity,
- maxRowCount,
- rollup
- );
- inputDataStream.close();
-
- final List<Segment> segments = Lists.transform(
- ImmutableList.of(segmentDir),
- dir -> {
- try {
- return closer.register(new
QueryableIndexSegment(helper.getIndexIO().loadIndex(dir), SegmentId.dummy("")));
- }
- catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
+ return createSegmentsForJsonInput(
+ tempFolder,
+ closer,
+ SIMPLE_DATA_FILE,
+ Granularities.NONE,
+ true
);
+ }
- return segments;
+ public static Segment createIncrementalIndexForJsonInput(TemporaryFolder
tempFolder, String fileName)
+ throws Exception
+ {
+ return createIncrementalIndexForJsonInput(
+ tempFolder,
+ fileName,
+ Granularities.NONE,
+ true,
+ 1000
+ );
}
- public static List<Segment> createSegments(
- AggregationTestHelper helper,
+ public static Segment createIncrementalIndexForJsonInput(
TemporaryFolder tempFolder,
- Closer closer,
- String inputFileName,
- String parserJsonFileName,
- String transformSpecJsonFileName,
- String aggJsonFileName,
+ String file,
Granularity granularity,
boolean rollup,
int maxRowCount
- ) throws Exception
+ )
+ throws Exception
{
- File segmentDir = tempFolder.newFolder();
- File inputFile = readFileFromClasspath(inputFileName);
- FileInputStream inputDataStream = new FileInputStream(inputFile);
- String parserJson = readFileFromClasspathAsString(parserJsonFileName);
- String transformSpecJson =
readFileFromClasspathAsString(transformSpecJsonFileName);
- String aggJson = readFileFromClasspathAsString(aggJsonFileName);
-
- helper.createIndex(
- inputDataStream,
- parserJson,
- transformSpecJson,
- aggJson,
- segmentDir,
- 0,
+ return createIncrementalIndex(
+ tempFolder,
+ file,
+ JsonInputFormat.DEFAULT,
+ TIMESTAMP_SPEC,
+ AUTO_DISCOVERY,
+ TransformSpec.NONE,
+ COUNT,
granularity,
- maxRowCount,
- rollup
- );
- inputDataStream.close();
-
- final List<Segment> segments = Lists.transform(
- ImmutableList.of(segmentDir),
- dir -> {
- try {
- return closer.register(new
QueryableIndexSegment(helper.getIndexIO().loadIndex(dir), SegmentId.dummy("")));
- }
- catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
+ rollup,
+ maxRowCount
);
-
- return segments;
}
- public static List<Segment> createSegmentsWithConcatenatedInput(
- AggregationTestHelper helper,
+ public static List<Segment> createSegmentsForJsonInput(
TemporaryFolder tempFolder,
Closer closer,
+ String inputFile,
Granularity granularity,
- boolean rollup,
- int maxRowCount,
- int numCopies,
- int numSegments
+ boolean rollup
) throws Exception
{
- return createSegmentsWithConcatenatedInput(
- helper,
+ return createSegments(
tempFolder,
closer,
- SIMPLE_DATA_FILE,
- SIMPLE_PARSER_FILE,
- null,
- SIMPLE_AGG_FILE,
+ inputFile,
+ JsonInputFormat.DEFAULT,
+ TIMESTAMP_SPEC,
+ AUTO_DISCOVERY,
+ TransformSpec.NONE,
+ COUNT,
granularity,
- rollup,
- maxRowCount,
- numCopies,
- numSegments
+ rollup
);
}
- /**
- * turn small test data into bigger test data by duplicating itself into a
bigger stream
- */
- public static List<Segment> createSegmentsWithConcatenatedInput(
- AggregationTestHelper helper,
+ public static List<Segment> createSegmentsWithConcatenatedJsonInput(
TemporaryFolder tempFolder,
Closer closer,
- String inputFileName,
- String parserJsonFileName,
- String transformSpecJsonFileName,
- String aggJsonFileName,
+ String inputFile,
Granularity granularity,
boolean rollup,
- int maxRowCount,
int numCopies,
int numSegments
) throws Exception
{
- String parserJson = readFileFromClasspathAsString(parserJsonFileName);
- String transformSpecJson = transformSpecJsonFileName != null ?
readFileFromClasspathAsString(transformSpecJsonFileName) : null;
- String aggJson = readFileFromClasspathAsString(aggJsonFileName);
-
- List<File> segmentDirs = Lists.newArrayListWithCapacity(numSegments);
+ List<InputSource> inputFiles = Lists.newArrayListWithCapacity(numSegments);
for (int i = 0; i < numSegments; i++) {
- List<InputStream> inputStreams =
Lists.newArrayListWithCapacity(numCopies);
- for (int j = 0; j < numCopies; j++) {
- inputStreams.add(new
FileInputStream(readFileFromClasspath(inputFileName)));
- if (j + 1 < numCopies) {
- inputStreams.add(new ByteArrayInputStream(StringUtils.toUtf8("\n")));
- }
- }
- SequenceInputStream inputDataStream = new
SequenceInputStream(Collections.enumeration(inputStreams));
- File segmentDir = tempFolder.newFolder();
- helper.createIndex(
- inputDataStream,
- parserJson,
- transformSpecJson,
- aggJson,
- segmentDir,
- 0,
- granularity,
- maxRowCount,
- rollup
- );
- inputDataStream.close();
- segmentDirs.add(segmentDir);
+ File file = selfConcatenateResourceFile(tempFolder, inputFile,
numCopies);
+ inputFiles.add(new LocalInputSource(file.getParentFile(),
file.getName()));
}
-
- final List<Segment> segments = Lists.transform(
- segmentDirs,
- dir -> {
- try {
- return closer.register(new
QueryableIndexSegment(helper.getIndexIO().loadIndex(dir), SegmentId.dummy("")));
- }
- catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
- );
-
- return segments;
- }
-
- public static Segment createIncrementalIndex(
- String inputFileName,
- String parserJsonFileName,
- String aggJsonFileName,
- Granularity granularity,
- boolean rollup,
- boolean deserializeComplexMetrics,
- int maxRowCount
- )
- throws Exception
- {
- File inputFile = readFileFromClasspath(inputFileName);
- FileInputStream inputDataStream = new FileInputStream(inputFile);
- String parserJson = readFileFromClasspathAsString(parserJsonFileName);
- String aggJson = readFileFromClasspathAsString(aggJsonFileName);
- StringInputRowParser parser = JSON_MAPPER.readValue(parserJson,
StringInputRowParser.class);
-
- LineIterator iter = IOUtils.lineIterator(inputDataStream, "UTF-8");
- List<AggregatorFactory> aggregatorSpecs = JSON_MAPPER.readValue(
- aggJson,
- new TypeReference<List<AggregatorFactory>>()
- {
- }
- );
- IncrementalIndex index = AggregationTestHelper.createIncrementalIndex(
- iter,
- parser,
- parser.getParseSpec().getDimensionsSpec().getDimensions(),
- aggregatorSpecs.toArray(new AggregatorFactory[0]),
- 0,
+ return createSegments(
+ tempFolder,
+ closer,
+ inputFiles,
+ JsonInputFormat.DEFAULT,
+ TIMESTAMP_SPEC,
+ AUTO_DISCOVERY,
+ TransformSpec.NONE,
+ COUNT,
granularity,
- deserializeComplexMetrics,
- maxRowCount,
rollup
);
- inputDataStream.close();
- return new IncrementalIndexSegment(index,
SegmentId.dummy("test_datasource"));
}
- public static Segment createDefaultHourlyIncrementalIndex() throws Exception
- {
- return createIncrementalIndex(Granularities.HOUR, true, true, 1000);
- }
-
- public static Segment createDefaultDailyIncrementalIndex() throws Exception
- {
- return createIncrementalIndex(Granularities.DAY, true, true, 1000);
- }
-
- public static List<Segment> createDefaultHourlySegments(
- AggregationTestHelper helper,
+ public static List<Segment> createSegmentsForJsonInput(
TemporaryFolder tempFolder,
- Closer closer
+ Closer closer,
+ String inputFile
)
throws Exception
{
- return createSegments(
- helper,
+ return createSegmentsForJsonInput(
tempFolder,
closer,
- Granularities.HOUR,
- true,
- 1000
+ inputFile,
+ Granularities.NONE,
+ true
);
}
- public static List<Segment> createDefaultHourlySegmentsTsv(
- AggregationTestHelper helper,
+ public static Segment createIncrementalIndex(
TemporaryFolder tempFolder,
- Closer closer
+ String inputFileName,
+ InputFormat inputFormat,
+ TimestampSpec timestampSpec,
+ DimensionsSpec dimensionsSpec,
+ TransformSpec transformSpec,
+ AggregatorFactory[] aggregators,
+ Granularity queryGranularity,
+ boolean rollup,
+ int maxRowCount
Review Comment:
## Useless parameter
The parameter 'maxRowCount' is never used.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4266)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]