Ethan Guo created HUDI-4303:
-------------------------------
Summary: Partition pruning fails for non-string partition field in
Spark
Key: HUDI-4303
URL: https://issues.apache.org/jira/browse/HUDI-4303
Project: Apache Hudi
Issue Type: Bug
Components: spark
Affects Versions: 0.11.1
Reporter: Ethan Guo
Fix For: 0.12.0
When querying a partitioned Hudi table storing github archive data (schema
shown below) with the partition field in timestamp type, a query that triggers
partition pruning fails due to a ClassCastException.
Environment: Spark 3.2.1, Hudi 0.11.1. With Hudi 0.10.0, the filtering works.
Schema:
{code:java}
scala> df.printSchema
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- type: string (nullable = true)
|-- public: boolean (nullable = false)
|-- payload: string (nullable = true)
|-- repo: struct (nullable = false)
| |-- id: long (nullable = false)
| |-- name: string (nullable = true)
| |-- url: string (nullable = true)
|-- actor: struct (nullable = false)
| |-- id: long (nullable = false)
| |-- login: string (nullable = true)
| |-- gravatar_id: string (nullable = true)
| |-- url: string (nullable = true)
| |-- avatar_url: string (nullable = true)
|-- org: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- login: string (nullable = true)
| |-- gravatar_id: string (nullable = true)
| |-- url: string (nullable = true)
| |-- avatar_url: string (nullable = true)
|-- created_at: timestamp (nullable = true)
|-- id: string (nullable = true)
|-- other: string (nullable = true) {code}
hoodie.properties:
{code:java}
hoodie.table.name=github-raw
hoodie.table.type=MERGE_ON_READ
hoodie.table.version=4
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.archivelog.folder=archived
hoodie.table.base.file.format=PARQUET
hoodie.table.precombine.field=created_at
hoodie.table.partition.fields=created_at
hoodie.table.recordkey.fields=id
hoodie.populate.meta.fields=true
hoodie.table.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.timeline.layout.version=1
hoodie.table.checksum=3814878680 {code}
Full stack trace:
{code:java}
scala> val df =
spark.read.format("hudi").load("<table_path>").filter(col("created_at").between("2021-10",
"2022-03"))
scala> df.count
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:49)
at scala.math.LowPriorityOrderingImplicits$$anon$2.compare(Ordering.scala:150)
at scala.math.Ordering.gteq(Ordering.scala:94)
at scala.math.Ordering.gteq$(Ordering.scala:94)
at scala.math.LowPriorityOrderingImplicits$$anon$2.gteq(Ordering.scala:149)
at
org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.nullSafeEval(predicates.scala:1153)
at
org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:574)
at org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:724)
at org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:720)
at
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:64)
at
org.apache.hudi.SparkHoodieTableFileIndex.$anonfun$prunePartition$4(SparkHoodieTableFileIndex.scala:186)
at
org.apache.hudi.SparkHoodieTableFileIndex.$anonfun$prunePartition$4$adapted(SparkHoodieTableFileIndex.scala:186)
at
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at
org.apache.hudi.SparkHoodieTableFileIndex.prunePartition(SparkHoodieTableFileIndex.scala:186)
at
org.apache.hudi.SparkHoodieTableFileIndex.listFileSlices(SparkHoodieTableFileIndex.scala:147)
at
org.apache.hudi.MergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:104)
at
org.apache.hudi.MergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:41)
at org.apache.hudi.HoodieBaseRelation.buildScan(HoodieBaseRelation.scala:273)
at
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:323)
at
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:357)
at
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:436)
at
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:356)
at
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:323)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at
org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at
org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at
org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at
org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:468)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$2(QueryExecution.scala:157)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:157)
at
org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
at
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:150)
at
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:150)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:170)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:170)
at
org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
at
org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
at
org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
at
org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.count(Dataset.scala:3011)
... 47 elided {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)