This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7f54ebbf47 Fix Parquet Parser missing column when reading parquet file
(#13612)
7f54ebbf47 is described below
commit 7f54ebbf478b9a2122edcc4bee4815ae86831927
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Jan 11 20:08:48 2023 -1000
Fix Parquet Parser missing column when reading parquet file (#13612)
* fix parquet reader
* fix checkstyle
* fix bug
* fix inspection
* refactor
* fix checkstyle
* fix checkstyle
* fix checkstyle
* fix checkstyle
* add test
* fix checkstyle
* fix tests
* add IT
* add IT
* add more tests
* fix checkstyle
* fix stuff
* fix stuff
* add more tests
* add more tests
---
.../util/common/parsers/JSONPathFieldSpec.java | 6 +
.../example/compat/nested_array_struct.json | 3 +-
.../example/compat/parquet_thrift_compat.json | 13 +-
.../example/compat/proto_struct_with_array.json | 5 +-
.../flattening/flat_1_autodiscover_fields.json | 7 +-
.../example/flattening/flat_1_flatten.json | 4 +
.../example/flattening/flat_1_list_index.json | 4 +
.../flattening/nested_1_autodiscover_fields.json | 4 +-
.../parquet/simple/DruidParquetReadSupport.java | 52 ++--
.../environment-configs/override-examples/hdfs | 2 +-
.../druid/tests/hadoop/ITHadoopIndexTest.java | 142 +++++++++
.../tests/indexer/AbstractITBatchIndexTest.java | 44 ++-
.../wikipedia_index_data1.parquet | Bin 0 -> 10653 bytes
.../wikipedia_index_data2.parquet | Bin 0 -> 9935 bytes
.../wikipedia_index_data3.parquet | Bin 0 -> 9983 bytes
.../wikipedia_hadoop_paquet_parser_index_data.json | 131 ++++++++
.../wikipedia_hadoop_paquet_parser_query_data.json | 157 ++++++++++
.../apache/druid/segment/indexing/ReaderUtils.java | 143 +++++++++
.../druid/segment/indexing/ReaderUtilsTest.java | 331 +++++++++++++++++++++
19 files changed, 1006 insertions(+), 42 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathFieldSpec.java
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathFieldSpec.java
index 442c9bcea1..9ddde9b897 100644
---
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathFieldSpec.java
+++
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathFieldSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.java.util.common.parsers;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.jayway.jsonpath.JsonPath;
import org.apache.druid.utils.CollectionUtils;
import java.util.List;
@@ -125,6 +126,11 @@ public class JSONPathFieldSpec
return new JSONPathFieldSpec(JSONPathFieldType.TREE, name, null, nodes);
}
+ public static String getCompilePath(String expr)
+ {
+ return JsonPath.compile(expr).getPath();
+ }
+
@Override
public boolean equals(final Object o)
{
diff --git
a/extensions-core/parquet-extensions/example/compat/nested_array_struct.json
b/extensions-core/parquet-extensions/example/compat/nested_array_struct.json
index 94f19ace50..6f8935f9ea 100644
--- a/extensions-core/parquet-extensions/example/compat/nested_array_struct.json
+++ b/extensions-core/parquet-extensions/example/compat/nested_array_struct.json
@@ -45,7 +45,8 @@
},
"dimensionsSpec": {
"dimensions": [
- "i32_dec"
+ "i32_dec",
+ "primitive"
],
"dimensionExclusions": [],
"spatialDimensions": []
diff --git
a/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json
b/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json
index ef613dacfd..8c0ea2b4f4 100644
---
a/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json
+++
b/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json
@@ -44,7 +44,18 @@
"missingValue": "2018-09-01T00:00:00.000Z"
},
"dimensionsSpec": {
- "dimensions": [],
+ "dimensions": [
+ "boolColumn",
+ "byteColumn",
+ "shortColumn",
+ "intColumn",
+ "longColumn",
+ "doubleColumn",
+ "binaryColumn",
+ "stringColumn",
+ "enumColumn",
+ "stringsColumn",
+ "intSetColumn" ],
"dimensionExclusions": [],
"spatialDimensions": []
}
diff --git
a/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
b/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
index 7909e0ee12..b448a3fa8f 100644
---
a/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
+++
b/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
@@ -49,7 +49,10 @@
"missingValue": "2018-09-01T00:00:00.000Z"
},
"dimensionsSpec": {
- "dimensions": [],
+ "dimensions": [
+ "optionalPrimitive",
+ "requiredPrimitive"
+ ],
"dimensionExclusions": [],
"spatialDimensions": []
}
diff --git
a/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
b/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
index 3ded41c2b5..c23ecdc5d4 100644
---
a/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
+++
b/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
@@ -32,7 +32,12 @@
"format": "auto"
},
"dimensionsSpec": {
- "dimensions": [],
+ "dimensions": [
+ "dim1",
+ "dim2",
+ "dim3",
+ "listDim"
+ ],
"dimensionExclusions": [],
"spatialDimensions": []
}
diff --git
a/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json
b/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json
index 760dea7757..a8d6a82c12 100644
--- a/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json
+++ b/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json
@@ -42,6 +42,10 @@
"type": "root",
"name": "dim3"
},
+ {
+ "type": "root",
+ "name": "metric1"
+ },
{
"type": "path",
"name": "list",
diff --git
a/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json
b/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json
index 219afde889..6b65237d3c 100644
---
a/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json
+++
b/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json
@@ -38,6 +38,10 @@
"type": "root",
"name": "dim2"
},
+ {
+ "type": "root",
+ "name": "metric1"
+ },
{
"type": "path",
"name": "listextracted",
diff --git
a/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
b/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
index 05171c0f13..3d7490bda8 100644
---
a/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
+++
b/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
@@ -32,7 +32,9 @@
"format": "auto"
},
"dimensionsSpec": {
- "dimensions": [],
+ "dimensions": [
+ "dim1"
+ ],
"dimensionExclusions": [],
"spatialDimensions": []
}
diff --git
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
index a1ed46c0f6..b68c4f1dd7 100644
---
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
+++
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
@@ -19,22 +19,25 @@
package org.apache.druid.data.input.parquet.simple;
-import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
-import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.segment.indexing.ReaderUtils;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
public class DruidParquetReadSupport extends GroupReadSupport
{
+ private static final Logger LOG = new Logger(DruidParquetReadSupport.class);
+
/**
* Select the columns from the parquet schema that are used in the schema of
the ingestion job
*
@@ -44,45 +47,36 @@ public class DruidParquetReadSupport extends
GroupReadSupport
*/
private MessageType getPartialReadSchema(InitContext context)
{
- MessageType fullSchema = context.getFileSchema();
+ List<Type> partialFields = new ArrayList<>();
+ MessageType fullSchema = context.getFileSchema();
String name = fullSchema.getName();
HadoopDruidIndexerConfig config =
HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
ParseSpec parseSpec = config.getParser().getParseSpec();
-
- // this is kind of lame, maybe we can still trim what we read if we
- // parse the flatten spec and determine it isn't auto discovering props?
- if (parseSpec instanceof ParquetParseSpec) {
- if (((ParquetParseSpec) parseSpec).getFlattenSpec() != null) {
- return fullSchema;
- }
+ JSONPathSpec flattenSpec = null;
+ if (parseSpec instanceof ParquetParseSpec && ((ParquetParseSpec)
parseSpec).getFlattenSpec() != null) {
+ flattenSpec = ((ParquetParseSpec) parseSpec).getFlattenSpec();
}
+ Set<String> fullSchemaFields =
fullSchema.getFields().stream().map(Type::getName).collect(Collectors.toSet());
- String tsField = parseSpec.getTimestampSpec().getTimestampColumn();
-
- List<DimensionSchema> dimensionSchema =
parseSpec.getDimensionsSpec().getDimensions();
- Set<String> dimensions = new HashSet<>();
- for (DimensionSchema dim : dimensionSchema) {
- dimensions.add(dim.getName());
- }
-
- Set<String> metricsFields = new HashSet<>();
- for (AggregatorFactory agg :
config.getSchema().getDataSchema().getAggregators()) {
- metricsFields.addAll(agg.requiredFields());
- }
-
- List<Type> partialFields = new ArrayList<>();
+ Set<String> requiredFields = ReaderUtils.getColumnsRequiredForIngestion(
+ fullSchemaFields,
+ parseSpec.getTimestampSpec(),
+ parseSpec.getDimensionsSpec(),
+ config.getSchema().getDataSchema().getTransformSpec(),
+ config.getSchema().getDataSchema().getAggregators(),
+ flattenSpec
+ );
for (Type type : fullSchema.getFields()) {
- if (tsField.equals(type.getName())
- || metricsFields.contains(type.getName())
- || dimensions.size() > 0 && dimensions.contains(type.getName())
- || dimensions.size() == 0) {
+ if (requiredFields.contains(type.getName())) {
partialFields.add(type);
}
}
+ LOG.info("Parquet schema name[%s] with full schema[%s] requires
fields[%s]", name, fullSchemaFields, requiredFields);
+
return new MessageType(name, partialFields);
}
diff --git
a/integration-tests/docker/environment-configs/override-examples/hdfs
b/integration-tests/docker/environment-configs/override-examples/hdfs
index 63b851e508..2e25b929b7 100644
--- a/integration-tests/docker/environment-configs/override-examples/hdfs
+++ b/integration-tests/docker/environment-configs/override-examples/hdfs
@@ -21,4 +21,4 @@ druid_storage_storageDirectory=/druid/segments
# Depending on the test, additional extension(s) may be required.
# Please refer to the other
integration-tests/docker/environment-configs/override-examples/ files and Druid
docs for
# additional env vars to provide for each extension.
-druid_extensions_loadList=["druid-hdfs-storage", "druid-parquet-extensions",
"druid-orc-extensions"]
\ No newline at end of file
+druid_extensions_loadList=["druid-hdfs-storage",
"mysql-metadata-storage","druid-basic-security","simple-client-sslcontext","druid-testing-tools","druid-lookups-cached-global","druid-histogram","druid-datasketches","druid-parquet-extensions","druid-avro-extensions","druid-protobuf-extensions","druid-orc-extensions","druid-kafka-indexing-service","druid-s3-extensions"]
\ No newline at end of file
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
index a61fae4e0e..b8ab40bb49 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.tests.hadoop;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -65,6 +66,9 @@ public class ITHadoopIndexTest extends
AbstractITBatchIndexTest
private static final String BATCH_QUERIES_RESOURCE =
"/hadoop/batch_hadoop_queries.json";
private static final String BATCH_DATASOURCE = "batchLegacyHadoop";
+ private static final String BATCH_TASK_WITH_PARQUET_PARSER_RENAME =
"/hadoop/wikipedia_hadoop_paquet_parser_index_data.json";
+ private static final String BATCH_QUERIES_RESOURCE_FOR_PARQUET_PARSER_RENAME
= "/hadoop/wikipedia_hadoop_paquet_parser_query_data.json";
+
private static final String INDEX_TASK =
"/hadoop/wikipedia_hadoop_index_task.json";
private static final String INDEX_QUERIES_RESOURCE =
"/indexer/wikipedia_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_hadoop_index_test";
@@ -133,6 +137,144 @@ public class ITHadoopIndexTest extends
AbstractITBatchIndexTest
}
}
+ @Test
+ public void testHadoopParquetParserWithRenameIndexTest() throws Exception
+ {
+ String indexDatasource = BATCH_DATASOURCE + "_" + UUID.randomUUID();
+ try (
+ final Closeable ignored0 = unloader(indexDatasource +
config.getExtraDatasourceNameSuffix());
+ ) {
+ final Function<String, String> specPathsTransform = spec -> {
+ try {
+ String path = "/batch_index/parquet";
+ spec = StringUtils.replace(
+ spec,
+ "%%INPUT_PATHS%%",
+ path
+ );
+ spec = StringUtils.replace(
+ spec,
+ "%%TRANSFORMS%%",
+ jsonMapper.writeValueAsString(
+ ImmutableList.of(
+ ImmutableMap.of("type", "expression", "name",
"userTransformed", "expression", "user"),
+ ImmutableMap.of("type", "expression", "name",
"regionAndCity", "expression", "concat(region,city)")
+ )
+ )
+ );
+
+ return spec;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ final Function<String, String> queryTransform = query -> {
+ try {
+ query = StringUtils.replace(
+ query,
+ "%%EXPECTED_NUMBER_SUM_RESULT%%",
+ jsonMapper.writeValueAsString(
+ ImmutableMap.of("sum_added", 3090, "sum_deleted", 712,
"sum_delta", 2378)
+ )
+ );
+ return query;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ doIndexTest(
+ indexDatasource,
+ BATCH_TASK_WITH_PARQUET_PARSER_RENAME,
+ specPathsTransform,
+ BATCH_QUERIES_RESOURCE_FOR_PARQUET_PARSER_RENAME,
+ queryTransform,
+ false,
+ true,
+ true,
+ new Pair<>(false, false)
+ );
+ }
+ }
+
+ @Test
+ public void testHadoopParquetParserWithDifferentSchemaTest() throws Exception
+ {
+ /*
+ This test reads from three parquet files, each with a different schema.
+ The difference in the schema is shown below:
+ File 1's columns: userRenamed, continent, country, added, deleted,
deltaRenamed
+ File 2's columns: user, continentRenamed, country, added, delta
+ File 3's columns: user, continent, countryRenamed, deleted, delta
+
+ Note that `deleted` was dropped from File 2 and `added` was dropped from
File 3
+ */
+ String indexDatasource = BATCH_DATASOURCE + "_" + UUID.randomUUID();
+ try (
+ final Closeable ignored0 = unloader(indexDatasource +
config.getExtraDatasourceNameSuffix());
+ ) {
+ final Function<String, String> specPathsTransform = spec -> {
+ try {
+ String path = "/batch_index/multiple_schema_parquet";
+ spec = StringUtils.replace(
+ spec,
+ "%%INPUT_PATHS%%",
+ path
+ );
+ spec = StringUtils.replace(
+ spec,
+ "%%TRANSFORMS%%",
+ jsonMapper.writeValueAsString(
+ ImmutableList.of(
+ ImmutableMap.of("type", "expression", "name",
"userTransformed", "expression", "nvl(user,userRenamed)"),
+ ImmutableMap.of("type", "expression", "name",
"countryFlat", "expression", "nvl(countryFlat,countryRenamed)"),
+ ImmutableMap.of("type", "expression", "name",
"continentFlat", "expression", "nvl(continentFlat,continentRenamed)"),
+ ImmutableMap.of("type", "expression", "name", "delta",
"expression", "nvl(delta,deltaRenamed)"),
+ ImmutableMap.of("type", "expression", "name",
"regionAndCity", "expression", "concat(region,city)")
+ )
+ )
+ );
+
+ return spec;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ final Function<String, String> queryTransform = query -> {
+ try {
+ query = StringUtils.replace(
+ query,
+ "%%EXPECTED_NUMBER_SUM_RESULT%%",
+ jsonMapper.writeValueAsString(
+ ImmutableMap.of("sum_added", 1602, "sum_deleted", 497,
"sum_delta", 2378)
+ )
+ );
+ return query;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ doIndexTest(
+ indexDatasource,
+ BATCH_TASK_WITH_PARQUET_PARSER_RENAME,
+ specPathsTransform,
+ BATCH_QUERIES_RESOURCE_FOR_PARQUET_PARSER_RENAME,
+ queryTransform,
+ false,
+ true,
+ true,
+ new Pair<>(false, false)
+ );
+ }
+ }
+
@Test(dataProvider = "resources")
public void testIndexData(DimensionBasedPartitionsSpec partitionsSpec)
throws Exception
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 06f692162e..8bcfaa0df1 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -133,6 +133,31 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
boolean waitForSegmentsToLoad,
Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
) throws IOException
+ {
+ doIndexTest(
+ dataSource,
+ indexTaskFilePath,
+ taskSpecTransform,
+ queryFilePath,
+ Function.identity(),
+ waitForNewVersion,
+ runTestQueries,
+ waitForSegmentsToLoad,
+ segmentAvailabilityConfirmationPair
+ );
+ }
+
+ protected void doIndexTest(
+ String dataSource,
+ String indexTaskFilePath,
+ Function<String, String> taskSpecTransform,
+ String queryFilePath,
+ Function<String, String> queryTransform,
+ boolean waitForNewVersion,
+ boolean runTestQueries,
+ boolean waitForSegmentsToLoad,
+ Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+ ) throws IOException
{
final String fullDatasourceName = dataSource +
config.getExtraDatasourceNameSuffix();
final String taskSpec = taskSpecTransform.apply(
@@ -151,11 +176,16 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
segmentAvailabilityConfirmationPair
);
if (runTestQueries) {
- doTestQuery(dataSource, queryFilePath);
+ doTestQuery(dataSource, queryFilePath, queryTransform);
}
}
protected void doTestQuery(String dataSource, String queryFilePath)
+ {
+ doTestQuery(dataSource, queryFilePath, Function.identity());
+ }
+
+ protected void doTestQuery(String dataSource, String queryFilePath,
Function<String, String> queryTransform)
{
try {
String queryResponseTemplate;
@@ -166,14 +196,14 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queryFilePath);
}
-
- queryResponseTemplate = StringUtils.replace(
- queryResponseTemplate,
- "%%DATASOURCE%%",
- dataSource + config.getExtraDatasourceNameSuffix()
+ queryResponseTemplate = queryTransform.apply(
+ StringUtils.replace(
+ queryResponseTemplate,
+ "%%DATASOURCE%%",
+ dataSource + config.getExtraDatasourceNameSuffix()
+ )
);
queryHelper.testQueriesFromString(queryResponseTemplate);
-
}
catch (Exception e) {
LOG.error(e, "Error while testing");
diff --git
a/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data1.parquet
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data1.parquet
new file mode 100644
index 0000000000..f211113a41
Binary files /dev/null and
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data1.parquet
differ
diff --git
a/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data2.parquet
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data2.parquet
new file mode 100644
index 0000000000..f42f951f07
Binary files /dev/null and
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data2.parquet
differ
diff --git
a/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data3.parquet
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data3.parquet
new file mode 100644
index 0000000000..4eaa9716ae
Binary files /dev/null and
b/integration-tests/src/test/resources/data/batch_index/multiple_schema_parquet/wikipedia_index_data3.parquet
differ
diff --git
a/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_index_data.json
b/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_index_data.json
new file mode 100644
index 0000000000..fd474677dc
--- /dev/null
+++
b/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_index_data.json
@@ -0,0 +1,131 @@
+{
+ "type": "index_hadoop",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "%%DATASOURCE%%",
+ "parser": {
+ "parseSpec": {
+ "dimensionsSpec": {
+ "dimensionExclusions": [],
+ "dimensions": [
+ "page",
+ {"type": "string", "name": "language", "createBitmapIndex":
false},
+ "userTransformed",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continentFlat",
+ "countryFlat",
+ "regionAndCity"
+ ],
+ "spatialDimensions": []
+ },
+ "format": "parquet",
+ "timestampSpec": {
+ "column": "timestamp"
+ },
+ "flattenSpec": {
+ "fields": [
+ {
+ "type": "path",
+ "name": "countryFlat",
+ "expr": "$.country"
+ },
+ {
+ "type": "root",
+ "expr": "continent",
+ "name": "continentFlat"
+ },
+ {
+ "type": "root",
+ "name": "namespace"
+ }
+ ]
+ }
+ },
+ "binaryAsString": true,
+ "type": "parquet"
+ },
+ "metricsSpec": [
+ {
+ "type": "count",
+ "name": "count"
+ },
+ {
+ "type": "doubleSum",
+ "name": "added",
+ "fieldName": "added"
+ },
+ {
+ "type": "doubleSum",
+ "name": "deleted",
+ "fieldName": "deleted"
+ },
+ {
+ "type": "doubleSum",
+ "name": "delta",
+ "fieldName": "delta"
+ },
+ {
+ "name": "thetaSketch",
+ "type": "thetaSketch",
+ "fieldName": "userTransformed"
+ },
+ {
+ "name": "quantilesDoublesSketch",
+ "type": "quantilesDoublesSketch",
+ "fieldName": "delta"
+ },
+ {
+ "name": "HLLSketchBuild",
+ "type": "HLLSketchBuild",
+ "fieldName": "userTransformed"
+ }
+ ],
+ "granularitySpec": {
+ "segmentGranularity": "DAY",
+ "queryGranularity": "second",
+ "intervals" : [ "2013-08-31/2013-09-02" ]
+ },
+ "transformSpec": {
+ "filter": null,
+ "transforms": %%TRANSFORMS%%
+ }
+ },
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "inputFormat":
"org.apache.druid.data.input.parquet.DruidParquetInputFormat",
+ "type": "static",
+ "paths": "%%INPUT_PATHS%%"
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "partitionsSpec": {
+ "assumeGrouped": true,
+ "targetPartitionSize": 75000,
+ "type": "hashed"
+ },
+ "jobProperties": {
+ "fs.permissions.umask-mode": "022",
+ "fs.default.name" : "hdfs://druid-it-hadoop:9000",
+ "fs.defaultFS" : "hdfs://druid-it-hadoop:9000",
+ "dfs.datanode.address" : "druid-it-hadoop",
+ "dfs.client.use.datanode.hostname" : "true",
+ "dfs.datanode.use.datanode.hostname" : "true",
+ "yarn.resourcemanager.hostname" : "druid-it-hadoop",
+ "yarn.nodemanager.vmem-check-enabled" : "false",
+ "mapreduce.job.classloader": "true",
+ "mapreduce.map.java.opts" : "-Duser.timezone=UTC
-Dfile.encoding=UTF-8",
+ "mapreduce.job.user.classpath.first" : "true",
+ "mapreduce.reduce.java.opts" : "-Duser.timezone=UTC
-Dfile.encoding=UTF-8",
+ "mapreduce.map.memory.mb" : 1024,
+ "mapreduce.reduce.memory.mb" : 1024
+ },
+ "rowFlushBoundary": 10000
+ }
+ }
+}
\ No newline at end of file
diff --git
a/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_query_data.json
b/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_query_data.json
new file mode 100644
index 0000000000..a0924a9870
--- /dev/null
+++
b/integration-tests/src/test/resources/hadoop/wikipedia_hadoop_paquet_parser_query_data.json
@@ -0,0 +1,157 @@
+[
+ {
+ "description": "timeseries, 1 agg, all",
+ "query":{
+ "queryType" : "timeBoundary",
+ "dataSource": "%%DATASOURCE%%"
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-31T01:02:33.000Z",
+ "result" : {
+ "minTime" : "2013-08-31T01:02:33.000Z",
+ "maxTime" : "2013-09-01T12:41:27.000Z"
+ }
+ }
+ ]
+ },
+ {
+ "description": "timeseries, datasketch aggs, all",
+ "query":{
+ "queryType" : "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"week",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-02T00:00"
+ ],
+ "filter":null,
+ "aggregations":[
+ {
+ "type": "HLLSketchMerge",
+ "name": "approxCountHLL",
+ "fieldName": "HLLSketchBuild",
+ "lgK": 12,
+ "tgtHllType": "HLL_4",
+ "round": true
+ },
+ {
+ "type":"thetaSketch",
+ "name":"approxCountTheta",
+ "fieldName":"thetaSketch",
+ "size":16384,
+ "shouldFinalize":true,
+ "isInputThetaSketch":false,
+ "errorBoundsStdDev":null
+ },
+ {
+ "type":"quantilesDoublesSketch",
+ "name":"quantilesSketch",
+ "fieldName":"quantilesDoublesSketch",
+ "k":128
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-26T00:00:00.000Z",
+ "result" : {
+ "quantilesSketch":10,
+ "approxCountTheta":5.0,
+ "approxCountHLL":5
+ }
+ }
+ ]
+ },
+ {
+ "description": "timeseries, stringFirst/stringLast aggs, all",
+ "query":{
+ "queryType" : "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"week",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-02T00:00"
+ ],
+ "filter":null,
+ "aggregations":[
+ {
+ "type": "stringFirst",
+ "name": "first_user",
+ "fieldName": "userTransformed"
+ },
+ {
+ "type":"stringLast",
+ "name":"last_user",
+ "fieldName":"userTransformed"
+ },
+ {
+ "type": "stringFirst",
+ "name": "first_countryFlat",
+ "fieldName": "countryFlat"
+ },
+ {
+ "type": "stringFirst",
+ "name": "first_continentFlat",
+ "fieldName": "continentFlat"
+ },
+ {
+ "type": "stringFirst",
+ "name": "first_namespace",
+ "fieldName": "namespace"
+ },
+ {
+ "type": "stringFirst",
+ "name": "first_regionAndCity",
+ "fieldName": "regionAndCity"
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-26T00:00:00.000Z",
+ "result" : {
+ "first_user":"nuclear",
+ "first_continentFlat":"North America",
+ "first_namespace":"article",
+ "last_user":"stringer",
+ "first_regionAndCity":"Bay AreaSan Francisco",
+ "first_countryFlat":"United States"
+ }
+ }
+ ]
+ },
+ {
+ "description": "timeseries, number sum aggs, all",
+ "query":{
+ "queryType" : "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"week",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-02T00:00"
+ ],
+ "filter":null,
+ "aggregations":[
+ {
+ "type": "longSum",
+ "name": "sum_added",
+ "fieldName": "added"
+ },
+ {
+ "type":"longSum",
+ "name":"sum_deleted",
+ "fieldName":"deleted"
+ },
+ {
+ "type": "longSum",
+ "name": "sum_delta",
+ "fieldName": "delta"
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-26T00:00:00.000Z",
+ "result" : %%EXPECTED_NUMBER_SUM_RESULT%%
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
new file mode 100644
index 0000000000..b47d676081
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.transform.Transform;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import javax.annotation.Nullable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ReaderUtils
+{
+ private static final Logger LOG = new Logger(ReaderUtils.class);
+
+ private static final Pattern JSON_PATH_PATTERN =
Pattern.compile("\\[(.*?)]");
+ private static final Pattern BRACKET_NOTATED_CHILD_PATTERN =
Pattern.compile("'(.*?)'");
+
+ public static Set<String> getColumnsRequiredForIngestion(
+ Set<String> fullInputSchema,
+ TimestampSpec timestampSpec,
+ DimensionsSpec dimensionsSpec,
+ TransformSpec transformSpec,
+ AggregatorFactory[] aggregators,
+ @Nullable JSONPathSpec flattenSpec
+ )
+ {
+ Set<String> fieldsRequired = new HashSet<>();
+
+ // We need to read timestamp column for Druid timestamp field
+ fieldsRequired.add(timestampSpec.getTimestampColumn());
+
+ // Find columns we need to read from the flattenSpec
+ if (flattenSpec != null) {
+ // Parse columns needed from flattenSpec
+ for (JSONPathFieldSpec fields : flattenSpec.getFields()) {
+ if (fields.getType() == JSONPathFieldType.ROOT) {
+ // ROOT type just gets the top-level field using the expr as the key
+ fieldsRequired.add(fields.getExpr());
+ } else if (fields.getType() == JSONPathFieldType.PATH) {
+ // Parse PATH type to determine columns needed
+ String parsedPath;
+ try {
+ parsedPath = JSONPathFieldSpec.getCompilePath(fields.getExpr());
+ }
+ catch (Exception e) {
+ // We can skip columns used in this path as the path is invalid
+ LOG.debug("Ignoring columns from JSON path [%s] as path expression
is invalid", fields.getExpr());
+ continue;
+ }
+ // Remove the $
+ parsedPath = parsedPath.substring(1);
+ // If the first level is a deep scan, then we need all columns
+ if (parsedPath.length() >= 2 && "..".equals(parsedPath.substring(0,
2))) {
+ return fullInputSchema;
+ }
+ Matcher jsonPathMatcher = JSON_PATH_PATTERN.matcher(parsedPath);
+ if (!jsonPathMatcher.find()) {
+ LOG.warn("Failed to parse JSON path for required column from path
[%s]", fields.getExpr());
+ return fullInputSchema;
+ }
+ String matchedGroup = jsonPathMatcher.group();
+ Matcher childMatcher =
BRACKET_NOTATED_CHILD_PATTERN.matcher(matchedGroup);
+ if (childMatcher.find()) {
+ // Get name of the column from bracket-notated child i.e.
['region']
+ childMatcher.reset();
+ while (childMatcher.find()) {
+ String columnName = childMatcher.group();
+ // Remove the quote around column name
+ fieldsRequired.add(columnName.substring(1, columnName.length() -
1));
+ }
+ } else if ("[*]".equals(matchedGroup)) {
+ // If the first level is a wildcard, then we need all columns
+ return fullInputSchema;
+ } else {
+ // This can happen if it is a filter expression, slice operator,
or index / indexes
+ // We just return all columns...
+ return fullInputSchema;
+ }
+ } else {
+ // Other types aren't supported, but returning full schema just in case...
+ LOG.warn("Got unexpected JSONPathFieldType [%s]", fields.getType());
+ return fullInputSchema;
+ }
+ }
+ // If useFieldDiscovery is false then we have already determined all the
columns we need to read
+ // (as only explicitly specified fields will be available to use in the
other specs)
+ if (!flattenSpec.isUseFieldDiscovery()) {
+ fieldsRequired.retainAll(fullInputSchema);
+ return fieldsRequired;
+ }
+ }
+
+ // Determine any fields we need to read from parquet file that is used in
the transformSpec
+ List<Transform> transforms = transformSpec.getTransforms();
+ for (Transform transform : transforms) {
+ fieldsRequired.addAll(transform.getRequiredColumns());
+ }
+
+ // Determine any fields we need to read from parquet file that is used in
the dimensionsSpec
+ List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions();
+ for (DimensionSchema dim : dimensionSchema) {
+ fieldsRequired.add(dim.getName());
+ }
+
+ // Determine any fields we need to read from parquet file that is used in
the metricsSpec
+ for (AggregatorFactory agg : aggregators) {
+ fieldsRequired.addAll(agg.requiredFields());
+ }
+
+ // Only retain required fields that actually exist in the input schema
+ fieldsRequired.retainAll(fullInputSchema);
+ return fieldsRequired;
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
new file mode 100644
index 0000000000..6ffe7bded6
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class ReaderUtilsTest extends InitializedNullHandlingTest
+{
+
+ private final Set<String> fullInputSchema = ImmutableSet.of("A", "B", "C",
"D", "E", "F", "G", "H", "I");
+
+ @Test
+ public void
testGetColumnsRequiredForIngestionWithoutMetricsWithoutTransformAndWithoutFlatten()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("B"),
+ new LongDimensionSchema("C"),
+ new FloatDimensionSchema("D")
+ )
+ );
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, null);
+ Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D"), actual);
+ }
+
+ @Test
+ public void
testGetColumnsRequiredForIngestionWithoutTransformAndWithoutFlatten()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("B"),
+ new LongDimensionSchema("C"),
+ new FloatDimensionSchema("D")
+ )
+ );
+ AggregatorFactory[] aggregators = new AggregatorFactory[]{
+ new CountAggregatorFactory("custom_count"),
+ new LongSumAggregatorFactory("custom_long_sum", "E"),
+ new FloatMinAggregatorFactory("custom_float_min", "F")
+ };
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, TransformSpec.NONE, aggregators, null);
+ Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D", "E", "F"), actual);
+ }
+
+ @Test
+ public void
testGetColumnsRequiredForIngestionWithTransformAndWithoutFlatten()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("B"),
+ new StringDimensionSchema("C*"),
+ new LongDimensionSchema("D*"),
+ new FloatDimensionSchema("E*"),
+ new LongDimensionSchema("G")
+ )
+ );
+
+ TransformSpec transformSpec = new TransformSpec(
+ null,
+ ImmutableList.of(
+ // Function and rename
+ new ExpressionTransform("C*", "json_value(C, '$.dim2')",
TestExprMacroTable.INSTANCE),
+ // Rename
+ new ExpressionTransform("D*", "D", TestExprMacroTable.INSTANCE),
+ // Function with multiple input columns
+ new ExpressionTransform("E*", "concat(E, F)",
TestExprMacroTable.INSTANCE),
+ // Function with same name
+ new ExpressionTransform("G", "CAST(G, LONG)",
TestExprMacroTable.INSTANCE)
+ )
+ );
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, transformSpec, new AggregatorFactory[]{}, null);
+ Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D", "E", "F", "G"),
actual);
+ }
+
+ @Test
+ public void
testGetColumnsRequiredForIngestionWithFlattenAndUseFieldDiscoveryFalse()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("B"),
+ new StringDimensionSchema("C*"),
+ new StringDimensionSchema("D*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("E*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("F*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("H")
+ )
+ );
+
+ List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$.D.M[*].T")
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+ TransformSpec transformSpec = new TransformSpec(
+ null,
+ ImmutableList.of(
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new ExpressionTransform("E*", "E", TestExprMacroTable.INSTANCE),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new ExpressionTransform("F*", "concat(F, G)",
TestExprMacroTable.INSTANCE)
+ )
+ );
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, transformSpec, new AggregatorFactory[]{}, flattenSpec);
+ Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D"), actual);
+ }
+
+ @Test
+ public void
testGetColumnsRequiredForIngestionWithFlattenAndUseFieldDiscoveryTrue()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("B"),
+ new StringDimensionSchema("C*"),
+ new StringDimensionSchema("D*"),
+ new StringDimensionSchema("E*"),
+ new StringDimensionSchema("F*"),
+ new StringDimensionSchema("H")
+ )
+ );
+
+ List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$.D.M[*].T")
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
+
+ TransformSpec transformSpec = new TransformSpec(
+ null,
+ ImmutableList.of(
+ new ExpressionTransform("E*", "E", TestExprMacroTable.INSTANCE),
+ new ExpressionTransform("F*", "concat(F, G)",
TestExprMacroTable.INSTANCE)
+ )
+ );
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, transformSpec, new AggregatorFactory[]{}, flattenSpec);
+ Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D", "E", "F", "G",
"H"), actual);
+ }
+
+ @Test
+ public void testGetColumnsRequiredForIngestionWithFlattenDeepScan()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("B"),
+ new StringDimensionSchema("C*"),
+ new StringDimensionSchema("D*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("E*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("F*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("H")
+ )
+ );
+
+ List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+ // This is doing a deep scan (need to read all columns)
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$..D")
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+ Assert.assertEquals(fullInputSchema, actual);
+ }
+
+ @Test
+ public void testGetColumnsRequiredForIngestionWithFlattenWildcard()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("B"),
+ new StringDimensionSchema("C*"),
+ new StringDimensionSchema("D*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("E*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("F*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("H")
+ )
+ );
+
+ List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+ // This is doing a wildcard (need to read all columns)
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$.*")
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+ Assert.assertEquals(fullInputSchema, actual);
+ }
+
+ @Test
+ public void testGetColumnsRequiredForIngestionWithUnmatchedGroup()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("B"),
+ new StringDimensionSchema("C*"),
+ new StringDimensionSchema("D*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("E*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("F*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("H")
+ )
+ );
+
+ List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+ // This is doing a wildcard (need to read all columns)
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "D*", "$.[2:4].D")
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+ Assert.assertEquals(fullInputSchema, actual);
+ }
+
+ @Test
+ public void
testGetColumnsRequiredForIngestionWithUnsupportedJsonPathFieldType()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("B"),
+ new StringDimensionSchema("C*"),
+ new StringDimensionSchema("D*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("E*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("F*"),
+ // This will not be ingested as it is not in the flattenSpec and
useFieldDiscovery is false
+ new StringDimensionSchema("H")
+ )
+ );
+
+ List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.ROOT, "B", "B"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "C*", "$.C"),
+ // This is unsupported. Hence, return all columns
+ new JSONPathFieldSpec(JSONPathFieldType.JQ, "foobar", ".foo.bar")
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+ Assert.assertEquals(fullInputSchema, actual);
+ }
+
+ @Test
+ public void testGetColumnsRequiredForIngestionWithFlattenTimestamp()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec("CFlat", "iso", null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ ImmutableList.of(
+ new StringDimensionSchema("B")
+ )
+ );
+ List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time")
+ );
+ JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
+
+ Set<String> actual =
ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec,
dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
+ Assert.assertEquals(ImmutableSet.of("B", "C"), actual);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]