[ https://issues.apache.org/jira/browse/SPARK-39393 ]
Dongjoon Hyun deleted comment on SPARK-39393:
---------------------------------------
was (Author: apachespark):
User 'dongjoon-hyun' has created a pull request for this issue:
https://github.com/apache/spark/pull/36819
> Parquet data source only supports push-down predicate filters for
> non-repeated primitive types
> ----------------------------------------------------------------------------------------------
>
> Key: SPARK-39393
> URL: https://issues.apache.org/jira/browse/SPARK-39393
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
> Reporter: Amin Borjian
> Assignee: Amin Borjian
> Priority: Major
> Labels: parquet
> Fix For: 3.1.3, 3.3.0, 3.2.2, 3.4.0
>
>
> I use an example to illustrate the problem. The cause of the problem and the
> proposed approach to solving it are stated below.
> Assume the following Protocol Buffers schema:
> {code:java}
> message Model {
>   string name = 1;
>   repeated string keywords = 2;
> }
> {code}
> Suppose a parquet file is created from a set of records in the above format
> with the help of the {{parquet-protobuf}} library.
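> As a minimal sketch of how such a file might be written with the
> {{parquet-protobuf}} writer API (assuming {{Model}} is the Java class generated
> by {{protoc}} from the schema above, e.g. with {{java_multiple_files}} enabled,
> and the output path is just an example):
> {code:java}
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.proto.ProtoParquetWriter
>
> // Write a few Model records to a Parquet file using parquet-protobuf.
> val writer = new ProtoParquetWriter[Model](
>   new Path("/path/to/parquet/part-00000.parquet"), classOf[Model])
> writer.write(Model.newBuilder().setName("a").addKeywords("X").build())
> writer.write(Model.newBuilder().setName("b").addKeywords("Y").addKeywords("Z").build())
> writer.close()
> {code}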
> Using Spark version 3.0.2 or older, we could run the following query using
> {{spark-shell}}:
> {code:java}
> val data = spark.read.parquet("/path/to/parquet")
> data.registerTempTable("models")
> spark.sql("select * from models where array_contains(keywords,
> 'X')").show(false)
> {code}
> But after upgrading to Spark 3.1.0 or newer, we get the following error:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
>   at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
>   at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
>   at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
>   at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
>   at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
>   at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
>   at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
>   ...
> {code}
> At first it seems the problem is in the Parquet library. But the check that
> throws this error has been around since 2014 (based on Git history), so the
> library behavior has not changed:
> [Parquet Schema Compatibility
> Validator|https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java#L194]
> After some investigation, I noticed that the real cause is a change in how Spark
> builds the data filtering conditions:
> {code:java}
> spark.sql("select * from log where array_contains(keywords, 'X')").explain(true);
> // Spark 3.0.2 and older
> == Physical Plan ==
> ...
> +- FileScan parquet [link#0,keywords#1]
> DataFilters: [array_contains(keywords#1, Google)]
> PushedFilters: []
> ...
> // Spark 3.1.0 and newer
> == Physical Plan ==
> ...
> +- FileScan parquet [link#0,keywords#1]
> DataFilters: [isnotnull(keywords#1), array_contains(keywords#1, Google)]
> PushedFilters: [IsNotNull(keywords)]
> ...
> {code}
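> To connect the plan above to the stack trace: the pushed {{IsNotNull(keywords)}}
> source filter appears to be translated into a Parquet {{notEq(column, null)}}
> predicate (the {{Operators$NotEq}} frame in the stack trace points to that),
> which the validator then rejects for a repeated column. A minimal sketch of that
> predicate construction, as an illustration only and not taken from Spark's code:
> {code:java}
> import org.apache.parquet.filter2.predicate.FilterApi
> import org.apache.parquet.io.api.Binary
>
> // Roughly the kind of predicate that IsNotNull on a string column becomes:
> // notEq(col, null) means "value is not null" at the Parquet level.
> val keywordsCol = FilterApi.binaryColumn("keywords")
> val isNotNullPred = FilterApi.notEq(keywordsCol, null.asInstanceOf[Binary])
> // Validating this predicate against a schema where "keywords" is a repeated
> // field throws the IllegalArgumentException shown in the stack trace above.
> {code}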
> It's good that the filtering has become smarter. Unfortunately, because I am not
> familiar with the code base, I could not find the exact location of the change or
> the related pull request. In general, the change is fine for non-repeated Parquet
> fields, but for repeated fields it triggers an error in the Parquet library, as in
> the example above.
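> The general direction of a fix (this is only my sketch of the idea, not the
> actual patch in the pull request) would be to exclude repeated fields when
> deciding which columns are eligible for predicate pushdown, using the repetition
> information in the Parquet schema:
> {code:java}
> import org.apache.parquet.schema.Type
> import org.apache.parquet.schema.Type.Repetition
>
> // Sketch: only non-repeated primitive fields can safely back a pushed-down
> // predicate; repeated fields must be filtered after reading.
> def supportsPushdown(field: Type): Boolean =
>   field.isPrimitive && !field.isRepetition(Repetition.REPEATED)
> {code}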
> The only temporary workaround I see is to disable the following setting, which in
> general greatly reduces performance:
> {code:java}
> SET spark.sql.parquet.filterPushdown=false
> {code}
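> For reference, the same workaround can also be applied from a Spark session (a
> usage sketch, equivalent to the SQL above):
> {code:java}
> // Disables Parquet filter pushdown for the whole session; filters are then
> // evaluated by Spark after the data is read, which can be much slower.
> spark.conf.set("spark.sql.parquet.filterPushdown", "false")
> {code}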
> I have created a patch for this bug and will send a pull request soon.
>
>