This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1582d74f37 Fix Parquet Reader for schema-less ingestion need to read all columns (#13689)
1582d74f37 is described below

commit 1582d74f375bf97a26fbc3327c97f96906ab247d
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Jan 18 10:52:12 2023 -1000

    Fix Parquet Reader for schema-less ingestion need to read all columns (#13689)
    
    * fix stuff
    
    * address comments
---
 .../apache/druid/segment/indexing/ReaderUtils.java | 17 ++++++++--
 .../druid/segment/indexing/ReaderUtilsTest.java    | 36 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
index b47d676081..5298f2605d 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
@@ -60,6 +60,11 @@ public class ReaderUtils
 
     // Find columns we need to read from the flattenSpec
     if (flattenSpec != null) {
+      if (dimensionsSpec.getDimensions().isEmpty() && flattenSpec.isUseFieldDiscovery()) {
+        // Schemaless ingestion with useFieldDiscovery needs to read all columns
+        return fullInputSchema;
+      }
+
       // Parse columns needed from flattenSpec
       for (JSONPathFieldSpec fields : flattenSpec.getFields()) {
         if (fields.getType() == JSONPathFieldType.ROOT) {
@@ -117,21 +122,27 @@ public class ReaderUtils
         fieldsRequired.retainAll(fullInputSchema);
         return fieldsRequired;
       }
+    } else {
+      // Without flattenSpec, useFieldDiscovery is default to true and thus needs to read all columns since this is
+      // schemaless
+      if (dimensionsSpec.getDimensions().isEmpty()) {
+        return fullInputSchema;
+      }
     }
 
-    // Determine any fields we need to read from parquet file that is used in the transformSpec
+    // Determine any fields we need to read from input file that is used in the transformSpec
     List<Transform> transforms = transformSpec.getTransforms();
     for (Transform transform : transforms) {
       fieldsRequired.addAll(transform.getRequiredColumns());
     }
 
-    // Determine any fields we need to read from parquet file that is used in the dimensionsSpec
+    // Determine any fields we need to read from input file that is used in the dimensionsSpec
     List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions();
     for (DimensionSchema dim : dimensionSchema) {
       fieldsRequired.add(dim.getName());
     }
 
-    // Determine any fields we need to read from parquet file that is used in the metricsSpec
+    // Determine any fields we need to read from input file that is used in the metricsSpec
     for (AggregatorFactory agg : aggregators) {
       fieldsRequired.addAll(agg.requiredFields());
     }
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
index 6ffe7bded6..6fddd3b4d6 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
@@ -328,4 +328,40 @@ public class ReaderUtilsTest extends InitializedNullHandlingTest
     Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
     Assert.assertEquals(ImmutableSet.of("B", "C"), actual);
   }
+
+  @Test
+  public void testGetColumnsRequiredForSchemalessIngestionWithoutFlattenSpec()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = DimensionsSpec.EMPTY;
+
+    Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, null);
+    Assert.assertEquals(fullInputSchema, actual);
+  }
+
+  @Test
+  public void testGetColumnsRequiredForSchemalessIngestionWithFlattenSpecAndUseFieldDiscovery()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = DimensionsSpec.EMPTY;
+    List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time")
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
+    Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+    Assert.assertEquals(fullInputSchema, actual);
+  }
+
+  @Test
+  public void testGetColumnsRequiredForSchemalessIngestionWithFlattenSpecAndNotUseFieldDiscovery()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = DimensionsSpec.EMPTY;
+    List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time")
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+    Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+    Assert.assertEquals(ImmutableSet.of("A", "C"), actual);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to