Hi,

looks like the error comes from the Parquet library. Has the library version changed when moving to 3.2.1? Which Parquet versions do Spark 3.0.1 and 3.2.1 use? Can you read that Parquet file natively with the newer Parquet library version (without Spark)? If not, this might be a Parquet issue, not a Spark issue.
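For example, something along these lines should let you try reading the file with plain parquet-mr (an untested sketch; the file path and the generated protobuf class "Model" are placeholders for whatever your setup actually uses):

import org.apache.hadoop.fs.Path
import org.apache.parquet.proto.ProtoParquetReader

// "Model" stands for the Java class generated from the .proto below;
// point the path at one concrete file, not the directory.
val reader = ProtoParquetReader.builder[Model.Builder](
  new Path("/path/to/parquet/part-00000.parquet")).build()
try {
  var record = reader.read()          // returns null once the file is exhausted
  while (record != null) {
    println(record.build())
    record = reader.read()
  }
} finally {
  reader.close()
}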

Unless Spark 3.2.1 pushes the predicate down to Parquet while 3.0.1 did not, and Parquet has never supported pushdown on repeated columns. In that case, disabling the filter pushdown feature should help: config("spark.sql.parquet.filterPushdown", false).
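For example (a rough sketch; the app name is just a placeholder):

import org.apache.spark.sql.SparkSession

// Build the session with Parquet predicate pushdown disabled.
val spark = SparkSession.builder()
  .appName("models-query")
  .config("spark.sql.parquet.filterPushdown", false)
  .getOrCreate()

// Or flip it on an already running session before issuing the query:
spark.conf.set("spark.sql.parquet.filterPushdown", "false")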

Enrico


On 05.06.22 at 10:37, Amin Borjian wrote:

Hi.

We are updating our Spark cluster from version 3.0.1 to 3.2.1 in order to benefit from the many improvements. Everything was fine until we noticed a strange behavior. Assume the following protobuf structure:

message Model {
    string name = 1;
    repeated string keywords = 2;
}

We store the protobuf messages in Parquet files in HDFS using the Parquet library. Before Spark 3.2.1, we could run the following query in Spark:

val df = spark.read.parquet("/path/to/parquet")
df.registerTempTable("models")
spark.sql("select * from models where array_contains(keywords, 'XXX')").show(false)

However, after updating Spark to version 3.2.1, we receive the following error (at the end of this email). I think we have lost a good feature! Is this by mistake or on purpose? Can we somehow fix the problem without reverting? Should we wait for a new release? Thank you in advance for your help.

Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
  at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
  at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
  at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
  at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
