This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fb26a1093d discover nested columns when using nested column indexer
for schemaless ingestion (#13672)
fb26a1093d is described below
commit fb26a1093d11f0574dbb905ac0d39c8eb3777312
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jan 18 12:57:28 2023 -0800
discover nested columns when using nested column indexer for schemaless
ingestion (#13672)
* discover nested columns when using nested column indexer for schemaless
* move useNestedColumnIndexerForSchemaDiscovery from AppendableIndexSpec to
DimensionsSpec
---
.../apache/druid/data/input/InputRowSchema.java | 1 +
.../druid/data/input/impl/DimensionsSpec.java | 61 ++++++++++--
.../druid/data/input/impl/JsonLineReader.java | 8 +-
.../druid/data/input/impl/JsonNodeReader.java | 8 +-
.../apache/druid/data/input/impl/JsonReader.java | 8 +-
.../util/common/parsers/JSONFlattenerMaker.java | 10 +-
.../java/util/common/parsers/JSONPathParser.java | 5 +-
.../java/util/common/parsers/ObjectFlatteners.java | 4 +-
.../common/parsers/JSONFlattenerMakerTest.java | 32 ++++++-
.../util/common/parsers/ObjectFlattenersTest.java | 2 +-
.../druid/data/input/avro/AvroFlattenerMaker.java | 24 ++++-
.../druid/data/input/avro/AvroOCFReader.java | 10 +-
.../apache/druid/data/input/avro/AvroParsers.java | 13 ++-
.../druid/data/input/avro/AvroStreamReader.java | 10 +-
.../data/input/avro/AvroFlattenerMakerTest.java | 60 +++++++++++-
.../data/input/kafkainput/KafkaInputFormat.java | 7 +-
.../data/input/orc/OrcHadoopInputRowParser.java | 10 +-
.../org/apache/druid/data/input/orc/OrcReader.java | 8 +-
.../data/input/orc/OrcStructFlattenerMaker.java | 10 +-
.../apache/druid/data/input/orc/OrcReaderTest.java | 104 +++++++++++++++++++++
.../druid/data/input/parquet/ParquetReader.java | 8 +-
.../avro/ParquetAvroHadoopInputRowParser.java | 10 +-
.../parquet/simple/ParquetGroupFlattenerMaker.java | 10 +-
.../simple/ParquetHadoopInputRowParser.java | 10 +-
.../parquet/NestedColumnParquetReaderTest.java | 73 +++++++++++++++
.../input/protobuf/ProtobufFlattenerMaker.java | 13 ++-
.../input/protobuf/ProtobufInputRowSchema.java | 13 ++-
.../druid/data/input/protobuf/ProtobufReader.java | 5 +-
.../input/protobuf/ProtobufInputFormatTest.java | 71 ++++++++++++++
.../indexing/overlord/sampler/SamplerConfig.java | 21 ++---
.../task/ClientCompactionTaskQuerySerdeTest.java | 4 +-
.../cases/cluster/Common/dependencies.yaml | 3 +-
.../druid/testsEx/cluster/MetastoreClient.java | 1 -
.../testsEx/config/IntegrationTestingConfigEx.java | 1 -
.../testsEx/indexer/AbstractITBatchIndexTest.java | 41 +++++++-
.../AbstractLocalInputSourceParallelIndexTest.java | 35 ++++++-
...ITLocalInputSourceAllFormatSchemalessTest.java} | 79 +++++++++++-----
.../ITLocalInputSourceAllInputFormatTest.java | 1 -
.../testsEx/indexer/ITOverwriteBatchIndexTest.java | 1 +
.../testsEx/leadership/ITHighAvailabilityTest.java | 2 +-
.../resources/indexer/wikipedia_index_queries.json | 2 +-
.../wikipedia_index_schemaless_queries.json | 32 +++++++
...a_local_input_source_index_task_schemaless.json | 42 +++++++++
.../incremental/AppendableIndexBuilder.java | 10 --
.../segment/incremental/AppendableIndexSpec.java | 3 -
.../segment/incremental/IncrementalIndex.java | 6 +-
.../incremental/OnheapIncrementalIndex.java | 39 +++-----
.../druid/segment/NestedDataColumnIndexerTest.java | 4 +-
.../OnheapIncrementalIndexBenchmark.java | 6 +-
.../incremental/OnheapIncrementalIndexTest.java} | 34 ++++---
.../ClientCompactionTaskQueryTuningConfig.java | 4 +-
.../DataSourceCompactionConfigTest.java | 2 +-
.../UserCompactionTaskQueryTuningConfigTest.java | 2 +-
.../duty/NewestSegmentFirstPolicyTest.java | 4 +-
54 files changed, 829 insertions(+), 158 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
index 3c4263ba99..dc7d50afd4 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
@@ -34,6 +34,7 @@ public class InputRowSchema
private final TimestampSpec timestampSpec;
private final DimensionsSpec dimensionsSpec;
private final ColumnsFilter columnsFilter;
+
/**
* Set of metric names for further downstream processing by {@link
InputSource}.
* Empty set if no metric given.
diff --git
a/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
b/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
index 6e412c30ad..6dfaa9ad3d 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
@@ -49,7 +49,9 @@ public class DimensionsSpec
private final Map<String, DimensionSchema> dimensionSchemaMap;
private final boolean includeAllDimensions;
- public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null,
null, false);
+ private final boolean useNestedColumnIndexerForSchemaDiscovery;
+
+ public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null,
null, false, null);
public static List<DimensionSchema> getDefaultSchemas(List<String> dimNames)
{
@@ -78,7 +80,7 @@ public class DimensionsSpec
public DimensionsSpec(List<DimensionSchema> dimensions)
{
- this(dimensions, null, null, false);
+ this(dimensions, null, null, false, null);
}
@JsonCreator
@@ -86,7 +88,8 @@ public class DimensionsSpec
@JsonProperty("dimensions") List<DimensionSchema> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@Deprecated @JsonProperty("spatialDimensions")
List<SpatialDimensionSchema> spatialDimensions,
- @JsonProperty("includeAllDimensions") boolean includeAllDimensions
+ @JsonProperty("includeAllDimensions") boolean includeAllDimensions,
+ @JsonProperty("useNestedColumnIndexerForSchemaDiscovery") Boolean
useNestedColumnIndexerForSchemaDiscovery
)
{
this.dimensions = dimensions == null
@@ -115,6 +118,8 @@ public class DimensionsSpec
dimensionSchemaMap.put(newSchema.getName(), newSchema);
}
this.includeAllDimensions = includeAllDimensions;
+ this.useNestedColumnIndexerForSchemaDiscovery =
+ useNestedColumnIndexerForSchemaDiscovery != null &&
useNestedColumnIndexerForSchemaDiscovery;
}
@JsonProperty
@@ -135,6 +140,12 @@ public class DimensionsSpec
return includeAllDimensions;
}
+ @JsonProperty
+ public boolean useNestedColumnIndexerForSchemaDiscovery()
+ {
+ return useNestedColumnIndexerForSchemaDiscovery;
+ }
+
@Deprecated
@JsonIgnore
public List<SpatialDimensionSchema> getSpatialDimensions()
@@ -188,7 +199,13 @@ public class DimensionsSpec
@PublicApi
public DimensionsSpec withDimensions(List<DimensionSchema> dims)
{
- return new DimensionsSpec(dims, ImmutableList.copyOf(dimensionExclusions),
null, includeAllDimensions);
+ return new DimensionsSpec(
+ dims,
+ ImmutableList.copyOf(dimensionExclusions),
+ null,
+ includeAllDimensions,
+ useNestedColumnIndexerForSchemaDiscovery
+ );
}
public DimensionsSpec withDimensionExclusions(Set<String> dimExs)
@@ -197,14 +214,21 @@ public class DimensionsSpec
dimensions,
ImmutableList.copyOf(Sets.union(dimensionExclusions, dimExs)),
null,
- includeAllDimensions
+ includeAllDimensions,
+ useNestedColumnIndexerForSchemaDiscovery
);
}
@Deprecated
public DimensionsSpec withSpatialDimensions(List<SpatialDimensionSchema>
spatials)
{
- return new DimensionsSpec(dimensions,
ImmutableList.copyOf(dimensionExclusions), spatials, includeAllDimensions);
+ return new DimensionsSpec(
+ dimensions,
+ ImmutableList.copyOf(dimensionExclusions),
+ spatials,
+ includeAllDimensions,
+ useNestedColumnIndexerForSchemaDiscovery
+ );
}
private void verify(List<SpatialDimensionSchema> spatialDimensions)
@@ -243,6 +267,7 @@ public class DimensionsSpec
}
DimensionsSpec that = (DimensionsSpec) o;
return includeAllDimensions == that.includeAllDimensions
+ && useNestedColumnIndexerForSchemaDiscovery ==
that.useNestedColumnIndexerForSchemaDiscovery
&& Objects.equals(dimensions, that.dimensions)
&& Objects.equals(dimensionExclusions, that.dimensionExclusions);
}
@@ -250,7 +275,12 @@ public class DimensionsSpec
@Override
public int hashCode()
{
- return Objects.hash(dimensions, dimensionExclusions, includeAllDimensions);
+ return Objects.hash(
+ dimensions,
+ dimensionExclusions,
+ includeAllDimensions,
+ useNestedColumnIndexerForSchemaDiscovery
+ );
}
@Override
@@ -260,6 +290,7 @@ public class DimensionsSpec
"dimensions=" + dimensions +
", dimensionExclusions=" + dimensionExclusions +
", includeAllDimensions=" + includeAllDimensions +
+ ", useNestedColumnIndexerForSchemaDiscovery=" +
useNestedColumnIndexerForSchemaDiscovery +
'}';
}
@@ -270,6 +301,8 @@ public class DimensionsSpec
private List<SpatialDimensionSchema> spatialDimensions;
private boolean includeAllDimensions;
+ private boolean useNestedColumnIndexerForSchemaDiscovery;
+
public Builder setDimensions(List<DimensionSchema> dimensions)
{
this.dimensions = dimensions;
@@ -301,9 +334,21 @@ public class DimensionsSpec
return this;
}
+ public Builder setUseNestedColumnIndexerForSchemaDiscovery(boolean
useNestedColumnIndexerForSchemaDiscovery)
+ {
+ this.useNestedColumnIndexerForSchemaDiscovery =
useNestedColumnIndexerForSchemaDiscovery;
+ return this;
+ }
+
public DimensionsSpec build()
{
- return new DimensionsSpec(dimensions, dimensionExclusions,
spatialDimensions, includeAllDimensions);
+ return new DimensionsSpec(
+ dimensions,
+ dimensionExclusions,
+ spatialDimensions,
+ includeAllDimensions,
+ useNestedColumnIndexerForSchemaDiscovery
+ );
}
}
}
diff --git
a/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java
b/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java
index 19c0d685a9..aa5ecbb086 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java
@@ -59,7 +59,13 @@ public class JsonLineReader extends TextReader
)
{
super(inputRowSchema, source);
- this.flattener = ObjectFlatteners.create(flattenSpec, new
JSONFlattenerMaker(keepNullColumns));
+ this.flattener = ObjectFlatteners.create(
+ flattenSpec,
+ new JSONFlattenerMaker(
+ keepNullColumns,
+
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
this.mapper = mapper;
}
diff --git
a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
index 7a7b98c452..eaf42244cb 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
@@ -80,7 +80,13 @@ public class JsonNodeReader extends
IntermediateRowParsingReader<JsonNode>
{
this.inputRowSchema = inputRowSchema;
this.source = source;
- this.flattener = ObjectFlatteners.create(flattenSpec, new
JSONFlattenerMaker(keepNullColumns));
+ this.flattener = ObjectFlatteners.create(
+ flattenSpec,
+ new JSONFlattenerMaker(
+ keepNullColumns,
+
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
this.mapper = mapper;
this.jsonFactory = new JsonFactory();
}
diff --git
a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
index 8dee12dc30..1b1402f22a 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
@@ -77,7 +77,13 @@ public class JsonReader extends
IntermediateRowParsingReader<String>
{
this.inputRowSchema = inputRowSchema;
this.source = source;
- this.flattener = ObjectFlatteners.create(flattenSpec, new
JSONFlattenerMaker(keepNullColumns));
+ this.flattener = ObjectFlatteners.create(
+ flattenSpec,
+ new JSONFlattenerMaker(
+ keepNullColumns,
+
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
this.mapper = mapper;
this.jsonFactory = new JsonFactory();
}
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
index b54b51f863..0b8244e29b 100644
---
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
+++
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
@@ -57,15 +57,23 @@ public class JSONFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<JsonN
private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
private final boolean keepNullValues;
+ private final boolean discoverNestedFields;
- public JSONFlattenerMaker(boolean keepNullValues)
+
+ public JSONFlattenerMaker(boolean keepNullValues, boolean
discoverNestedFields)
{
this.keepNullValues = keepNullValues;
+ this.discoverNestedFields = discoverNestedFields;
}
@Override
public Iterable<String> discoverRootFields(final JsonNode obj)
{
+ // if discovering nested fields, just return all root fields since we want
everything
+ // else, we filter for literals and arrays of literals
+ if (discoverNestedFields) {
+ return obj::fieldNames;
+ }
return FluentIterable.from(obj::fields)
.filter(
entry -> {
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java
index 48777a8311..281d8442cc 100644
---
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java
+++
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java
@@ -42,7 +42,10 @@ public class JSONPathParser implements Parser<String, Object>
public JSONPathParser(JSONPathSpec flattenSpec, ObjectMapper mapper, boolean
keepNullColumns)
{
this.mapper = mapper == null ? new ObjectMapper() : mapper;
- this.flattener = ObjectFlatteners.create(flattenSpec, new
JSONFlattenerMaker(keepNullColumns));
+ this.flattener = ObjectFlatteners.create(
+ flattenSpec,
+ new JSONFlattenerMaker(keepNullColumns, false)
+ );
}
@Override
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
index 98f087333a..b2040d73db 100644
---
a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
+++
b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
@@ -218,7 +218,9 @@ public class ObjectFlatteners
{
JsonProvider getJsonProvider();
/**
- * List all "root" primitive properties and primitive lists (no nested
objects, no lists of objects)
+ * List all "root" fields. If
+ * {@link
org.apache.druid.data.input.impl.DimensionsSpec#useNestedColumnIndexerForSchemaDiscovery}
is false, this
+ * method should filter fields to include only fields that contain
primitive values and lists of primitive values
*/
Iterable<String> discoverRootFields(T obj);
diff --git
a/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java
b/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java
index 0a1b09578b..5583081db1 100644
---
a/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java
+++
b/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -37,7 +38,8 @@ public class JSONFlattenerMakerTest
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final JSONFlattenerMaker FLATTENER_MAKER = new
JSONFlattenerMaker(true);
+ private static final JSONFlattenerMaker FLATTENER_MAKER = new
JSONFlattenerMaker(true, false);
+ private static final JSONFlattenerMaker FLATTENER_MAKER_NESTED = new
JSONFlattenerMaker(true, true);
@Test
public void testStrings() throws JsonProcessingException
@@ -169,4 +171,32 @@ public class JSONFlattenerMakerTest
result = FLATTENER_MAKER.finalizeConversionForMap(node);
Assert.assertEquals(expectedList, result);
}
+
+ @Test
+ public void testDiscovery() throws JsonProcessingException
+ {
+ Map<String, Object> theMap =
+ ImmutableMap.<String, Object>builder()
+ .put("bool", true)
+ .put("int", 1)
+ .put("long", 1L)
+ .put("float", 0.11f)
+ .put("double", 0.33)
+ .put("binary", new byte[]{0x01, 0x02, 0x03})
+ .put("list", ImmutableList.of("foo", "bar", "baz"))
+ .put("anotherList", ImmutableList.of(1, 2, 3))
+ .put("nested", ImmutableMap.of("x", 1L, "y", 2L, "z", 3L))
+ .build();
+
+ JsonNode node =
OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(theMap));
+ Assert.assertTrue(node.isObject());
+ Assert.assertEquals(
+ ImmutableSet.of("bool", "int", "long", "float", "double", "binary",
"list", "anotherList"),
+ ImmutableSet.copyOf(FLATTENER_MAKER.discoverRootFields(node))
+ );
+ Assert.assertEquals(
+ ImmutableSet.of("bool", "int", "long", "float", "double", "binary",
"list", "anotherList", "nested"),
+ ImmutableSet.copyOf(FLATTENER_MAKER_NESTED.discoverRootFields(node))
+ );
+ }
}
diff --git
a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
index e0b0fbcc51..ab6c50ef18 100644
---
a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
+++
b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
@@ -34,7 +34,7 @@ public class ObjectFlattenersTest
{
private static final String SOME_JSON = "{\"foo\": null, \"bar\": 1}";
- private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new
JSONFlattenerMaker(true);
+ private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new
JSONFlattenerMaker(true, false);
private static final ObjectFlattener FLATTENER = ObjectFlatteners.create(
new JSONPathSpec(
true,
diff --git
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
index 4e254c18ee..ba9d895b1f 100644
---
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
+++
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
@@ -92,14 +92,27 @@ public class AvroFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<Gener
private final boolean fromPigAvroStorage;
private final boolean binaryAsString;
+ private final boolean discoverNestedFields;
+
/**
- * @param fromPigAvroStorage boolean to specify the data file is stored
using AvroStorage
- * @param binaryAsString boolean to encode the byte[] as a string.
+ * @param fromPigAvroStorage boolean to specify the data file is stored
using AvroStorage
+ * @param binaryAsString if true, treat byte[] as utf8 encoded values
and coerce to strings, else leave as byte[]
+ * @param extractUnionsByType if true, unions will be extracted to
separate nested fields for each type. See
+ * {@link
GenericAvroJsonProvider#extractUnionTypes(Object)} for more details
+ * @param discoverNestedFields if true, {@link
#discoverRootFields(GenericRecord)} will return the full set of
+ * fields, else this list will be filtered to
contain only simple literals and arrays
+ * of simple literals
*/
- public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean
binaryAsString, final boolean extractUnionsByType)
+ public AvroFlattenerMaker(
+ final boolean fromPigAvroStorage,
+ final boolean binaryAsString,
+ final boolean extractUnionsByType,
+ final boolean discoverNestedFields
+ )
{
this.fromPigAvroStorage = fromPigAvroStorage;
this.binaryAsString = binaryAsString;
+ this.discoverNestedFields = discoverNestedFields;
this.avroJsonProvider = new GenericAvroJsonProvider(extractUnionsByType);
this.jsonPathConfiguration =
@@ -113,6 +126,11 @@ public class AvroFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<Gener
@Override
public Set<String> discoverRootFields(final GenericRecord obj)
{
+ // if discovering nested fields, just return all root fields since we want
everything
+ // else, we filter for literals and arrays of literals
+ if (discoverNestedFields) {
+ return
obj.getSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toSet());
+ }
return obj.getSchema()
.getFields()
.stream()
diff --git
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index 66552f9eda..3039fbb7c8 100644
---
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -64,7 +64,15 @@ public class AvroOCFReader extends
IntermediateRowParsingReader<GenericRecord>
this.source = source;
this.temporaryDirectory = temporaryDirectory;
this.readerSchema = readerSchema;
- this.recordFlattener = ObjectFlatteners.create(flattenSpec, new
AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
+ this.recordFlattener = ObjectFlatteners.create(
+ flattenSpec,
+ new AvroFlattenerMaker(
+ false,
+ binaryAsString,
+ extractUnionsByType,
+
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
}
@Override
diff --git
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
index 6d399a8dc6..12f8a20bab 100644
---
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
+++
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.avro;
import org.apache.avro.generic.GenericRecord;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -50,7 +51,17 @@ public class AvroParsers
flattenSpec = JSONPathSpec.DEFAULT;
}
- return ObjectFlatteners.create(flattenSpec, new
AvroFlattenerMaker(fromPigAvroStorage, binaryAsString, extractUnionsByType));
+ final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec();
+
+ return ObjectFlatteners.create(
+ flattenSpec,
+ new AvroFlattenerMaker(
+ fromPigAvroStorage,
+ binaryAsString,
+ extractUnionsByType,
+ dimensionsSpec != null &&
dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
}
public static List<InputRow> parseGenericRecord(
diff --git
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
index 4ed6a8a2ab..51bc733d43 100644
---
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
+++
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -60,7 +60,15 @@ public class AvroStreamReader extends
IntermediateRowParsingReader<GenericRecord
this.inputRowSchema = inputRowSchema;
this.source = source;
this.avroBytesDecoder = avroBytesDecoder;
- this.recordFlattener = ObjectFlatteners.create(flattenSpec, new
AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
+ this.recordFlattener = ObjectFlatteners.create(
+ flattenSpec,
+ new AvroFlattenerMaker(
+ false,
+ binaryAsString,
+ extractUnionsByType,
+
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
}
@Override
diff --git
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
index 1174d2684f..28caea3049 100644
---
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
+++
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.data.input.avro;
+import com.google.common.collect.ImmutableSet;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
@@ -40,9 +41,9 @@ import java.util.Map;
public class AvroFlattenerMakerTest
{
private static final AvroFlattenerMaker
FLATTENER_WITHOUT_EXTRACT_UNION_BY_TYPE =
- new AvroFlattenerMaker(false, false, false);
+ new AvroFlattenerMaker(false, false, false, false);
private static final AvroFlattenerMaker FLATTENER_WITH_EXTRACT_UNION_BY_TYPE
=
- new AvroFlattenerMaker(false, false, true);
+ new AvroFlattenerMaker(false, false, true, false);
private static final SomeAvroDatum RECORD =
AvroStreamInputRowParserTest.buildSomeAvroDatum();
@@ -77,7 +78,7 @@ public class AvroFlattenerMakerTest
@Test
public void jsonPathExtractorExtractUnionsByType()
{
- final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false,
true);
+ final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false,
true, false);
// Unnamed types are accessed by type
@@ -156,6 +157,59 @@ public class AvroFlattenerMakerTest
);
}
+ @Test
+ public void testDiscovery()
+ {
+ final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false,
true, false);
+ final AvroFlattenerMaker flattenerNested = new AvroFlattenerMaker(false,
false, true, true);
+
+ SomeAvroDatum input = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+
+ Assert.assertEquals(
+ ImmutableSet.of(
+ "someOtherId",
+ "someStringArray",
+ "someIntArray",
+ "someFloat",
+ "eventType",
+ "someFixed",
+ "someBytes",
+ "someUnion",
+ "id",
+ "someEnum",
+ "someLong",
+ "someInt",
+ "timestamp"
+ ),
+ ImmutableSet.copyOf(flattener.discoverRootFields(input))
+ );
+ Assert.assertEquals(
+ ImmutableSet.of(
+ "someStringValueMap",
+ "someOtherId",
+ "someStringArray",
+ "someIntArray",
+ "someFloat",
+ "isValid",
+ "someIntValueMap",
+ "eventType",
+ "someFixed",
+ "someBytes",
+ "someRecord",
+ "someMultiMemberUnion",
+ "someNull",
+ "someRecordArray",
+ "someUnion",
+ "id",
+ "someEnum",
+ "someLong",
+ "someInt",
+ "timestamp"
+ ),
+ ImmutableSet.copyOf(flattenerNested.discoverRootFields(input))
+ );
+ }
+
private void getRootField_common(final SomeAvroDatum record, final
AvroFlattenerMaker flattener)
{
Assert.assertEquals(
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
index 73ff6a47a8..aad06bac7b 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
@@ -82,7 +82,12 @@ public class KafkaInputFormat implements InputFormat
public InputEntityReader createReader(InputRowSchema inputRowSchema,
InputEntity source, File temporaryDirectory)
{
SettableByteEntity<KafkaRecordEntity> settableByteEntitySource =
(SettableByteEntity<KafkaRecordEntity>) source;
- InputRowSchema newInputRowSchema = new InputRowSchema(dummyTimestampSpec,
inputRowSchema.getDimensionsSpec(), inputRowSchema.getColumnsFilter());
+ InputRowSchema newInputRowSchema = new InputRowSchema(
+ dummyTimestampSpec,
+ inputRowSchema.getDimensionsSpec(),
+ inputRowSchema.getColumnsFilter(),
+ inputRowSchema.getMetricNames()
+ );
return new KafkaInputReader(
inputRowSchema,
settableByteEntitySource,
diff --git
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
index 9fcefcba2a..5ab8cda968 100644
---
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
+++
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input.orc;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
@@ -55,7 +56,14 @@ public class OrcHadoopInputRowParser implements
InputRowParser<OrcStruct>
} else {
flattenSpec = JSONPathSpec.DEFAULT;
}
- this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new
OrcStructFlattenerMaker(this.binaryAsString));
+ final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec();
+ this.orcStructFlattener = ObjectFlatteners.create(
+ flattenSpec,
+ new OrcStructFlattenerMaker(
+ this.binaryAsString,
+ dimensionsSpec != null &&
dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
this.parser = new MapInputRowParser(parseSpec);
}
diff --git
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
index 5bcec8c80e..89ba536562 100644
---
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
+++
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -70,7 +70,13 @@ public class OrcReader extends
IntermediateRowParsingReader<OrcStruct>
this.inputRowSchema = inputRowSchema;
this.source = source;
this.temporaryDirectory = temporaryDirectory;
- this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new
OrcStructFlattenerMaker(binaryAsString));
+ this.orcStructFlattener = ObjectFlatteners.create(
+ flattenSpec,
+ new OrcStructFlattenerMaker(
+ binaryAsString,
+
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
}
@Override
diff --git
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java
index ee770d3e8e..016ddb3f90 100644
---
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java
+++
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java
@@ -43,7 +43,9 @@ public class OrcStructFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<
private final JsonProvider orcJsonProvider;
private final OrcStructConverter converter;
- OrcStructFlattenerMaker(boolean binaryAsString)
+ private final boolean discoverNestedFields;
+
+ OrcStructFlattenerMaker(boolean binaryAsString, boolean discoverNestedFields)
{
this.converter = new OrcStructConverter(binaryAsString);
this.orcJsonProvider = new OrcStructJsonProvider(converter);
@@ -52,11 +54,17 @@ public class OrcStructFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<
.mappingProvider(new
NotImplementedMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
+ this.discoverNestedFields = discoverNestedFields;
}
@Override
public Iterable<String> discoverRootFields(OrcStruct obj)
{
+ // if discovering nested fields, just return all root fields since we want
everything
+ // else, we filter for literals and arrays of literals
+ if (discoverNestedFields) {
+ return obj.getSchema().getFieldNames();
+ }
List<String> fields = obj.getSchema().getFieldNames();
List<TypeDescription> children = obj.getSchema().getChildren();
List<String> primitiveFields = new ArrayList<>();
diff --git
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
index 03083f61eb..cf8ba19d9b 100644
---
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
+++
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
@@ -52,6 +52,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
public class OrcReaderTest extends InitializedNullHandlingTest
{
@@ -421,6 +422,109 @@ public class OrcReaderTest extends
InitializedNullHandlingTest
}
}
+ @Test
+ public void testNestedColumnSchemaless() throws IOException
+ {
+ final OrcInputFormat inputFormat = new OrcInputFormat(
+ new JSONPathSpec(true, ImmutableList.of()),
+ null,
+ new Configuration()
+ );
+ final InputRowSchema schema = new InputRowSchema(
+ new TimestampSpec("ts", "millis", null),
+
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
+ ColumnsFilter.all(),
+ null
+ );
+ final FileEntity entity = new FileEntity(new
File("example/orc-file-11-format.orc"));
+
+ final InputEntityReader reader = inputFormat.createReader(schema, entity,
temporaryFolder.newFolder());
+
+ List<String> dims = ImmutableList.of(
+ "boolean1",
+ "byte1",
+ "short1",
+ "int1",
+ "long1",
+ "float1",
+ "double1",
+ "bytes1",
+ "string1",
+ "middle",
+ "list",
+ "map",
+ "ts",
+ "decimal1"
+ );
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ int actualRowCount = 0;
+
+ // Check the first row
+ Assert.assertTrue(iterator.hasNext());
+ InputRow row = iterator.next();
+
+ Assert.assertEquals(dims, row.getDimensions());
+ actualRowCount++;
+ Assert.assertEquals(
+ ImmutableMap.of(
+ "list",
+ ImmutableList.of(
+ ImmutableMap.of("int1", 1, "string1", "bye"),
+ ImmutableMap.of("int1", 2, "string1", "sigh")
+ )
+ ),
+ row.getRaw("middle")
+ );
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("int1", 3, "string1", "good"),
+ ImmutableMap.of("int1", 4, "string1", "bad")
+ ),
+ row.getRaw("list")
+ );
+ Assert.assertEquals(
+ ImmutableMap.of(),
+ row.getRaw("map")
+ );
+ Assert.assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"),
row.getTimestamp());
+
+ while (iterator.hasNext()) {
+ actualRowCount++;
+ row = iterator.next();
+ Assert.assertEquals(dims, row.getDimensions());
+ }
+
+ // Check the last row
+ Assert.assertEquals(
+ ImmutableMap.of(
+ "list",
+ ImmutableList.of(
+ ImmutableMap.of("int1", 1, "string1", "bye"),
+ ImmutableMap.of("int1", 2, "string1", "sigh")
+ )
+ ),
+ row.getRaw("middle")
+ );
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("int1", 100000000, "string1", "cat"),
+ ImmutableMap.of("int1", -100000, "string1", "in"),
+ ImmutableMap.of("int1", 1234, "string1", "hat")
+ ),
+ row.getRaw("list")
+ );
+ Assert.assertEquals(
+ ImmutableMap.of(
+ "chani", ImmutableMap.of("int1", 5, "string1", "chani"),
+ "mauddib", ImmutableMap.of("int1", 1, "string1", "mauddib")
+ ),
+ row.getRaw("map")
+ );
+
+ Assert.assertEquals(7500, actualRowCount);
+ }
+ }
+
@Test
public void testListMap() throws IOException
{
diff --git
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index aced1106ee..494f662516 100644
---
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -66,7 +66,13 @@ public class ParquetReader extends
IntermediateRowParsingReader<Group>
this.inputRowSchema = inputRowSchema;
this.source = source;
this.temporaryDirectory = temporaryDirectory;
- this.flattener = ObjectFlatteners.create(flattenSpec, new
ParquetGroupFlattenerMaker(binaryAsString));
+ this.flattener = ObjectFlatteners.create(
+ flattenSpec,
+ new ParquetGroupFlattenerMaker(
+ binaryAsString,
+
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
}
@Override
diff --git
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
index 796891ea8e..5c2dd2844f 100755
---
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
+++
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
@@ -32,6 +32,7 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.avro.AvroFlattenerMaker;
import org.apache.druid.data.input.avro.AvroParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -76,9 +77,16 @@ public class ParquetAvroHadoopInputRowParser implements
InputRowParser<GenericRe
flattenSpec = JSONPathSpec.DEFAULT;
}
+ final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec();
+
this.recordFlattener = ObjectFlatteners.create(
flattenSpec,
- new AvroFlattenerMaker(false, this.binaryAsString,
this.extractUnionsByType)
+ new AvroFlattenerMaker(
+ false,
+ this.binaryAsString,
+ this.extractUnionsByType,
+ dimensionsSpec != null &&
dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
+ )
);
}
diff --git
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
index 7725055462..a243107cc2 100644
---
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
+++
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
@@ -41,7 +41,9 @@ public class ParquetGroupFlattenerMaker implements
ObjectFlatteners.FlattenerMak
private final ParquetGroupConverter converter;
private final JsonProvider parquetJsonProvider;
- public ParquetGroupFlattenerMaker(boolean binaryAsString)
+ private final boolean discoverNestedFields;
+
+ public ParquetGroupFlattenerMaker(boolean binaryAsString, boolean
discoverNestedFields)
{
this.converter = new ParquetGroupConverter(binaryAsString);
this.parquetJsonProvider = new ParquetGroupJsonProvider(converter);
@@ -50,11 +52,17 @@ public class ParquetGroupFlattenerMaker implements
ObjectFlatteners.FlattenerMak
.mappingProvider(new
NotImplementedMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
+ this.discoverNestedFields = discoverNestedFields;
}
@Override
public Set<String> discoverRootFields(Group obj)
{
+ // if discovering nested fields, just return all root fields since we want
everything
+ // else, we filter for literals and arrays of literals
+ if (discoverNestedFields) {
+ return
obj.getType().getFields().stream().map(Type::getName).collect(Collectors.toSet());
+ }
return obj.getType()
.getFields()
.stream()
diff --git
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
index 762bb709d1..f3e8539685 100644
---
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
+++
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input.parquet.simple;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
@@ -54,7 +55,14 @@ public class ParquetHadoopInputRowParser implements
InputRowParser<Group>
} else {
flattenSpec = JSONPathSpec.DEFAULT;
}
- this.groupFlattener = ObjectFlatteners.create(flattenSpec, new
ParquetGroupFlattenerMaker(this.binaryAsString));
+ final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec();
+ this.groupFlattener = ObjectFlatteners.create(
+ flattenSpec,
+ new ParquetGroupFlattenerMaker(
+ this.binaryAsString,
+ dimensionsSpec != null &&
dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
+ )
+ );
this.parser = new MapInputRowParser(parseSpec);
}
diff --git
a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
index 387ff2cf91..950039d069 100644
---
a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
+++
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
@@ -162,4 +162,77 @@ public class NestedColumnParquetReaderTest extends
BaseParquetReaderTest
Assert.assertEquals(1L, rows.get(0).getRaw("t_a2_1_b1"));
Assert.assertEquals(1L, rows.get(0).getRaw("tt_a2_0_b1"));
}
+
+ @Test
+ public void testNestedColumnSchemalessNestedTestFileNoNested() throws
IOException
+ {
+ final String file = "example/flattening/test_nested_1.parquet";
+ InputRowSchema schema = new InputRowSchema(
+ new TimestampSpec("timestamp", "auto", null),
+
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(false).build(),
+ ColumnsFilter.all(),
+ null
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(true, ImmutableList.of());
+ InputEntityReader reader = createReader(
+ file,
+ schema,
+ flattenSpec
+ );
+
+ List<InputRow> rows = readAllRows(reader);
+ Assert.assertEquals(ImmutableList.of("dim1", "metric1", "timestamp"),
rows.get(0).getDimensions());
+ Assert.assertEquals(FlattenSpecParquetInputTest.TS1,
rows.get(0).getTimestamp().toString());
+ Assert.assertEquals(ImmutableList.of("d1v1"),
rows.get(0).getDimension("dim1"));
+ Assert.assertEquals("d1v1", rows.get(0).getRaw("dim1"));
+ Assert.assertEquals(ImmutableList.of("1"),
rows.get(0).getDimension("metric1"));
+ Assert.assertEquals(1, rows.get(0).getRaw("metric1"));
+ Assert.assertEquals(1, rows.get(0).getMetric("metric1"));
+ // can still read even if it doesn't get reported as a dimension
+ Assert.assertEquals(
+ ImmutableMap.of(
+ "listDim", ImmutableList.of("listDim1v1", "listDim1v2"),
+ "dim3", 1,
+ "dim2", "d2v1",
+ "metric2", 2
+ ),
+ rows.get(0).getRaw("nestedData")
+ );
+ }
+
+ @Test
+ public void testNestedColumnSchemalessNestedTestFile() throws IOException
+ {
+ final String file = "example/flattening/test_nested_1.parquet";
+ InputRowSchema schema = new InputRowSchema(
+ new TimestampSpec("timestamp", "auto", null),
+
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
+ ColumnsFilter.all(),
+ null
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(true, ImmutableList.of());
+ InputEntityReader reader = createReader(
+ file,
+ schema,
+ flattenSpec
+ );
+
+ List<InputRow> rows = readAllRows(reader);
+ Assert.assertEquals(ImmutableList.of("nestedData", "dim1", "metric1",
"timestamp"), rows.get(0).getDimensions());
+ Assert.assertEquals(FlattenSpecParquetInputTest.TS1,
rows.get(0).getTimestamp().toString());
+ Assert.assertEquals(ImmutableList.of("d1v1"),
rows.get(0).getDimension("dim1"));
+ Assert.assertEquals("d1v1", rows.get(0).getRaw("dim1"));
+ Assert.assertEquals(ImmutableList.of("1"),
rows.get(0).getDimension("metric1"));
+ Assert.assertEquals(1, rows.get(0).getRaw("metric1"));
+ Assert.assertEquals(1, rows.get(0).getMetric("metric1"));
+ Assert.assertEquals(
+ ImmutableMap.of(
+ "listDim", ImmutableList.of("listDim1v1", "listDim1v2"),
+ "dim3", 1,
+ "dim2", "d2v1",
+ "metric2", 2
+ ),
+ rows.get(0).getRaw("nestedData")
+ );
+ }
}
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
index a3314e4273..a3d69226fd 100644
---
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
@@ -56,6 +56,13 @@ public class ProtobufFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<M
private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
+ private final boolean discoverNestedFields;
+
+ public ProtobufFlattenerMaker(boolean discoverNestedFields)
+ {
+ this.discoverNestedFields = discoverNestedFields;
+ }
+
@Override
public JsonProvider getJsonProvider()
{
@@ -65,7 +72,11 @@ public class ProtobufFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<M
@Override
public Iterable<String> discoverRootFields(Map<String, Object> obj)
{
- // in the future we can just return obj.keySet(), but for now this doesnt
expect nested fields...
+ // if discovering nested fields, just return all root fields since we want
everything
+ // else, we filter for literals and arrays of literals
+ if (discoverNestedFields) {
+ return obj.keySet();
+ }
Set<String> rootFields =
Sets.newHashSetWithExpectedSize(obj.keySet().size());
for (Map.Entry<String, Object> entry : obj.entrySet()) {
if (entry.getValue() instanceof List || entry.getValue() instanceof Map)
{
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java
index 9bb50a247b..7c0881b69c 100644
---
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java
@@ -40,7 +40,12 @@ public class ProtobufInputRowSchema extends InputRowSchema
{
public ProtobufInputRowSchema(InputRowSchema inputRowSchema)
{
- super(new ProtobufTimestampSpec(inputRowSchema.getTimestampSpec()),
inputRowSchema.getDimensionsSpec(), inputRowSchema.getColumnsFilter());
+ super(
+ new ProtobufTimestampSpec(inputRowSchema.getTimestampSpec()),
+ inputRowSchema.getDimensionsSpec(),
+ inputRowSchema.getColumnsFilter(),
+ inputRowSchema.getMetricNames()
+ );
}
static class ProtobufTimestampSpec extends TimestampSpec
@@ -51,9 +56,9 @@ public class ProtobufInputRowSchema extends InputRowSchema
}
/**
- * Extracts the timestamp from the record. If the timestamp column is of
complex type such as {@link Timestamp}, then the timestamp
- * is first serialized to string via {@link JsonFormat}. Directly calling
{@code toString()} on {@code Timestamp}
- * returns an unparseable string.
+ * Extracts the timestamp from the record. If the timestamp column is of
complex type such as {@link Timestamp},
+ * then the timestamp is first serialized to string via {@link
JsonFormat}. Directly calling {@code toString()}
+ * on {@code Timestamp} returns an unparseable string.
*/
@Override
@Nullable
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
index 2dc6aa1f8c..ac77a545dd 100644
---
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
@@ -57,7 +57,10 @@ public class ProtobufReader extends
IntermediateRowParsingReader<DynamicMessage>
)
{
this.inputRowSchema = inputRowSchema;
- this.recordFlattener = ObjectFlatteners.create(flattenSpec, new
ProtobufFlattenerMaker());
+ this.recordFlattener = ObjectFlatteners.create(
+ flattenSpec,
+ new
ProtobufFlattenerMaker(inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery())
+ );
this.source = source;
this.protobufBytesDecoder = protobufBytesDecoder;
}
diff --git
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
index 0b5d096449..9255e1e7eb 100644
---
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
+++
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
@@ -328,6 +328,77 @@ public class ProtobufInputFormatTest
}
+ @Test
+ public void testParseNestedDataSchemaless() throws Exception
+ {
+ ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
+ JSONPathSpec.DEFAULT,
+ decoder
+ );
+
+ //create binary of proto test event
+ DateTime dateTime = new DateTime(2012, 7, 12, 9, 30,
ISOChronology.getInstanceUTC());
+ ProtoTestEventWrapper.ProtoTestEvent event =
ProtobufInputRowParserTest.buildNestedData(dateTime);
+
+ final ByteEntity entity = new
ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
+
+ InputEntityReader reader = protobufInputFormat.createReader(
+ new InputRowSchema(
+ timestampSpec,
+
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
+ null,
+ null
+ ),
+ entity,
+ null
+ );
+
+ TransformSpec transformSpec = new TransformSpec(
+ null,
+ Lists.newArrayList(
+ new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')",
TestExprMacroTable.INSTANCE),
+ new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')",
TestExprMacroTable.INSTANCE)
+ )
+ );
+ TransformingInputEntityReader transformingReader = new
TransformingInputEntityReader(
+ reader,
+ transformSpec.toTransformer()
+ );
+
+
+ InputRow row = transformingReader.read().next();
+
+ Assert.assertEquals(
+ ImmutableList.of(
+ "someOtherId",
+ "bar",
+ "someIntColumn",
+ "isValid",
+ "foo",
+ "description",
+ "someLongColumn",
+ "someFloatColumn",
+ "eventType",
+ "id",
+ "someBytesColumn",
+ "timestamp"
+ ),
+ row.getDimensions()
+ );
+
+ Assert.assertEquals(ImmutableMap.of("bar", "baz"), row.getRaw("foo"));
+ Assert.assertEquals(
+ ImmutableList.of(ImmutableMap.of("bar", "bar0"),
ImmutableMap.of("bar", "bar1")),
+ row.getRaw("bar")
+ );
+ Assert.assertArrayEquals(
+ new byte[]{0x01, 0x02, 0x03, 0x04},
+ (byte[]) row.getRaw("someBytesColumn")
+ );
+ ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
+
+ }
+
@Test
public void testParseNestedDataTransformsOnly() throws Exception
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
index aaa2cd0789..8824decc05 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
@@ -22,10 +22,7 @@ package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.InputSource;
import org.apache.druid.java.util.common.HumanReadableBytes;
-import org.apache.druid.segment.indexing.DataSchema;
import javax.annotation.Nullable;
@@ -35,13 +32,9 @@ public class SamplerConfig
private static final int MAX_NUM_ROWS = 5000;
private static final int DEFAULT_TIMEOUT_MS = 10000;
-
-
private final int numRows;
private final int timeoutMs;
-
private final long maxBytesInMemory;
-
private final long maxClientResponseBytes;
@JsonCreator
@@ -92,13 +85,13 @@ public class SamplerConfig
/**
* Maximum number of bytes in memory that the {@link
org.apache.druid.segment.incremental.IncrementalIndex} used by
- * {@link InputSourceSampler#sample(InputSource, InputFormat, DataSchema,
SamplerConfig)} will be allowed to
- * accumulate before aborting sampling. Particularly useful for limiting
footprint of sample operations as well as
- * overall response size from sample requests. However, it is not directly
correlated to response size since it
- * also contains the "raw" input data, so actual responses will likely be at
least twice the size of this value,
- * depending on factors such as number of transforms, aggregations in the
case of rollup, whether all columns
- * of the input are present in the dimension spec, and so on. If it is
preferred to control client response size,
- * use {@link SamplerConfig#getMaxClientResponseBytes()} instead.
+ * {@link InputSourceSampler#sample(org.apache.druid.data.input.InputSource,
org.apache.druid.data.input.InputFormat,
org.apache.druid.segment.indexing.DataSchema, SamplerConfig)}
+ * will be allowed to accumulate before aborting sampling. Particularly
useful for limiting footprint of sample
+ * operations as well as overall response size from sample requests.
However, it is not directly correlated to
+ * response size since it also contains the "raw" input data, so actual
responses will likely be at least twice the
+ * size of this value, depending on factors such as number of transforms,
aggregations in the case of rollup, whether
+ * all columns of the input are present in the dimension spec, and so on. If
it is preferred to control client
+ * response size, use {@link SamplerConfig#getMaxClientResponseBytes()}
instead.
*/
public long getMaxBytesInMemory()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index b3db2b71d1..c6b6d6fdf0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -252,7 +252,7 @@ public class ClientCompactionTaskQuerySerdeTest
new ParallelIndexTuningConfig(
null,
null,
- new OnheapIncrementalIndex.Spec(true, false),
+ new OnheapIncrementalIndex.Spec(true),
40000,
2000L,
null,
@@ -316,7 +316,7 @@ public class ClientCompactionTaskQuerySerdeTest
),
new ClientCompactionTaskQueryTuningConfig(
100,
- new OnheapIncrementalIndex.Spec(true, false),
+ new OnheapIncrementalIndex.Spec(true),
40000,
2000L,
30000L,
diff --git a/integration-tests-ex/cases/cluster/Common/dependencies.yaml
b/integration-tests-ex/cases/cluster/Common/dependencies.yaml
index ccfce630a2..4eeb85a49c 100644
--- a/integration-tests-ex/cases/cluster/Common/dependencies.yaml
+++ b/integration-tests-ex/cases/cluster/Common/dependencies.yaml
@@ -67,7 +67,8 @@ services:
# See https://hub.docker.com/_/mysql
# The image will intialize the user and DB upon first start.
metadata:
- # platform: linux/x86_64 - Add when running on M1 Macs
+ # Uncomment the following when running on M1 Macs:
+ # platform: linux/x86_64
image: mysql:$MYSQL_IMAGE_VERSION
container_name: metadata
command:
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java
index 86e0ece8d5..58e47bbe7e 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java
@@ -26,7 +26,6 @@ import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import javax.inject.Inject;
-
import java.sql.Connection;
import java.sql.SQLException;
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java
index c14ea745aa..e23c996c88 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java
@@ -25,7 +25,6 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.IntegrationTestingConfigProvider;
import javax.inject.Inject;
-
import java.util.Map;
import java.util.Properties;
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
index 144722eea9..7b983b9cf1 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
@@ -155,14 +155,23 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
return fileString;
}
+ protected void doTestQuery(String dataSource, String queryFilePath)
+ {
+ doTestQuery(dataSource, queryFilePath, false);
+ }
+
/**
* Reads native queries from a file and runs against the provided datasource.
*/
- protected void doTestQuery(String dataSource, String queryFilePath)
+ protected void doTestQuery(String dataSource, String queryFilePath, boolean
isSql)
{
try {
String query = getStringFromFileAndReplaceDatasource(queryFilePath,
dataSource);
- queryHelper.testQueriesFromString(query);
+ if (isSql) {
+ sqlQueryHelper.testQueriesFromString(query);
+ } else {
+ queryHelper.testQueriesFromString(query);
+ }
}
catch (Exception e) {
LOG.error(e, "Error while running test query at path " + queryFilePath);
@@ -255,6 +264,31 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
queryFilePath,
waitForNewVersion,
runTestQueries,
+ false,
+ waitForSegmentsToLoad,
+ segmentAvailabilityConfirmationPair
+ );
+ }
+
+ protected void doIndexTest(
+ String dataSource,
+ String indexTaskFilePath,
+ Function<String, String> taskSpecTransform,
+ String queryFilePath,
+ boolean waitForNewVersion,
+ boolean runTestQueries,
+ boolean waitForSegmentsToLoad,
+ Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+ ) throws IOException
+ {
+ doIndexTest(
+ dataSource,
+ indexTaskFilePath,
+ taskSpecTransform,
+ queryFilePath,
+ waitForNewVersion,
+ runTestQueries,
+ false,
waitForSegmentsToLoad,
segmentAvailabilityConfirmationPair
);
@@ -267,6 +301,7 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
String queryFilePath,
boolean waitForNewVersion,
boolean runTestQueries,
+ boolean isSqlQueries,
boolean waitForSegmentsToLoad,
Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
) throws IOException
@@ -288,7 +323,7 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
segmentAvailabilityConfirmationPair
);
if (runTestQueries) {
- doTestQuery(dataSource, queryFilePath);
+ doTestQuery(dataSource, queryFilePath, isSqlQueries);
}
}
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java
index ce8a9f5c13..868ffdb8e5 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nonnull;
import java.io.Closeable;
+import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
@@ -48,6 +49,28 @@ public abstract class
AbstractLocalInputSourceParallelIndexTest extends Abstract
@Nonnull Map<String, Object> extraInputFormatMap,
Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
) throws Exception
+ {
+ doIndexTest(
+ inputFormatDetails,
+ INDEX_TASK,
+ INDEX_QUERIES_RESOURCE,
+ false,
+ Collections.emptyMap(),
+ extraInputFormatMap,
+ segmentAvailabilityConfirmationPair
+ );
+ }
+
+
+ public void doIndexTest(
+ InputFormatDetails inputFormatDetails,
+ String ingestSpecTemplate,
+ String queries,
+ boolean useSqlQueries,
+ @Nonnull Map<String, Object> templateValues,
+ @Nonnull Map<String, Object> extraInputFormatMap,
+ Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+ ) throws Exception
{
final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
Map<String, Object> inputFormatMap = new ImmutableMap.Builder<String,
Object>().putAll(extraInputFormatMap)
@@ -58,6 +81,13 @@ public abstract class
AbstractLocalInputSourceParallelIndexTest extends Abstract
) {
final Function<String, String> sqlInputSourcePropsTransform = spec -> {
try {
+ for (Map.Entry<String, Object> entry : templateValues.entrySet()) {
+ spec = StringUtils.replace(
+ spec,
+ "%%" + entry.getKey() + "%%",
+ jsonMapper.writeValueAsString(entry.getValue())
+ );
+ }
spec = StringUtils.replace(
spec,
"%%PARTITIONS_SPEC%%",
@@ -102,11 +132,12 @@ public abstract class
AbstractLocalInputSourceParallelIndexTest extends Abstract
doIndexTest(
indexDatasource,
- INDEX_TASK,
+ ingestSpecTemplate,
sqlInputSourcePropsTransform,
- INDEX_QUERIES_RESOURCE,
+ queries,
false,
true,
+ useSqlQueries,
true,
segmentAvailabilityConfirmationPair
);
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllFormatSchemalessTest.java
similarity index 55%
copy from
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
copy to
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllFormatSchemalessTest.java
index 8482952db2..61deca75ed 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllFormatSchemalessTest.java
@@ -22,22 +22,25 @@ package org.apache.druid.testsEx.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Pair;
-import
org.apache.druid.tests.indexer.AbstractLocalInputSourceParallelIndexTest;
import org.apache.druid.testsEx.categories.InputFormat;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@RunWith(DruidTestRunner.class)
@Category(InputFormat.class)
-public class ITLocalInputSourceAllInputFormatTest extends
AbstractLocalInputSourceParallelIndexTest
+public class ITLocalInputSourceAllFormatSchemalessTest extends
AbstractLocalInputSourceParallelIndexTest
{
+ private static final String INDEX_TASK =
"/indexer/wikipedia_local_input_source_index_task_schemaless.json";
+ private static final String INDEX_QUERIES_RESOURCE =
"/indexer/wikipedia_index_schemaless_queries.json";
+
@Test
- public void testAvroInputFormatIndexDataIngestionSpecWithSchema() throws
Exception
+ public void
testAvroInputFormatIndexDataIngestionSpecWithFileSchemaSchemaless() throws
Exception
{
List fieldList = ImmutableList.of(
ImmutableMap.of("name", "timestamp", "type", "string"),
@@ -61,42 +64,70 @@ public class ITLocalInputSourceAllInputFormatTest extends
AbstractLocalInputSour
"type", "record",
"name", "wikipedia",
"fields", fieldList);
- doIndexTest(InputFormatDetails.AVRO, ImmutableMap.of("schema", schema),
new Pair<>(false, false));
- }
-
- @Test
- public void testAvroInputFormatIndexDataIngestionSpecWithoutSchema() throws
Exception
- {
- doIndexTest(InputFormatDetails.AVRO, new Pair<>(false, false));
- }
-
- @Test
- public void testJsonInputFormatIndexDataIngestionSpecWithSchema() throws
Exception
- {
- doIndexTest(InputFormatDetails.JSON, new Pair<>(false, false));
+ doIndexTest(
+ AbstractITBatchIndexTest.InputFormatDetails.AVRO,
+ INDEX_TASK,
+ INDEX_QUERIES_RESOURCE,
+ true,
+ ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+ ImmutableMap.of("schema", schema),
+ new Pair<>(false, false)
+ );
}
@Test
- public void testTsvInputFormatIndexDataIngestionSpecWithSchema() throws
Exception
+ public void
testAvroInputFormatIndexDataIngestionSpecNoFileSchemaSchemaless() throws
Exception
{
- doIndexTest(InputFormatDetails.TSV,
ImmutableMap.of("findColumnsFromHeader", true), new Pair<>(false, false));
+ doIndexTest(
+ AbstractITBatchIndexTest.InputFormatDetails.AVRO,
+ INDEX_TASK,
+ INDEX_QUERIES_RESOURCE,
+ true,
+ ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+ Collections.emptyMap(),
+ new Pair<>(false, false)
+ );
}
@Test
- public void testParquetInputFormatIndexDataIngestionSpecWithSchema() throws
Exception
+ public void testJsonInputFormatIndexDataIngestionSpecSchemaless() throws
Exception
{
- doIndexTest(InputFormatDetails.PARQUET, new Pair<>(false, false));
+ doIndexTest(
+ AbstractITBatchIndexTest.InputFormatDetails.JSON,
+ INDEX_TASK,
+ INDEX_QUERIES_RESOURCE,
+ true,
+ ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+ Collections.emptyMap(),
+ new Pair<>(false, false)
+ );
}
@Test
- public void testOrcInputFormatIndexDataIngestionSpecWithSchema() throws
Exception
+ public void testParquetInputFormatIndexDataIngestionSpecSchemaless() throws
Exception
{
- doIndexTest(InputFormatDetails.ORC, new Pair<>(false, false));
+ doIndexTest(
+ AbstractITBatchIndexTest.InputFormatDetails.PARQUET,
+ INDEX_TASK,
+ INDEX_QUERIES_RESOURCE,
+ true,
+ ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+ Collections.emptyMap(),
+ new Pair<>(false, false)
+ );
}
@Test
- public void testCsvInputFormatIndexDataIngestionSpecWithSchema() throws
Exception
+ public void testOrcInputFormatIndexDataIngestionSpecSchemaless() throws
Exception
{
- doIndexTest(InputFormatDetails.CSV,
ImmutableMap.of("findColumnsFromHeader", true), new Pair<>(false, false));
+ doIndexTest(
+ AbstractITBatchIndexTest.InputFormatDetails.ORC,
+ INDEX_TASK,
+ INDEX_QUERIES_RESOURCE,
+ true,
+ ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+ Collections.emptyMap(),
+ new Pair<>(false, false)
+ );
}
}
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
index 8482952db2..641d385865 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.testsEx.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Pair;
-import
org.apache.druid.tests.indexer.AbstractLocalInputSourceParallelIndexTest;
import org.apache.druid.testsEx.categories.InputFormat;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.Test;
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java
index 73fc73d42d..4f19a226d9 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java
@@ -159,6 +159,7 @@ public class ITOverwriteBatchIndexTest extends
AbstractITBatchIndexTest
null,
false,
false,
+ false,
true,
new Pair<>(false, false)
);
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java
index 735bc0a50f..60d22f3f9f 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java
@@ -32,11 +32,11 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
-import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.apache.druid.testsEx.categories.HighAvailability;
import org.apache.druid.testsEx.cluster.DruidClusterClient;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.apache.druid.testsEx.config.Initializer;
+import org.apache.druid.testsEx.indexer.AbstractIndexerTest;
import org.apache.druid.testsEx.utils.DruidClusterAdminClient;
import org.junit.Test;
import org.junit.experimental.categories.Category;
diff --git
a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json
index 928effe65e..62c3ac6371 100644
---
a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json
+++
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json
@@ -1,6 +1,6 @@
[
{
- "description": "timeseries, 1 agg, all",
+ "description": "timeboundary",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
diff --git
a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_schemaless_queries.json
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_schemaless_queries.json
new file mode 100644
index 0000000000..316ec5fd8a
--- /dev/null
+++
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_schemaless_queries.json
@@ -0,0 +1,32 @@
+[
+ {
+ "description": "select all things",
+ "query": {
+ "query": "SELECT
\"__time\",\"continent\",\"country\",\"city\",\"added\",\"unpatrolled\",\"delta\",\"language\",\"robot\",\"deleted\",\"newPage\",\"namespace\",\"anonymous\",\"page\",\"region\",\"user\"
FROM \"%%DATASOURCE%%\" ORDER BY __time"
+ },
+ "expectedResults": [
+ {"__time":"2013-08-31T01:02:33.000Z","continent":"North
America","country":"United States","city":"San
Francisco","added":57,"unpatrolled":"true","delta":-143,"language":"en","robot":"false","deleted":200,"newPage":"true","namespace":"article","anonymous":"false","page":"Gypsy
Danger","region":"Bay Area","user":"nuclear"},
+
{"__time":"2013-08-31T03:32:45.000Z","continent":"Australia","country":"Australia","city":"Syndey","added":459,"unpatrolled":"false","delta":330,"language":"en","robot":"true","deleted":129,"newPage":"true","namespace":"wikipedia","anonymous":"false","page":"Striker
Eureka","region":"Cantebury","user":"speed"},
+
{"__time":"2013-08-31T07:11:21.000Z","continent":"Asia","country":"Russia","city":"Moscow","added":123,"unpatrolled":"false","delta":111,"language":"ru","robot":"true","deleted":12,"newPage":"true","namespace":"article","anonymous":"false","page":"Cherno
Alpha","region":"Oblast","user":"masterYi"},
+
{"__time":"2013-08-31T11:58:39.000Z","continent":"Asia","country":"China","city":"Taiyuan","added":905,"unpatrolled":"true","delta":900,"language":"zh","robot":"true","deleted":5,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Crimson
Typhoon","region":"Shanxi","user":"triplets"},
+
{"__time":"2013-08-31T12:41:27.000Z","continent":"Asia","country":"Japan","city":"Tokyo","added":1,"unpatrolled":"true","delta":-9,"language":"ja","robot":"true","deleted":10,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Coyote
Tango","region":"Kanto","user":"stringer"},
+ {"__time":"2013-09-01T01:02:33.000Z","continent":"North
America","country":"United States","city":"San
Francisco","added":57,"unpatrolled":"true","delta":-143,"language":"en","robot":"false","deleted":200,"newPage":"true","namespace":"article","anonymous":"false","page":"Gypsy
Danger","region":"Bay Area","user":"nuclear"},
+
{"__time":"2013-09-01T03:32:45.000Z","continent":"Australia","country":"Australia","city":"Syndey","added":459,"unpatrolled":"false","delta":330,"language":"en","robot":"true","deleted":129,"newPage":"true","namespace":"wikipedia","anonymous":"false","page":"Striker
Eureka","region":"Cantebury","user":"speed"},
+
{"__time":"2013-09-01T07:11:21.000Z","continent":"Asia","country":"Russia","city":"Moscow","added":123,"unpatrolled":"false","delta":111,"language":"ru","robot":"true","deleted":12,"newPage":"true","namespace":"article","anonymous":"false","page":"Cherno
Alpha","region":"Oblast","user":"masterYi"},
+
{"__time":"2013-09-01T11:58:39.000Z","continent":"Asia","country":"China","city":"Taiyuan","added":905,"unpatrolled":"true","delta":900,"language":"zh","robot":"true","deleted":5,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Crimson
Typhoon","region":"Shanxi","user":"triplets"},
+
{"__time":"2013-09-01T12:41:27.000Z","continent":"Asia","country":"Japan","city":"Tokyo","added":1,"unpatrolled":"true","delta":-9,"language":"ja","robot":"true","deleted":10,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Coyote
Tango","region":"Kanto","user":"stringer"}
+ ]
+ },
+ {
+ "description": "simple group by",
+ "query": {
+ "query": "SELECT page, SUM(added) as added FROM \"%%DATASOURCE%%\" WHERE
continent = 'Asia' GROUP BY 1 ORDER BY 2 DESC"
+ },
+ "expectedResults": [
+ {"page":"Crimson Typhoon","added":1810},
+ {"page":"Cherno Alpha","added":246},
+ {"page":"Coyote Tango","added":2}
+ ]
+ }
+
+]
\ No newline at end of file
diff --git
a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json
new file mode 100644
index 0000000000..bc96c730ee
--- /dev/null
+++
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json
@@ -0,0 +1,42 @@
+{
+ "type": "index_parallel",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "%%DATASOURCE%%",
+ "timestampSpec": {
+ "column": "timestamp"
+ },
+ "dimensionsSpec": {
+ "dimensions": [],
+ "useNestedColumnIndexerForSchemaDiscovery":
%%USE_NESTED_COLUMN_INDEXER%%
+ },
+ "metricsSpec": [],
+ "granularitySpec": {
+ "segmentGranularity": "DAY",
+ "queryGranularity": "second",
+ "intervals" : [ "2013-08-31/2013-09-02" ]
+ }
+ },
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "local",
+ "filter" : "%%INPUT_SOURCE_FILTER%%",
+ "baseDir": "%%INPUT_SOURCE_BASE_DIR%%"
+ },
+ "appendToExisting": %%APPEND_TO_EXISTING%%,
+ "dropExisting": %%DROP_EXISTING%%,
+ "inputFormat": %%INPUT_FORMAT%%
+ },
+ "tuningConfig": {
+ "type": "index_parallel",
+ "maxNumConcurrentSubTasks": 4,
+ "splitHintSpec": {
+ "type": "maxSize",
+ "maxNumFiles": 1
+ },
+ "forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%,
+ "partitionsSpec": %%PARTITIONS_SPEC%%
+ }
+ }
+}
\ No newline at end of file
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index 887c931204..1269fe1e6b 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -42,8 +42,6 @@ public abstract class AppendableIndexBuilder
protected boolean preserveExistingMetrics = false;
protected boolean useMaxMemoryEstimates = true;
- protected boolean useNestedColumnIndexerForSchemaDiscovery = false;
-
protected final Logger log = new Logger(this.getClass());
public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema
incrementalIndexSchema)
@@ -132,14 +130,6 @@ public abstract class AppendableIndexBuilder
return this;
}
- public AppendableIndexBuilder setUseNestedColumnIndexerForSchemaDiscovery(
- boolean useNestedColumnIndexerForSchemaDiscovery
- )
- {
- this.useNestedColumnIndexerForSchemaDiscovery =
useNestedColumnIndexerForSchemaDiscovery;
- return this;
- }
-
public void validate()
{
if (maxRowCount <= 0) {
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
index 3af6bfa4cb..67cdabdf56 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
@@ -32,7 +32,4 @@ public interface AppendableIndexSpec
// Returns the default max bytes in memory for this index.
long getDefaultMaxBytesInMemory();
-
- @SuppressWarnings("unused")
- boolean useNestedColumnIndexerForSchemaDiscovery();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 0085b58859..ddb52c309c 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -276,8 +276,7 @@ public abstract class IncrementalIndex extends
AbstractIndex implements Iterable
final boolean deserializeComplexMetrics,
final boolean concurrentEventAdd,
final boolean preserveExistingMetrics,
- final boolean useMaxMemoryEstimates,
- final boolean useNestedColumnIndexerForSchemaDiscovery
+ final boolean useMaxMemoryEstimates
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
@@ -289,7 +288,8 @@ public abstract class IncrementalIndex extends
AbstractIndex implements Iterable
this.deserializeComplexMetrics = deserializeComplexMetrics;
this.preserveExistingMetrics = preserveExistingMetrics;
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
- this.useNestedColumnIndexerForSchemaDiscovery =
useNestedColumnIndexerForSchemaDiscovery;
+ this.useNestedColumnIndexerForSchemaDiscovery =
incrementalIndexSchema.getDimensionsSpec()
+
.useNestedColumnIndexerForSchemaDiscovery();
this.timeAndMetricsColumnCapabilities = new HashMap<>();
this.metricDescs = Maps.newLinkedHashMap();
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 1912136a07..35ccd8961d 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -126,11 +126,16 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
// preserveExistingMetrics should only be set true for DruidInputSource
since that is the only case where we can have existing metrics
// This is currently only use by auto compaction and should not be use
for anything else.
boolean preserveExistingMetrics,
- boolean useMaxMemoryEstimates,
- boolean useNestedColumnIndexerSchemaDiscovery
+ boolean useMaxMemoryEstimates
)
{
- super(incrementalIndexSchema, deserializeComplexMetrics,
concurrentEventAdd, preserveExistingMetrics, useMaxMemoryEstimates,
useNestedColumnIndexerSchemaDiscovery);
+ super(
+ incrementalIndexSchema,
+ deserializeComplexMetrics,
+ concurrentEventAdd,
+ preserveExistingMetrics,
+ useMaxMemoryEstimates
+ );
this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE :
maxBytesInMemory;
this.facts = incrementalIndexSchema.isRollup() ? new
RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
@@ -657,8 +662,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
maxRowCount,
maxBytesInMemory,
preserveExistingMetrics,
- useMaxMemoryEstimates,
- useNestedColumnIndexerForSchemaDiscovery
+ useMaxMemoryEstimates
);
}
}
@@ -666,7 +670,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
public static class Spec implements AppendableIndexSpec
{
private static final boolean DEFAULT_PRESERVE_EXISTING_METRICS = false;
- private static final boolean
DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY = false;
public static final String TYPE = "onheap";
// When set to true, for any row that already has metric (with the same
name defined in metricSpec),
@@ -676,26 +679,19 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
// This is currently only use by auto compaction and should not be use for
anything else.
final boolean preserveExistingMetrics;
- final boolean useNestedColumnIndexerForSchemaDiscovery;
-
public Spec()
{
this.preserveExistingMetrics = DEFAULT_PRESERVE_EXISTING_METRICS;
- this.useNestedColumnIndexerForSchemaDiscovery =
DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY;
}
@JsonCreator
public Spec(
- final @JsonProperty("preserveExistingMetrics") @Nullable Boolean
preserveExistingMetrics,
- final @JsonProperty("useNestedColumnIndexerForSchemaDiscovery")
@Nullable Boolean useNestedColumnIndexerForSchemaDiscovery
+ final @JsonProperty("preserveExistingMetrics") @Nullable Boolean
preserveExistingMetrics
)
{
this.preserveExistingMetrics = preserveExistingMetrics != null
? preserveExistingMetrics
: DEFAULT_PRESERVE_EXISTING_METRICS;
- this.useNestedColumnIndexerForSchemaDiscovery =
useNestedColumnIndexerForSchemaDiscovery != null
- ?
useNestedColumnIndexerForSchemaDiscovery
- :
DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY;
}
@JsonProperty
@@ -707,8 +703,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
@Override
public AppendableIndexBuilder builder()
{
- return new Builder().setPreserveExistingMetrics(preserveExistingMetrics)
-
.setUseNestedColumnIndexerForSchemaDiscovery(useNestedColumnIndexerForSchemaDiscovery);
+ return new Builder().setPreserveExistingMetrics(preserveExistingMetrics);
}
@Override
@@ -720,13 +715,6 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
return JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
}
- @JsonProperty
- @Override
- public boolean useNestedColumnIndexerForSchemaDiscovery()
- {
- return useNestedColumnIndexerForSchemaDiscovery;
- }
-
@Override
public boolean equals(Object o)
{
@@ -737,14 +725,13 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
return false;
}
Spec spec = (Spec) o;
- return preserveExistingMetrics == spec.preserveExistingMetrics &&
- useNestedColumnIndexerForSchemaDiscovery ==
spec.useNestedColumnIndexerForSchemaDiscovery;
+ return preserveExistingMetrics == spec.preserveExistingMetrics;
}
@Override
public int hashCode()
{
- return Objects.hash(preserveExistingMetrics,
useNestedColumnIndexerForSchemaDiscovery);
+ return Objects.hash(preserveExistingMetrics);
}
}
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java
b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java
index 54bfe1dcb5..e5a1c41bc4 100644
---
a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java
@@ -46,7 +46,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import javax.annotation.Nonnull;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -548,13 +547,12 @@ public class NestedDataColumnIndexerTest extends
InitializedNullHandlingTest
new TimestampSpec(TIME_COL, "millis", null),
Granularities.NONE,
VirtualColumns.EMPTY,
- new DimensionsSpec(Collections.emptyList()),
+
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
new AggregatorFactory[0],
false
)
)
.setMaxRowCount(1000)
- .setUseNestedColumnIndexerForSchemaDiscovery(true)
.build();
return index;
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index dad2a4c213..02d43e2750 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -125,8 +125,7 @@ public class OnheapIncrementalIndexBenchmark extends
AbstractBenchmark
maxRowCount,
maxBytesInMemory,
false,
- true,
- false
+ true
);
}
@@ -150,8 +149,7 @@ public class OnheapIncrementalIndexBenchmark extends
AbstractBenchmark
maxRowCount,
maxBytesInMemory,
false,
- true,
- false
+ true
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
similarity index 51%
copy from
processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
copy to
processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
index 3af6bfa4cb..07ed6f7502 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
@@ -19,20 +19,28 @@
package org.apache.druid.segment.incremental;
-import org.apache.druid.guice.annotations.UnstableApi;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
-/**
- * AppendableIndexSpec describes the in-memory indexing method for data
ingestion.
- */
-@UnstableApi
-public interface AppendableIndexSpec
+public class OnheapIncrementalIndexTest
{
- // Returns a builder of the appendable index.
- AppendableIndexBuilder builder();
-
- // Returns the default max bytes in memory for this index.
- long getDefaultMaxBytesInMemory();
+ private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
- @SuppressWarnings("unused")
- boolean useNestedColumnIndexerForSchemaDiscovery();
+ @Test
+ public void testSerde() throws JsonProcessingException
+ {
+ OnheapIncrementalIndex.Spec spec = new OnheapIncrementalIndex.Spec(true);
+ Assert.assertEquals(spec,
MAPPER.readValue(MAPPER.writeValueAsString(spec),
OnheapIncrementalIndex.Spec.class));
+ }
+ @Test
+ public void testSpecEqualsAndHashCode()
+ {
+ EqualsVerifier.forClass(OnheapIncrementalIndex.Spec.class)
+ .usingGetClass()
+ .verify();
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
index ad8827f2a9..7b1a7c5468 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
@@ -88,7 +88,7 @@ public class ClientCompactionTaskQueryTuningConfig
if (userCompactionTaskQueryTuningConfig == null) {
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
- new OnheapIncrementalIndex.Spec(preserveExistingMetrics, false),
+ new OnheapIncrementalIndex.Spec(preserveExistingMetrics),
null,
null,
null,
@@ -111,7 +111,7 @@ public class ClientCompactionTaskQueryTuningConfig
} else {
AppendableIndexSpec appendableIndexSpecToUse =
userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null
?
userCompactionTaskQueryTuningConfig.getAppendableIndexSpec()
- : new
OnheapIncrementalIndex.Spec(preserveExistingMetrics, false);
+ : new
OnheapIncrementalIndex.Spec(preserveExistingMetrics);
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
appendableIndexSpecToUse,
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index 4a48f3280c..a0f4cf9a61 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -259,7 +259,7 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
{
final UserCompactionTaskQueryTuningConfig tuningConfig = new
UserCompactionTaskQueryTuningConfig(
40000,
- new OnheapIncrementalIndex.Spec(true, false),
+ new OnheapIncrementalIndex.Spec(true),
2000L,
null,
new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null),
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
index 9473287229..01c889ad2c 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
@@ -78,7 +78,7 @@ public class UserCompactionTaskQueryTuningConfigTest
{
final UserCompactionTaskQueryTuningConfig tuningConfig = new
UserCompactionTaskQueryTuningConfig(
40000,
- new OnheapIncrementalIndex.Spec(true, false),
+ new OnheapIncrementalIndex.Spec(true),
2000L,
null,
new SegmentsSplitHintSpec(new HumanReadableBytes(42L), null),
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index b1f3ad9952..73abcb8d5e 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -1523,7 +1523,7 @@ public class NewestSegmentFirstPolicyTest
null,
new UserCompactionTaskQueryTuningConfig(
null,
- new OnheapIncrementalIndex.Spec(true, false),
+ new OnheapIncrementalIndex.Spec(true),
null,
1000L,
null,
@@ -1558,7 +1558,7 @@ public class NewestSegmentFirstPolicyTest
null,
new UserCompactionTaskQueryTuningConfig(
null,
- new OnheapIncrementalIndex.Spec(false, false),
+ new OnheapIncrementalIndex.Spec(false),
null,
1000L,
null,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]