Hi,
it looks like the error comes from the Parquet library. Has the library
version changed in the move to 3.2.1? Which Parquet versions are used in
3.0.1 and 3.2.1? Can you read that Parquet file natively with the newer
Parquet library version (without Spark)? That would tell you whether this
is a Parquet issue rather than a Spark issue.
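For example, something along these lines with the plain Parquet library
(parquet-hadoop's example GroupReadSupport; the file path is a placeholder
and I have not tried it against your files):

import org.apache.hadoop.fs.Path
import org.apache.parquet.example.data.Group
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport

// Read a few records directly with the Parquet library, no Spark involved.
val reader: ParquetReader[Group] =
  ParquetReader.builder(new GroupReadSupport(), new Path("/path/to/parquet/part-00000.parquet")).build()
var record = reader.read()
var n = 0
while (record != null && n < 10) {
  println(record)     // print the record as Parquet's example Group representation
  record = reader.read()
  n += 1
}
reader.close()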
Unless Spark 3.2.1 does predicate filter pushdown while 3.0.1 did not,
and that predicate has never been supported by Parquet. In that case,
disabling the filter pushdown feature should help:
config("spark.sql.parquet.filterPushdown", false).
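For example (a rough sketch only; the path is a placeholder):

import org.apache.spark.sql.SparkSession

// Disable Parquet filter pushdown so Spark evaluates the filter itself
// instead of handing the predicate to the Parquet reader.
val spark = SparkSession.builder()
  .config("spark.sql.parquet.filterPushdown", "false")
  .getOrCreate()

// Or flip it on an already running session:
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

val df = spark.read.parquet("/path/to/parquet")
df.createOrReplaceTempView("models")
spark.sql("select * from models where array_contains(keywords, 'XXX')").show(false)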
Enrico
On 05.06.22 at 10:37, Amin Borjian wrote:
Hi.
We are updating our Spark cluster from version 3.0.1 to 3.2.1 in order
to benefit from the many improvements. Everything was fine until we saw
some strange behavior. Assume the following protobuf structure:
message Model {
  string name = 1;
  repeated string keywords = 2;
}
We store the protobuf messages in Parquet files on HDFS using the Parquet library.
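Roughly, the write side looks like the following sketch (Model stands for
the Java class generated from the .proto above; the exact class name
depends on the protobuf Java options, and the path is only an example):

import org.apache.hadoop.fs.Path
import org.apache.parquet.proto.ProtoParquetWriter

// Write Model messages into a Parquet file on HDFS via parquet-protobuf.
val writer = new ProtoParquetWriter[Model](new Path("hdfs:///path/to/parquet/part-00000.parquet"), classOf[Model])
writer.write(Model.newBuilder().setName("example").addKeywords("XXX").build())
writer.close()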
Before Spark 3.2.1, we could run the query below in Spark:
val df = spark.read.parquet("/path/to/parquet")
df.registerTempTable("models")
spark.sql("select * from models where array_contains(keywords,
'XXX’)").show(false)
However, after updating Spark to version 3.2.1, we get the following
error (included at the end of this email). It seems we have lost a good
feature! Is this a mistake or intentional? Can we somehow fix the problem
on our side without reverting? Or should we wait for a new release?
Thank you in advance for your help.
Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)