This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bc7a766ff3e8 feat: use ScanOperation for Spark 3.3 and 3.4 partition 
pruning (#17936)
bc7a766ff3e8 is described below

commit bc7a766ff3e810caaf18d288cc3a386d0c94d008
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Apr 7 11:40:04 2026 -0700

    feat: use ScanOperation for Spark 3.3 and 3.4 partition pruning (#17936)
    
    This PR adds version-specific partition pruning implementations for Spark 
3.3, 3.4, and 3.5 to use the appropriate pattern matching strategy.
    
    Changes:
    
    Added Spark33HoodiePruneFileSourcePartitions using ScanOperation for Spark 
3.3
    Updated HoodieAnalysis to route to the correct implementation based on 
Spark version. Spark3.4 and greater will use 
Spark3HoodiePruneFileSourcePartitions.
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |   6 +-
 .../Spark33HoodiePruneFileSourcePartitions.scala   | 133 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 1 deletion(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 3371a6c039bb..e7576359c2a4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -160,8 +160,12 @@ object HoodieAnalysis extends SparkAdapterSupport {
     //          - Precedes actual [[customEarlyScanPushDownRules]] invocation
     val pruneFileSourcePartitionsClass = if (HoodieSparkUtils.gteqSpark4_0) {
       
"org.apache.spark.sql.hudi.analysis.Spark4HoodiePruneFileSourcePartitions"
-    } else {
+    } else if (HoodieSparkUtils.gteqSpark3_4) {
+      // Spark 3.4 and 3.5: PhysicalOperation and ScanOperation unified 
(SPARK-39764)
       
"org.apache.spark.sql.hudi.analysis.Spark3HoodiePruneFileSourcePartitions"
+    } else {
+      // Spark 3.3: Use ScanOperation for better compatibility
+      
"org.apache.spark.sql.hudi.analysis.Spark33HoodiePruneFileSourcePartitions"
     }
     rules += (spark => instantiateKlass(pruneFileSourcePartitionsClass, spark))
 
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/analysis/Spark33HoodiePruneFileSourcePartitions.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/analysis/Spark33HoodiePruneFileSourcePartitions.scala
new file mode 100644
index 000000000000..7d7240231cd0
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/analysis/Spark33HoodiePruneFileSourcePartitions.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex}
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, ExpressionSet, NamedExpression, PredicateHelper, 
SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, 
LogicalPlan, Project}
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import 
org.apache.spark.sql.hudi.analysis.Spark33HoodiePruneFileSourcePartitions.{exprUtils,
 getPartitionFiltersAndDataFilters, rebuildPhysicalOperation, 
HoodieRelationMatcher}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Prune the partitions of Hudi table based relations by the means of pushing 
down the
+ * partition filters
+ *
+ * NOTE: In Spark 3.3 and earlier, ScanOperation and PhysicalOperation had 
different behaviors
+ *       for filter pushdown and partition pruning. ScanOperation provides 
better compatibility
+ *       with Spark's internal query optimization in 3.3.
+ *
+ * NOTE: SPARK-39764 unified PhysicalOperation and ScanOperation in Spark 
3.4+, making this
+ *       version-specific implementation necessary only for Spark 3.3.
+ *
+ * @see [[Spark3HoodiePruneFileSourcePartitions]] for Spark 3.4 and 3.5
+ * @see [[Spark4HoodiePruneFileSourcePartitions]] for Spark 4.0+
+ */
+case class Spark33HoodiePruneFileSourcePartitions(spark: SparkSession) extends 
Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    case op @ ScanOperation(projects, filters, lr @ 
LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
+      if !fileIndex.hasPredicatesPushedDown =>
+
+      val deterministicFilters = filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f))
+      val normalizedFilters = exprUtils.normalizeExprs(deterministicFilters, 
lr.output)
+
+      val (partitionPruningFilters, dataFilters) =
+        getPartitionFiltersAndDataFilters(fileIndex.partitionSchema, 
normalizedFilters)
+
+      // [[HudiFileIndex]] is a caching one, therefore we don't need to 
reconstruct new relation,
+      // instead we simply just refresh the index and update the stats
+      fileIndex.filterFileSlices(dataFilters, partitionPruningFilters, 
isPartitionPruned = true)
+
+      if (partitionPruningFilters.nonEmpty) {
+        // Change table stats based on the sizeInBytes of pruned files
+        val filteredStats = 
FilterEstimation(Filter(partitionPruningFilters.reduce(And), lr)).estimate
+        val colStats = filteredStats.map {
+          _.attributeStats.map { case (attr, colStat) =>
+            (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType))
+          }
+        }
+
+        val tableWithStats = lr.catalogTable.map(_.copy(
+          stats = Some(
+            CatalogStatistics(
+              sizeInBytes = BigInt(fileIndex.sizeInBytes),
+              rowCount = filteredStats.flatMap(_.rowCount),
+              colStats = colStats.getOrElse(Map.empty)))
+        ))
+
+        val prunedLogicalRelation = lr.copy(catalogTable = tableWithStats)
+        // Keep partition-pruning predicates so that they are visible in 
physical planning
+        rebuildPhysicalOperation(projects, filters, prunedLogicalRelation)
+      } else {
+        op
+      }
+  }
+
+}
+
+private object Spark33HoodiePruneFileSourcePartitions extends PredicateHelper {
+
+  private val exprUtils = sparkAdapter.getCatalystExpressionUtils
+
+  private object HoodieRelationMatcher {
+    def unapply(relation: BaseRelation): Option[HoodieFileIndex] = relation 
match {
+      case HadoopFsRelation(fileIndex: HoodieFileIndex, _, _, _, _, _) => 
Some(fileIndex)
+      case r: HoodieBaseRelation => Some(r.fileIndex)
+      case _ => None
+    }
+  }
+
+  private def rebuildPhysicalOperation(projects: Seq[NamedExpression],
+                                       filters: Seq[Expression],
+                                       relation: LeafNode): Project = {
+    val withFilter = if (filters.nonEmpty) {
+      val filterExpression = filters.reduceLeft(And)
+      Filter(filterExpression, relation)
+    } else {
+      relation
+    }
+    Project(projects, withFilter)
+  }
+
+  def getPartitionFiltersAndDataFilters(partitionSchema: StructType,
+                                        normalizedFilters: Seq[Expression]): 
(Seq[Expression], Seq[Expression]) = {
+    val partitionColumns = normalizedFilters.flatMap { expr =>
+      expr.collect {
+        case attr: AttributeReference if 
partitionSchema.names.contains(attr.name) =>
+          attr
+      }
+    }
+    val partitionSet = AttributeSet(partitionColumns)
+    val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
+      f.references.subsetOf(partitionSet)
+    )
+    val extraPartitionFilter =
+      dataFilters.flatMap(exprUtils.extractPredicatesWithinOutputSet(_, 
partitionSet))
+    (ExpressionSet(partitionFilters ++ extraPartitionFilter).toSeq, 
dataFilters)
+  }
+
+}

Reply via email to