This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f54ebbf47 Fix Parquet Parser missing column when reading parquet file 
 (#13612)
7f54ebbf47 is described below

commit 7f54ebbf478b9a2122edcc4bee4815ae86831927
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Jan 11 20:08:48 2023 -1000

    Fix Parquet Parser missing column when reading parquet file  (#13612)
    
    * fix parquet reader
    
    * fix checkstyle
    
    * fix bug
    
    * fix inspection
    
    * refactor
    
    * fix checkstyle
    
    * fix checkstyle
    
    * fix checkstyle
    
    * fix checkstyle
    
    * add test
    
    * fix checkstyle
    
    * fix tests
    
    * add IT
    
    * add IT
    
    * add more tests
    
    * fix checkstyle
    
    * fix stuff
    
    * fix stuff
    
    * add more tests
    
    * add more tests
---
 .../util/common/parsers/JSONPathFieldSpec.java     |   6 +
 .../example/compat/nested_array_struct.json        |   3 +-
 .../example/compat/parquet_thrift_compat.json      |  13 +-
 .../example/compat/proto_struct_with_array.json    |   5 +-
 .../flattening/flat_1_autodiscover_fields.json     |   7 +-
 .../example/flattening/flat_1_flatten.json         |   4 +
 .../example/flattening/flat_1_list_index.json      |   4 +
 .../flattening/nested_1_autodiscover_fields.json   |   4 +-
 .../parquet/simple/DruidParquetReadSupport.java    |  52 ++--
 .../environment-configs/override-examples/hdfs     |   2 +-
 .../druid/tests/hadoop/ITHadoopIndexTest.java      | 142 +++++++++
 .../tests/indexer/AbstractITBatchIndexTest.java    |  44 ++-
 .../wikipedia_index_data1.parquet                  | Bin 0 -> 10653 bytes
 .../wikipedia_index_data2.parquet                  | Bin 0 -> 9935 bytes
 .../wikipedia_index_data3.parquet                  | Bin 0 -> 9983 bytes
 .../wikipedia_hadoop_paquet_parser_index_data.json | 131 ++++++++
 .../wikipedia_hadoop_paquet_parser_query_data.json | 157 ++++++++++
 .../apache/druid/segment/indexing/ReaderUtils.java | 143 +++++++++
 .../druid/segment/indexing/ReaderUtilsTest.java    | 331 +++++++++++++++++++++
 19 files changed, 1006 insertions(+), 42 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathFieldSpec.java
 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathFieldSpec.java
index 442c9bcea1..9ddde9b897 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathFieldSpec.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathFieldSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.java.util.common.parsers;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.jayway.jsonpath.JsonPath;
 import org.apache.druid.utils.CollectionUtils;
 
 import java.util.List;
@@ -125,6 +126,11 @@ public class JSONPathFieldSpec
     return new JSONPathFieldSpec(JSONPathFieldType.TREE, name, null, nodes);
   }
 
+  public static String getCompilePath(String expr)
+  {
+    return JsonPath.compile(expr).getPath();
+  }
+
   @Override
   public boolean equals(final Object o)
   {
diff --git 
a/extensions-core/parquet-extensions/example/compat/nested_array_struct.json 
b/extensions-core/parquet-extensions/example/compat/nested_array_struct.json
index 94f19ace50..6f8935f9ea 100644
--- a/extensions-core/parquet-extensions/example/compat/nested_array_struct.json
+++ b/extensions-core/parquet-extensions/example/compat/nested_array_struct.json
@@ -45,7 +45,8 @@
           },
           "dimensionsSpec": {
             "dimensions": [
-              "i32_dec"
+              "i32_dec",
+              "primitive"
             ],
             "dimensionExclusions": [],
             "spatialDimensions": []
diff --git 
a/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json 
b/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json
index ef613dacfd..8c0ea2b4f4 100644
--- 
a/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json
+++ 
b/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json
@@ -44,7 +44,18 @@
             "missingValue": "2018-09-01T00:00:00.000Z"
           },
           "dimensionsSpec": {
-            "dimensions": [],
+            "dimensions": [
+              "boolColumn",
+              "byteColumn",
+              "shortColumn",
+              "intColumn",
+              "longColumn",
+              "doubleColumn",
+              "binaryColumn",
+              "stringColumn",
+              "enumColumn",
+              "stringsColumn",
+              "intSetColumn"            ],
             "dimensionExclusions": [],
             "spatialDimensions": []
           }
diff --git 
a/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
 
b/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
index 7909e0ee12..b448a3fa8f 100644
--- 
a/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
+++ 
b/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
@@ -49,7 +49,10 @@
             "missingValue": "2018-09-01T00:00:00.000Z"
           },
           "dimensionsSpec": {
-            "dimensions": [],
+            "dimensions": [
+              "optionalPrimitive",
+              "requiredPrimitive"
+            ],
             "dimensionExclusions": [],
             "spatialDimensions": []
           }
diff --git 
a/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
 
b/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
index 3ded41c2b5..c23ecdc5d4 100644
--- 
a/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
+++ 
b/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
@@ -32,7 +32,12 @@
             "format": "auto"
           },
           "dimensionsSpec": {
-            "dimensions": [],
+            "dimensions": [
+              "dim1",
+              "dim2",
+              "dim3",
+              "listDim"
+            ],
             "dimensionExclusions": [],
             "spatialDimensions": []
           }
diff --git 
a/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json 
b/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json
index 760dea7757..a8d6a82c12 100644
--- a/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json
+++ b/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json
@@ -42,6 +42,10 @@
                 "type": "root",
                 "name": "dim3"
               },
+              {
+                "type": "root",
+                "name": "metric1"
+              },
               {
                 "type": "path",
                 "name": "list",
diff --git 
a/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json 
b/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json
index 219afde889..6b65237d3c 100644
--- 
a/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json
+++ 
b/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json
@@ -38,6 +38,10 @@
                 "type": "root",
                 "name": "dim2"
               },
+              {
+                "type": "root",
+                "name": "metric1"
+              },
               {
                 "type": "path",
                 "name": "listextracted",
diff --git 
a/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
 
b/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
index 05171c0f13..3d7490bda8 100644
--- 
a/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
+++ 
b/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
@@ -32,7 +32,9 @@
             "format": "auto"
           },
           "dimensionsSpec": {
-            "dimensions": [],
+            "dimensions": [
+              "dim1"
+            ],
             "dimensionExclusions": [],
             "spatialDimensions": []
           }
diff --git 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
index a1ed46c0f6..b68c4f1dd7 100644
--- 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
+++ 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
@@ -19,22 +19,25 @@
 
 package org.apache.druid.data.input.parquet.simple;
 
-import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.indexer.HadoopDruidIndexerConfig;
-import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.segment.indexing.ReaderUtils;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class DruidParquetReadSupport extends GroupReadSupport
 {
+  private static final Logger LOG = new Logger(DruidParquetReadSupport.class);
+
   /**
    * Select the columns from the parquet schema that are used in the schema of 
the ingestion job
    *
@@ -44,45 +47,36 @@ public class DruidParquetReadSupport extends 
GroupReadSupport
    */
   private MessageType getPartialReadSchema(InitContext context)
   {
-    MessageType fullSchema = context.getFileSchema();
+    List<Type> partialFields = new ArrayList<>();
 
+    MessageType fullSchema = context.getFileSchema();
     String name = fullSchema.getName();
 
     HadoopDruidIndexerConfig config = 
HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
     ParseSpec parseSpec = config.getParser().getParseSpec();
-
-    // this is kind of lame, maybe we can still trim what we read if we
-    // parse the flatten spec and determine it isn't auto discovering props?
-    if (parseSpec instanceof ParquetParseSpec) {
-      if (((ParquetParseSpec) parseSpec).getFlattenSpec() != null) {
-        return fullSchema;
-      }
+    JSONPathSpec flattenSpec = null;
+    if (parseSpec instanceof ParquetParseSpec && ((ParquetParseSpec) 
parseSpec).getFlattenSpec() != null) {
+      flattenSpec = ((ParquetParseSpec) parseSpec).getFlattenSpec();
     }
+    Set<String> fullSchemaFields = 
fullSchema.getFields().stream().map(Type::getName).collect(Collectors.toSet());
 
-    String tsField = parseSpec.getTimestampSpec().getTimestampColumn();
-
-    List<DimensionSchema> dimensionSchema = 
parseSpec.getDimensionsSpec().getDimensions();
-    Set<String> dimensions = new HashSet<>();
-    for (DimensionSchema dim : dimensionSchema) {
-      dimensions.add(dim.getName());
-    }
-
-    Set<String> metricsFields = new HashSet<>();
-    for (AggregatorFactory agg : 
config.getSchema().getDataSchema().getAggregators()) {
-      metricsFields.addAll(agg.requiredFields());
-    }
-
-    List<Type> partialFields = new ArrayList<>();
+    Set<String> requiredFields = ReaderUtils.getColumnsRequiredForIngestion(
+        fullSchemaFields,
+        parseSpec.getTimestampSpec(),
+        parseSpec.getDimensionsSpec(),
+        config.getSchema().getDataSchema().getTransformSpec(),
+        config.getSchema().getDataSchema().getAggregators(),
+        flattenSpec
+    );
 
     for (Type type : fullSchema.getFields()) {
-      if (tsField.equals(type.getName())
-          || metricsFields.contains(type.getName())
-          || dimensions.size() > 0 && dimensions.contains(type.getName())
-          || dimensions.size() == 0) {
+      if (requiredFields.contains(type.getName())) {
         partialFields.add(type);
       }
     }
 
+    LOG.info("Parquet schema name[%s] with full schema[%s] requires 
fields[%s]", name, fullSchemaFields, requiredFields);
+
     return new MessageType(name, partialFields);
   }
 
diff --git 
a/integration-tests/docker/environment-configs/override-examples/hdfs 
b/integration-tests/docker/environment-configs/override-examples/hdfs
index 63b851e508..2e25b929b7 100644
--- a/integration-tests/docker/environment-configs/override-examples/hdfs
+++ b/integration-tests/docker/environment-configs/override-examples/hdfs
@@ -21,4 +21,4 @@ druid_storage_storageDirectory=/druid/segments
 # Depending on the test, additional extension(s) may be required.
 # Please refer to the other 
integration-tests/docker/environment-configs/override-examples/ files and Druid 
docs for
 # additional env vars to provide for each extension.
-druid_extensions_loadList=["druid-hdfs-storage", "druid-parquet-extensions", 
"druid-orc-extensions"]
\ No newline at end of file
+druid_extensions_loadList=["druid-hdfs-storage", 
"mysql-metadata-storage","druid-basic-security","simple-client-sslcontext","druid-testing-tools","druid-lookups-cached-global","druid-histogram","druid-datasketches","druid-parquet-extensions","druid-avro-extensions","druid-protobuf-extensions","druid-orc-extensions","druid-kafka-indexing-service","druid-s3-extensions"]
\ No newline at end of file
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
index a61fae4e0e..b8ab40bb49 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.tests.hadoop;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -65,6 +66,9 @@ public class ITHadoopIndexTest extends 
AbstractITBatchIndexTest
   private static final String BATCH_QUERIES_RESOURCE = 
"/hadoop/batch_hadoop_queries.json";
   private static final String BATCH_DATASOURCE = "batchLegacyHadoop";
 
+  private static final String BATCH_TASK_WITH_PARQUET_PARSER_RENAME = 
"/hadoop/wikipedia_hadoop_paquet_parser_index_data.json";
+  private static final String BATCH_QUERIES_RESOURCE_FOR_PARQUET_PARSER_RENAME 
= "/hadoop/wikipedia_hadoop_paquet_parser_query_data.json";
+
   private static final String INDEX_TASK = 
"/hadoop/wikipedia_hadoop_index_task.json";
   private static final String INDEX_QUERIES_RESOURCE = 
"/indexer/wikipedia_index_queries.json";
   private static final String INDEX_DATASOURCE = "wikipedia_hadoop_index_test";
@@ -133,6 +137,144 @@ public class ITHadoopIndexTest extends 
AbstractITBatchIndexTest
     }
   }
 
+  @Test
+  public void testHadoopParquetParserWithRenameIndexTest() throws Exception
+  {
+    String indexDatasource = BATCH_DATASOURCE + "_" + UUID.randomUUID();
+    try (
+        final Closeable ignored0 = unloader(indexDatasource + 
config.getExtraDatasourceNameSuffix());
+    ) {
+      final Function<String, String> specPathsTransform = spec -> {
+        try {
+          String path = "/batch_index/parquet";
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_PATHS%%",
+              path
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%TRANSFORMS%%",
+              jsonMapper.writeValueAsString(
+                  ImmutableList.of(
+                      ImmutableMap.of("type", "expression", "name", 
"userTransformed", "expression", "user"),
+                      ImmutableMap.of("type", "expression", "name", 
"regionAndCity", "expression", "concat(region,city)")
+                  )
+              )
+          );
+
+          return spec;
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      final Function<String, String> queryTransform = query -> {
+        try {
+          query = StringUtils.replace(
+              query,
+              "%%EXPECTED_NUMBER_SUM_RESULT%%",
+              jsonMapper.writeValueAsString(
+                  ImmutableMap.of("sum_added", 3090, "sum_deleted", 712, 
"sum_delta", 2378)
+              )
+          );
+          return query;
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      doIndexTest(
+          indexDatasource,
+          BATCH_TASK_WITH_PARQUET_PARSER_RENAME,
+          specPathsTransform,
+          BATCH_QUERIES_RESOURCE_FOR_PARQUET_PARSER_RENAME,
+          queryTransform,
+          false,
+          true,
+          true,
+          new Pair<>(false, false)
+      );
+    }
+  }
+
+  @Test
+  public void testHadoopParquetParserWithDifferentSchemaTest() throws Exception
+  {
+    /*
+    This test reads from three parquet files, each with a different schema.
+    The difference in the schema is shown below:
+    File 1's columns: userRenamed, continent, country, added, deleted, 
deltaRenamed
+    File 2's columns: user, continentRenamed, country, added, delta
+    File 3's columns: user, continent, countryRenamed, deleted, delta
+
+    Note that `deleted` was dropped from File 2 and `added` was dropped from 
File 3
+     */
+    String indexDatasource = BATCH_DATASOURCE + "_" + UUID.randomUUID();
+    try (
+        final Closeable ignored0 = unloader(indexDatasource + 
config.getExtraDatasourceNameSuffix());
+    ) {
+      final Function<String, String> specPathsTransform = spec -> {
+        try {
+          String path = "/batch_index/multiple_schema_parquet";
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_PATHS%%",
+              path
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%TRANSFORMS%%",
+              jsonMapper.writeValueAsString(
+                  ImmutableList.of(
+                      ImmutableMap.of("type", "expression", "name", 
"userTransformed", "expression", "nvl(user,userRenamed)"),
+                      ImmutableMap.of("type", "expression", "name", 
"countryFlat", "expression", "nvl(countryFlat,countryRenamed)"),
+                      ImmutableMap.of("type", "expression", "name", 
"continentFlat", "expression", "nvl(continentFlat,continentRenamed)"),
+                      ImmutableMap.of("type", "expression", "name", "delta", 
"expression", "nvl(delta,deltaRenamed)"),
+                      ImmutableMap.of("type", "expression", "name", 
"regionAndCity", "expression", "concat(region,city)")
+                  )
+              )
+          );
+
+          return spec;
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      final Function<String, String> queryTransform = query -> {
+        try {
+          query = StringUtils.replace(
+              query,
+              "%%EXPECTED_NUMBER_SUM_RESULT%%",
+              jsonMapper.writeValueAsString(
+                  ImmutableMap.of("sum_added", 1602, "sum_deleted", 497, 
"sum_delta", 2378)
+              )
+          );
+          return query;
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      doIndexTest(
+          indexDatasource,
+          BATCH_TASK_WITH_PARQUET_PARSER_RENAME,
+          specPathsTransform,
+          BATCH_QUERIES_RESOURCE_FOR_PARQUET_PARSER_RENAME,
+          queryTransform,
+          false,
+          true,
+          true,
+          new Pair<>(false, false)
+      );
+    }
+  }
+
   @Test(dataProvider = "resources")
   public void testIndexData(DimensionBasedPartitionsSpec partitionsSpec) 
throws Exception
   {
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 06f692162e..8bcfaa0df1 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -133,6 +133,31 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
       boolean waitForSegmentsToLoad,
       Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
   ) throws IOException
+  {
+    doIndexTest(
+        dataSource,
+        indexTaskFilePath,
+        taskSpecTransform,
+        queryFilePath,
+        Function.identity(),
+        waitForNewVersion,
+        runTestQueries,
+        waitForSegmentsToLoad,
+        segmentAvailabilityConfirmationPair
+    );
+  }
+
+  protected void doIndexTest(
+      String dataSource,
+      String indexTaskFilePath,
+      Function<String, String> taskSpecTransform,
+      String queryFilePath,
+      Function<String, String> queryTransform,
+      boolean waitForNewVersion,
+      boolean runTestQueries,
+      boolean waitForSegmentsToLoad,
+      Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+  ) throws IOException
   {
     final String fullDatasourceName = dataSource + 
config.getExtraDatasourceNameSuffix();
     final String taskSpec = taskSpecTransform.apply(
@@ -151,11 +176,16 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
         segmentAvailabilityConfirmationPair
     );
     if (runTestQueries) {
-      doTestQuery(dataSource, queryFilePath);
+      doTestQuery(dataSource, queryFilePath, queryTransform);
     }
   }
 
   protected void doTestQuery(String dataSource, String queryFilePath)
+  {
+    doTestQuery(dataSource, queryFilePath, Function.identity());
+  }
+
+  protected void doTestQuery(String dataSource, String queryFilePath, 
Function<String, String> queryTransform)
   {
     try {
       String queryResponseTemplate;
@@ -166,14 +196,14 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
       catch (IOException e) {
         throw new ISE(e, "could not read query file: %s", queryFilePath);
       }
-
-      queryResponseTemplate = StringUtils.replace(
-          queryResponseTemplate,
-          "%%DATASOURCE%%",
-          dataSource + config.getExtraDatasourceNameSuffix()
+      queryResponseTemplate = queryTransform.apply(
+          StringUtils.replace(
+              queryResponseTemplate,
+              "%%DATASOURCE%%",
+              dataSource + config.getExtraDatasourceNameSuffix()
+          )
       );
       queryHelper.testQueriesFromString(queryResponseTemplate);
-
     }
     catch (Exception e) {
       LOG.error(e, "Error while testing");
diff --git 
a/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data1.parquet
 
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data1.parquet
new file mode 100644
index 0000000000..f211113a41
Binary files /dev/null and 
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data1.parquet
 differ
diff --git 
a/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data2.parquet
 
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data2.parquet
new file mode 100644
index 0000000000..f42f951f07
Binary files /dev/null and 
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data2.parquet
 differ
diff --git 
a/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data3.parquet
 
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data3.parquet
new file mode 100644
index 0000000000..4eaa9716ae
Binary files /dev/null and 
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data3.parquet
 differ
diff --git 
a/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_index_data.json
 
b/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_index_data.json
new file mode 100644
index 0000000000..fd474677dc
--- /dev/null
+++ 
b/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_index_data.json
@@ -0,0 +1,131 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "%%DATASOURCE%%",
+      "parser": {
+        "parseSpec": {
+          "dimensionsSpec": {
+            "dimensionExclusions": [],
+            "dimensions": [
+              "page",
+              {"type": "string", "name": "language", "createBitmapIndex": 
false},
+              "userTransformed",
+              "unpatrolled",
+              "newPage",
+              "robot",
+              "anonymous",
+              "namespace",
+              "continentFlat",
+              "countryFlat",
+              "regionAndCity"
+            ],
+            "spatialDimensions": []
+          },
+          "format": "parquet",
+          "timestampSpec": {
+            "column": "timestamp"
+          },
+          "flattenSpec": {
+            "fields": [
+              {
+                "type": "path",
+                "name": "countryFlat",
+                "expr": "$.country"
+              },
+              {
+                "type": "root",
+                "expr": "continent",
+                "name": "continentFlat"
+              },
+              {
+                "type": "root",
+                "name": "namespace"
+              }
+            ]
+          }
+        },
+        "binaryAsString": true,
+        "type": "parquet"
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "added",
+          "fieldName": "added"
+        },
+        {
+          "type": "doubleSum",
+          "name": "deleted",
+          "fieldName": "deleted"
+        },
+        {
+          "type": "doubleSum",
+          "name": "delta",
+          "fieldName": "delta"
+        },
+        {
+          "name": "thetaSketch",
+          "type": "thetaSketch",
+          "fieldName": "userTransformed"
+        },
+        {
+          "name": "quantilesDoublesSketch",
+          "type": "quantilesDoublesSketch",
+          "fieldName": "delta"
+        },
+        {
+          "name": "HLLSketchBuild",
+          "type": "HLLSketchBuild",
+          "fieldName": "userTransformed"
+        }
+      ],
+      "granularitySpec": {
+        "segmentGranularity": "DAY",
+        "queryGranularity": "second",
+        "intervals" : [ "2013-08-31/2013-09-02" ]
+      },
+      "transformSpec": {
+        "filter": null,
+        "transforms": %%TRANSFORMS%%
+      }
+    },
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "inputFormat": 
"org.apache.druid.data.input.parquet.DruidParquetInputFormat",
+        "type": "static",
+        "paths": "%%INPUT_PATHS%%"
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "partitionsSpec": {
+        "assumeGrouped": true,
+        "targetPartitionSize": 75000,
+        "type": "hashed"
+      },
+      "jobProperties": {
+        "fs.permissions.umask-mode": "022",
+        "fs.default.name" : "hdfs://druid-it-hadoop:9000",
+        "fs.defaultFS" : "hdfs://druid-it-hadoop:9000",
+        "dfs.datanode.address" : "druid-it-hadoop",
+        "dfs.client.use.datanode.hostname" : "true",
+        "dfs.datanode.use.datanode.hostname" : "true",
+        "yarn.resourcemanager.hostname" : "druid-it-hadoop",
+        "yarn.nodemanager.vmem-check-enabled" : "false",
+        "mapreduce.job.classloader": "true",
+        "mapreduce.map.java.opts" : "-Duser.timezone=UTC 
-Dfile.encoding=UTF-8",
+        "mapreduce.job.user.classpath.first" : "true",
+        "mapreduce.reduce.java.opts" : "-Duser.timezone=UTC 
-Dfile.encoding=UTF-8",
+        "mapreduce.map.memory.mb" : 1024,
+        "mapreduce.reduce.memory.mb" : 1024
+      },
+      "rowFlushBoundary": 10000
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_query_data.json
 
b/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_query_data.json
new file mode 100644
index 0000000000..a0924a9870
--- /dev/null
+++ 
b/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_query_data.json
@@ -0,0 +1,157 @@
+[
+  {
+    "description": "timeseries, 1 agg, all",
+    "query":{
+      "queryType" : "timeBoundary",
+      "dataSource": "%%DATASOURCE%%"
+    },
+    "expectedResults":[
+      {
+        "timestamp" : "2013-08-31T01:02:33.000Z",
+        "result" : {
+          "minTime" : "2013-08-31T01:02:33.000Z",
+          "maxTime" : "2013-09-01T12:41:27.000Z"
+        }
+      }
+    ]
+  },
+  {
+    "description": "timeseries, datasketch aggs, all",
+    "query":{
+      "queryType" : "timeseries",
+      "dataSource": "%%DATASOURCE%%",
+      "granularity":"week",
+      "intervals":[
+        "2013-08-31T00:00/2013-09-02T00:00"
+      ],
+      "filter":null,
+      "aggregations":[
+        {
+          "type": "HLLSketchMerge",
+          "name": "approxCountHLL",
+          "fieldName": "HLLSketchBuild",
+          "lgK": 12,
+          "tgtHllType": "HLL_4",
+          "round": true
+        },
+        {
+          "type":"thetaSketch",
+          "name":"approxCountTheta",
+          "fieldName":"thetaSketch",
+          "size":16384,
+          "shouldFinalize":true,
+          "isInputThetaSketch":false,
+          "errorBoundsStdDev":null
+        },
+        {
+          "type":"quantilesDoublesSketch",
+          "name":"quantilesSketch",
+          "fieldName":"quantilesDoublesSketch",
+          "k":128
+        }
+      ]
+    },
+    "expectedResults":[
+      {
+        "timestamp" : "2013-08-26T00:00:00.000Z",
+        "result" : {
+          "quantilesSketch":10,
+          "approxCountTheta":5.0,
+          "approxCountHLL":5
+        }
+      }
+    ]
+  },
+  {
+    "description": "timeseries, stringFirst/stringLast aggs, all",
+    "query":{
+      "queryType" : "timeseries",
+      "dataSource": "%%DATASOURCE%%",
+      "granularity":"week",
+      "intervals":[
+        "2013-08-31T00:00/2013-09-02T00:00"
+      ],
+      "filter":null,
+      "aggregations":[
+        {
+          "type": "stringFirst",
+          "name": "first_user",
+          "fieldName": "userTransformed"
+        },
+        {
+          "type":"stringLast",
+          "name":"last_user",
+          "fieldName":"userTransformed"
+        },
+        {
+          "type": "stringFirst",
+          "name": "first_countryFlat",
+          "fieldName": "countryFlat"
+        },
+        {
+          "type": "stringFirst",
+          "name": "first_continentFlat",
+          "fieldName": "continentFlat"
+        },
+        {
+          "type": "stringFirst",
+          "name": "first_namespace",
+          "fieldName": "namespace"
+        },
+        {
+          "type": "stringFirst",
+          "name": "first_regionAndCity",
+          "fieldName": "regionAndCity"
+        }
+      ]
+    },
+    "expectedResults":[
+      {
+        "timestamp" : "2013-08-26T00:00:00.000Z",
+        "result" : {
+          "first_user":"nuclear",
+          "first_continentFlat":"North America",
+          "first_namespace":"article",
+          "last_user":"stringer",
+          "first_regionAndCity":"Bay AreaSan Francisco",
+          "first_countryFlat":"United States"
+        }
+      }
+    ]
+  },
+  {
+    "description": "timeseries, number sum aggs, all",
+    "query":{
+      "queryType" : "timeseries",
+      "dataSource": "%%DATASOURCE%%",
+      "granularity":"week",
+      "intervals":[
+        "2013-08-31T00:00/2013-09-02T00:00"
+      ],
+      "filter":null,
+      "aggregations":[
+        {
+          "type": "longSum",
+          "name": "sum_added",
+          "fieldName": "added"
+        },
+        {
+          "type":"longSum",
+          "name":"sum_deleted",
+          "fieldName":"deleted"
+        },
+        {
+          "type": "longSum",
+          "name": "sum_delta",
+          "fieldName": "delta"
+        }
+      ]
+    },
+    "expectedResults":[
+      {
+        "timestamp" : "2013-08-26T00:00:00.000Z",
+        "result" : %%EXPECTED_NUMBER_SUM_RESULT%%
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java 
b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
new file mode 100644
index 0000000000..b47d676081
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.transform.Transform;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import javax.annotation.Nullable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ReaderUtils
+{
+  private static final Logger LOG = new Logger(ReaderUtils.class);
+
+  private static final Pattern JSON_PATH_PATTERN = 
Pattern.compile("\\[(.*?)]");
+  private static final Pattern BRACKET_NOTATED_CHILD_PATTERN = 
Pattern.compile("'(.*?)'");
+
+  public static Set<String> getColumnsRequiredForIngestion(
+      Set<String> fullInputSchema,
+      TimestampSpec timestampSpec,
+      DimensionsSpec dimensionsSpec,
+      TransformSpec transformSpec,
+      AggregatorFactory[] aggregators,
+      @Nullable JSONPathSpec flattenSpec
+  )
+  {
+    Set<String> fieldsRequired = new HashSet<>();
+
+    // We need to read timestamp column for Druid timestamp field
+    fieldsRequired.add(timestampSpec.getTimestampColumn());
+
+    // Find columns we need to read from the flattenSpec
+    if (flattenSpec != null) {
+      // Parse columns needed from flattenSpec
+      for (JSONPathFieldSpec fields : flattenSpec.getFields()) {
+        if (fields.getType() == JSONPathFieldType.ROOT) {
+          // ROOT type just get top level field using the expr as the key
+          fieldsRequired.add(fields.getExpr());
+        } else if (fields.getType() == JSONPathFieldType.PATH) {
+          // Parse PATH type to determine columns needed
+          String parsedPath;
+          try {
+            parsedPath = JSONPathFieldSpec.getCompilePath(fields.getExpr());
+          }
+          catch (Exception e) {
+            // We can skip columns used in this path as the path is invalid
+            LOG.debug("Ignoring columns from JSON path [%s] as path expression 
is invalid", fields.getExpr());
+            continue;
+          }
+          // Remove the $
+          parsedPath = parsedPath.substring(1);
+          // If the first level is a deep scan, then we need all columns
+          if (parsedPath.length() >= 2 && "..".equals(parsedPath.substring(0, 
2))) {
+            return fullInputSchema;
+          }
+          Matcher jsonPathMatcher = JSON_PATH_PATTERN.matcher(parsedPath);
+          if (!jsonPathMatcher.find()) {
+            LOG.warn("Failed to parse JSON path for required column from path 
[%s]", fields.getExpr());
+            return fullInputSchema;
+          }
+          String matchedGroup = jsonPathMatcher.group();
+          Matcher childMatcher = 
BRACKET_NOTATED_CHILD_PATTERN.matcher(matchedGroup);
+          if (childMatcher.find()) {
+            // Get name of the column from bracket-notated child i.e. 
['region']
+            childMatcher.reset();
+            while (childMatcher.find()) {
+              String columnName = childMatcher.group();
+              // Remove the quote around column name
+              fieldsRequired.add(columnName.substring(1, columnName.length() - 
1));
+            }
+          } else if ("[*]".equals(matchedGroup)) {
+            // If the first level is a wildcard, then we need all columns
+            return fullInputSchema;
+          } else {
+            // This can happen if it is a filter expression, slice operator, 
or index / indexes
+            // We just return all columns...
+            return fullInputSchema;
+          }
+        } else {
+          // Others type aren't supported but returning full schema just in 
case...
+          LOG.warn("Got unexpected JSONPathFieldType [%s]", fields.getType());
+          return fullInputSchema;
+        }
+      }
+      // If useFieldDiscovery is false then we have already determined all the 
columns we need to read from
+      // (as only explicitly specified fields will be available to use in the 
other specs)
+      if (!flattenSpec.isUseFieldDiscovery()) {
+        fieldsRequired.retainAll(fullInputSchema);
+        return fieldsRequired;
+      }
+    }
+
+    // Determine any fields we need to read from parquet file that is used in 
the transformSpec
+    List<Transform> transforms = transformSpec.getTransforms();
+    for (Transform transform : transforms) {
+      fieldsRequired.addAll(transform.getRequiredColumns());
+    }
+
+    // Determine any fields we need to read from parquet file that is used in 
the dimensionsSpec
+    List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions();
+    for (DimensionSchema dim : dimensionSchema) {
+      fieldsRequired.add(dim.getName());
+    }
+
+    // Determine any fields we need to read from parquet file that is used in 
the metricsSpec
+    for (AggregatorFactory agg : aggregators) {
+      fieldsRequired.addAll(agg.requiredFields());
+    }
+
+    // Only required fields that actually exist in the input schema
+    fieldsRequired.retainAll(fullInputSchema);
+    return fieldsRequired;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java 
b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
new file mode 100644
index 0000000000..6ffe7bded6
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class ReaderUtilsTest extends InitializedNullHandlingTest
+{
+
+  private final Set<String> fullInputSchema = ImmutableSet.of("A", "B", "C", 
"D", "E", "F", "G", "H", "I");
+
+  @Test
+  public void 
testGetColumnsRequiredForIngestionWithoutMetricsWithoutTransformAndWithoutFlatten()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("B"),
+            new LongDimensionSchema("C"),
+            new FloatDimensionSchema("D")
+        )
+    );
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, null);
+    Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D"), actual);
+  }
+
+  @Test
+  public void 
testGetColumnsRequiredForIngestionWithoutTransformAndWithoutFlatten()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("B"),
+            new LongDimensionSchema("C"),
+            new FloatDimensionSchema("D")
+        )
+    );
+    AggregatorFactory[] aggregators = new AggregatorFactory[]{
+        new CountAggregatorFactory("custom_count"),
+        new LongSumAggregatorFactory("custom_long_sum", "E"),
+        new FloatMinAggregatorFactory("custom_float_min", "F")
+    };
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, TransformSpec.NONE, aggregators, null);
+    Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D", "E", "F"), actual);
+  }
+
+  @Test
+  public void 
testGetColumnsRequiredForIngestionWithTransformAndWithoutFlatten()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("B"),
+            new StringDimensionSchema("C*"),
+            new LongDimensionSchema("D*"),
+            new FloatDimensionSchema("E*"),
+            new LongDimensionSchema("G")
+        )
+    );
+
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(
+            // Function applied to input column C, output renamed to C*
+            new ExpressionTransform("C*", "json_value(C, '$.dim2')", 
TestExprMacroTable.INSTANCE),
+            // Plain rename of input column D to D*
+            new ExpressionTransform("D*", "D", TestExprMacroTable.INSTANCE),
+            // Function with multiple input columns (E and F)
+            new ExpressionTransform("E*", "concat(E, F)", 
TestExprMacroTable.INSTANCE),
+            // Function whose output keeps the name of its input column G
+            new ExpressionTransform("G", "CAST(G, LONG)", 
TestExprMacroTable.INSTANCE)
+        )
+    );
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, transformSpec, new AggregatorFactory[]{}, null);
+    Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D", "E", "F", "G"), 
actual);
+  }
+
+  @Test
+  public void 
testGetColumnsRequiredForIngestionWithFlattenAndUseFieldDiscoveryFalse()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("B"),
+            new StringDimensionSchema("C*"),
+            new StringDimensionSchema("D*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("E*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("F*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("H")
+        )
+    );
+
+    List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+        new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$.D.M[*].T")
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new ExpressionTransform("E*", "E", TestExprMacroTable.INSTANCE),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new ExpressionTransform("F*", "concat(F, G)", 
TestExprMacroTable.INSTANCE)
+        )
+    );
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, transformSpec, new AggregatorFactory[]{}, flattenSpec);
+    Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D"), actual);
+  }
+
+  @Test
+  public void 
testGetColumnsRequiredForIngestionWithFlattenAndUseFieldDiscoveryTrue()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("B"),
+            new StringDimensionSchema("C*"),
+            new StringDimensionSchema("D*"),
+            new StringDimensionSchema("E*"),
+            new StringDimensionSchema("F*"),
+            new StringDimensionSchema("H")
+        )
+    );
+
+    List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+        new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$.D.M[*].T")
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
+
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(
+            new ExpressionTransform("E*", "E", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("F*", "concat(F, G)", 
TestExprMacroTable.INSTANCE)
+        )
+    );
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, transformSpec, new AggregatorFactory[]{}, flattenSpec);
+    Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D", "E", "F", "G", 
"H"), actual);
+  }
+
+  @Test
+  public void testGetColumnsRequiredForIngestionWithFlattenDeepScan()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("B"),
+            new StringDimensionSchema("C*"),
+            new StringDimensionSchema("D*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("E*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("F*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("H")
+        )
+    );
+
+    List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+        new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+        // This is doing a deep scan (need to read all columns)
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$..D")
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+    Assert.assertEquals(fullInputSchema, actual);
+  }
+
+  @Test
+  public void testGetColumnsRequiredForIngestionWithFlattenWildcard()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("B"),
+            new StringDimensionSchema("C*"),
+            new StringDimensionSchema("D*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("E*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("F*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("H")
+        )
+    );
+
+    List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+        new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+        // This is doing a wildcard (need to read all columns)
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$.*")
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+    Assert.assertEquals(fullInputSchema, actual);
+  }
+
+  @Test
+  public void testGetColumnsRequiredForIngestionWithUnmatchedGroup()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("B"),
+            new StringDimensionSchema("C*"),
+            new StringDimensionSchema("D*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("E*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("F*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("H")
+        )
+    );
+
+    List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+        new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+        // This starts with a slice ([2:4]), not a named child (need to read all columns)
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$.[2:4].D")
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+    Assert.assertEquals(fullInputSchema, actual);
+  }
+
+  @Test
+  public void 
testGetColumnsRequiredForIngestionWithUnsupportedJsonPathFieldType()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("B"),
+            new StringDimensionSchema("C*"),
+            new StringDimensionSchema("D*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("E*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("F*"),
+            // This will not be ingested as it is not in the flattenSpec and 
useFieldDiscovery is false
+            new StringDimensionSchema("H")
+        )
+    );
+
+    List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+        new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+        // The JQ field type is unsupported. Hence, return all columns
+        new JSONPathFieldSpec(JSONPathFieldType.JQ, "foobar", ".foo.bar")
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+    Assert.assertEquals(fullInputSchema, actual);
+  }
+
+  @Test
+  public void testGetColumnsRequiredForIngestionWithFlattenTimestamp()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec("CFlat", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(
+            new StringDimensionSchema("B")
+        )
+    );
+    List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+        new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time")
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
+
+    Set<String> actual = 
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, 
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+    Assert.assertEquals(ImmutableSet.of("B", "C"), actual);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to