Thanks for answering. I found the problem and created an issue for Spark with a 
related pull request:

https://issues.apache.org/jira/browse/SPARK-39393

Thanks again for your help.

________________________________
From: Enrico Minack <i...@enrico.minack.dev>
Sent: Tuesday, June 7, 2022, 9:49 PM
To: Amin Borjian
Subject: Re: [Spark] [SQL] Updating Spark from version 3.0.1 to 3.2.1 reduced 
functionality for working with parquet files

Hi,

even though the config option has been around since 1.2.0, it might be that 
more filters are being pushed into Parquet after 3.0.1 under the same option.

Are you sure the filter had been pushed into Parquet in 3.0.1? Did you run 
df.explain(true) for both versions? Can you share the plans?
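
As a rough sketch of that comparison, the helper below runs the query from the original report, prints the extended plans, and returns the physical plan text so the two Spark versions can be diffed. The object and method names (ComparePlans, physicalPlan) are made up for illustration; the entry to look for in the scan node is PushedFilters.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of Spark: run it once on 3.0.1 and once on 3.2.1.
object ComparePlans {
  def physicalPlan(spark: SparkSession, path: String): String = {
    val df = spark.read.parquet(path)
    df.createOrReplaceTempView("models")
    val result = spark.sql(
      "select * from models where array_contains(keywords, 'XXX')")
    // Extended mode prints the parsed, analyzed, optimized and physical plans.
    result.explain(true)
    // Return the physical plan as text so both versions can be compared.
    result.queryExecution.executedPlan.toString
  }
}
```

Calling ComparePlans.physicalPlan(spark, "/path/to/parquet") on both clusters and comparing the PushedFilters entries should show whether a filter on keywords reaches Parquet in only one of the versions.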

Enrico


On 05.06.22 at 12:34, Amin Borjian wrote:
Thanks for your answer.

- It looks like the error comes from the Parquet library. Has the library 
version changed when moving to 3.2.1? What are the Parquet versions used in 
3.0.1 and 3.2.1? Can you read that Parquet file with the newer Parquet library 
version natively (without Spark)? Then this might be a Parquet issue, not a 
Spark issue.

At first I thought the problem came from the Parquet library, which was updated 
from version 1.10 to 1.12.2 in Spark 3.2.1.
I checked every class in the exception stack trace for any suspicious change 
made between 2020 and 2022. In more detail, I looked at the following classes:


  *   SchemaCompatibilityValidator
  *   Operators
  *   RowGroupFilter
  *   FilterCompat
  *   ParquetFileReader
  *   ParquetRecordReader

I did not find any significant change related to the problem. In fact, the 
problem is caused by this line, which has existed since 2014 (based on the Git 
history):

https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java#L194

- Unless Spark 3.2.1 does predicate filter pushdown while 3.0.1 did not, and it 
has never been supported by Parquet. Then disabling the filter pushdown feature 
should help: config("spark.sql.parquet.filterPushdown", false).

So I think it must somehow be related to Spark (for example, Spark may call new 
functions of the Parquet library). I also checked the option you mentioned in 
the Spark code base:

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
  .doc("Enables Parquet filter push-down optimization when set to true.")
  .version("1.2.0")
  .booleanConf
  .createWithDefault(true)

This option has been around for a long time, and we had it enabled in the 
previous version (Spark 3.0.1) as well.
Of course, disabling it solved the problem, but I wonder why we did not have 
the problem before.
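
For reference, a minimal sketch of the workaround mentioned above, assuming a standard SparkSession. The object and method names (DisablePushdown, newSession, onExisting) and the application name are placeholders; the config key and the builder call are the ones quoted in this thread.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper showing two ways to turn Parquet filter pushdown off.
object DisablePushdown {
  // Option 1: set the flag while building the session.
  def newSession(): SparkSession =
    SparkSession.builder()
      .appName("models-without-pushdown") // placeholder name
      .config("spark.sql.parquet.filterPushdown", false)
      .getOrCreate()

  // Option 2: change the flag on a session that already exists.
  def onExisting(spark: SparkSession): Unit =
    spark.conf.set("spark.sql.parquet.filterPushdown", false)
}
```

With pushdown disabled, Spark evaluates the filter itself after reading the rows, so the query works again but row groups can no longer be skipped for other, supported predicates.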

The Spark part of the stack trace was difficult to check, so I only looked at 
the following two classes (the Parquet library functions are called right after 
them). I did not see any significant change that points to the problem:


  *   ParquetFileFormat
  *   FileScanRDD

Do you think the Parquet library could still be the problem? Did I miss a place 
in my review? Or does Spark have an issue where it has not been using the 
"spark.sql.parquet.filterPushdown" setting correctly?

From: Enrico Minack <i...@enrico.minack.dev>
Sent: Sunday, June 5, 2022 1:32 PM
To: Amin Borjian <borjianami...@outlook.com>; dev@spark.apache.org
Subject: Re: [Spark] [SQL] Updating Spark from version 3.0.1 to 3.2.1 reduced 
functionality for working with parquet files

Hi,

It looks like the error comes from the Parquet library. Has the library version 
changed when moving to 3.2.1? What are the Parquet versions used in 3.0.1 and 
3.2.1? Can you read that Parquet file with the newer Parquet library version 
natively (without Spark)? Then this might be a Parquet issue, not a Spark issue.
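
One possible way to run that native check is sketched below with the parquet-hadoop example API (GroupReadSupport). The NotEq predicate on the keywords column is taken from the stack trace in the original report; comparing against null and treating keywords as a binary (string) column are assumptions, as are the object and method names.

```scala
import org.apache.hadoop.fs.Path
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.parquet.io.api.Binary

// Hypothetical standalone check: read one Parquet file without Spark.
object NativeParquetRead {
  // Assumed predicate: "keywords is not null" on a binary column.
  def keywordsNotNull: FilterPredicate =
    FilterApi.notEq(FilterApi.binaryColumn("keywords"), null.asInstanceOf[Binary])

  def main(args: Array[String]): Unit = {
    val file = new Path(args(0)) // path to a single Parquet file
    val reader = ParquetReader.builder(new GroupReadSupport(), file)
      .withFilter(FilterCompat.get(keywordsNotNull))
      .build()
    try {
      // Print every record that passes the filter.
      var record = reader.read()
      while (record != null) {
        println(record)
        record = reader.read()
      }
    } finally reader.close()
  }
}
```

If this fails with the same IllegalArgumentException under both Parquet versions, the restriction lives in Parquet itself and the change in behavior is more likely on the Spark side.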

Unless Spark 3.2.1 does predicate filter pushdown while 3.0.1 did not, and it 
has never been supported by Parquet. Then disabling the filter pushdown feature 
should help: config("spark.sql.parquet.filterPushdown", false).

Enrico


On 05.06.22 at 10:37, Amin Borjian wrote:

Hi.

We are updating our Spark cluster from version 3.0.1 to 3.2.1 to benefit from 
its many improvements. Everything went well until we saw some strange behavior. 
Assume the following protobuf structure:

message Model {
     string name = 1;
     repeated string keywords = 2;
}

We store the protobuf messages in Parquet files on HDFS using the Parquet 
library. Before Spark 3.2.1 we could run the query below on Spark:

val df = spark.read.parquet("/path/to/parquet")
// registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it
df.createOrReplaceTempView("models")
spark.sql("select * from models where array_contains(keywords, 'XXX')").show(false)

However, after updating Spark to version 3.2.1, we receive the following error 
(at the end of this email). I think we lost a good feature! Is this by mistake 
or on purpose? Can we somehow fix the problem without reverting? Should we wait 
for a new release? Thank you in advance for your help.

Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
  at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
  at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
  at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
  at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)






