This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
     new 1582d74f37 Fix Parquet Reader for schema-less ingestion need to read all columns (#13689)
1582d74f37 is described below
commit 1582d74f375bf97a26fbc3327c97f96906ab247d
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Jan 18 10:52:12 2023 -1000
Fix Parquet Reader for schema-less ingestion need to read all columns
(#13689)
* fix stuff
* address comments
---
.../apache/druid/segment/indexing/ReaderUtils.java | 17 ++++++++--
.../druid/segment/indexing/ReaderUtilsTest.java | 36 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
index b47d676081..5298f2605d 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
@@ -60,6 +60,11 @@ public class ReaderUtils
// Find columns we need to read from the flattenSpec
if (flattenSpec != null) {
+      if (dimensionsSpec.getDimensions().isEmpty() && flattenSpec.isUseFieldDiscovery()) {
+        // Schemaless ingestion with useFieldDiscovery needs to read all columns
+        return fullInputSchema;
+      }
+
// Parse columns needed from flattenSpec
for (JSONPathFieldSpec fields : flattenSpec.getFields()) {
if (fields.getType() == JSONPathFieldType.ROOT) {
@@ -117,21 +122,27 @@ public class ReaderUtils
fieldsRequired.retainAll(fullInputSchema);
return fieldsRequired;
}
+    } else {
+      // Without flattenSpec, useFieldDiscovery is default to true and thus needs to read all columns since this is
+      // schemaless
+      if (dimensionsSpec.getDimensions().isEmpty()) {
+        return fullInputSchema;
+      }
}
-    // Determine any fields we need to read from parquet file that is used in the transformSpec
+    // Determine any fields we need to read from input file that is used in the transformSpec
List<Transform> transforms = transformSpec.getTransforms();
for (Transform transform : transforms) {
fieldsRequired.addAll(transform.getRequiredColumns());
}
-    // Determine any fields we need to read from parquet file that is used in the dimensionsSpec
+    // Determine any fields we need to read from input file that is used in the dimensionsSpec
List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions();
for (DimensionSchema dim : dimensionSchema) {
fieldsRequired.add(dim.getName());
}
-    // Determine any fields we need to read from parquet file that is used in the metricsSpec
+    // Determine any fields we need to read from input file that is used in the metricsSpec
for (AggregatorFactory agg : aggregators) {
fieldsRequired.addAll(agg.requiredFields());
}
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
index 6ffe7bded6..6fddd3b4d6 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
@@ -328,4 +328,40 @@ public class ReaderUtilsTest extends InitializedNullHandlingTest
     Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
Assert.assertEquals(ImmutableSet.of("B", "C"), actual);
}
+
+ @Test
+ public void testGetColumnsRequiredForSchemalessIngestionWithoutFlattenSpec()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = DimensionsSpec.EMPTY;
+
+    Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, null);
+    Assert.assertEquals(fullInputSchema, actual);
+ }
+
+ @Test
+  public void testGetColumnsRequiredForSchemalessIngestionWithFlattenSpecAndUseFieldDiscovery()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = DimensionsSpec.EMPTY;
+ List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time")
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
+    Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+ Assert.assertEquals(fullInputSchema, actual);
+ }
+
+ @Test
+  public void testGetColumnsRequiredForSchemalessIngestionWithFlattenSpecAndNotUseFieldDiscovery()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = DimensionsSpec.EMPTY;
+ List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time")
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+    Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+ Assert.assertEquals(ImmutableSet.of("A", "C"), actual);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]