This is an automated email from the ASF dual-hosted git repository.

szehon-ho pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new e77dd06b39dc [SPARK-57139][SQL] Skip deriving PartitionPredicate for 
partially pushed down filters
e77dd06b39dc is described below

commit e77dd06b39dcdf69c47fb21c9d889f99acf66298
Author: Szehon Ho <[email protected]>
AuthorDate: Fri May 29 13:55:07 2026 -0700

    [SPARK-57139][SQL] Skip deriving PartitionPredicate for partially pushed 
down filters
    
    ### What changes were proposed in this pull request?
    
    In `PushDownUtils.pushFilters`, for scans implementing 
`SupportsPushDownV2Filters` with iterative pushdown 
(`supportsIterativePushdown() == true`), a second pass derives 
`PartitionPredicate`s from filters left over after the first pass and pushes 
them down.
    
    Previously, the candidate filters for this second pass were taken from the 
predicates **returned** by `pushPredicates()` (the post-scan filters). Per the 
`SupportsPushDownV2Filters` contract, that return value contains both:
    - non-pushable predicates, and
    - pushable predicates that were accepted but still need post-scan 
evaluation (partial pushdown, e.g. a Parquet row group filter).
    
    The latter are reported by `pushedPredicates()`. Using the returned 
predicates as candidates therefore re-derived `PartitionPredicate`s from 
filters that were **already pushed** in the first pass, pushing the same filter 
down twice.
    
    This PR changes the second-pass candidate selection to only use filters 
that were **not** already pushed down in the first pass (i.e. not in 
`pushedPredicates()`). Filters that were pushed but still need post-scan 
evaluation remain in the post-scan set, but are no longer re-derived as 
`PartitionPredicate`s. This mirrors the existing runtime-filter path 
(`pushRuntimeFilters`), which already excludes already-pushed predicates.
    
    ### Why are the changes needed?
    
    The previous behavior pushed the same filter to the data source twice (once 
as the original predicate in the first pass, and again as a 
`PartitionPredicate` in the second pass) whenever a data source partially 
pushes a partition filter (accepts it but also returns it for post-scan 
evaluation). This is redundant work and inconsistent with the documented 
contract and the runtime-filter path.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This affects the in-progress iterative `PartitionPredicate` pushdown 
path (SPARK-55596) and is not part of a released Spark version.
    
    ### How was this patch tested?
    
    Added unit tests to `DataSourceV2EnhancedPartitionFilterSuite` (case 9, 
plus a nested-partition variant) covering a partition filter that is accepted 
and also returned in the first pass; the test asserts it is pruned in the first 
pass, kept as a post-scan filter, and not re-pushed as a `PartitionPredicate` 
in the second pass. A new `return-accepted-partition-predicates` property was 
added to `InMemoryEnhancedPartitionFilterTable` to simulate partial pushdown. 
All 28 tests in the suite pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor (Claude Opus 4.8)
    
    Closes #56195 from szehon-ho/fix_partition_predicate_pushdown.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Szehon Ho <[email protected]>
    (cherry picked from commit c97b33d8d35c6c2623d53a9f02eb4300575a6b9b)
    Signed-off-by: Szehon Ho <[email protected]>
---
 .../InMemoryEnhancedPartitionFilterTable.scala     | 21 +++++++++
 .../execution/datasources/v2/PushDownUtils.scala   | 34 ++++++++++----
 .../DataSourceV2EnhancedPartitionFilterSuite.scala | 54 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 8 deletions(-)

diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryEnhancedPartitionFilterTable.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryEnhancedPartitionFilterTable.scala
index 4cfca8a62f57..979cf1fded74 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryEnhancedPartitionFilterTable.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryEnhancedPartitionFilterTable.scala
@@ -73,6 +73,15 @@ class InMemoryEnhancedPartitionFilterTable(
         InMemoryEnhancedPartitionFilterTable.AcceptDataPredicatesKey, "false")
         .toBoolean
 
+    // Default false. When true, first-pass partition predicates that are 
accepted (pushed) are
+    // also returned for post-scan evaluation, simulating a partial pushdown 
(e.g. a Parquet
+    // row group filter). Such predicates are reported by pushedPredicates() 
but still appear in
+    // the pushPredicates() return value.
+    private val returnAcceptedPartitionPredicates =
+      InMemoryEnhancedPartitionFilterTable.this.properties.getOrDefault(
+        
InMemoryEnhancedPartitionFilterTable.ReturnAcceptedPartitionPredicatesKey, 
"false")
+        .toBoolean
+
     override def supportsIterativePushdown(): Boolean = true
 
     override def pushPredicates(predicates: Array[Predicate]): 
Array[Predicate] = {
@@ -95,6 +104,10 @@ class InMemoryEnhancedPartitionFilterTable(
             InMemoryTableWithV2Filter.supportsPredicates(Array(p)) =>
           if (acceptPartitionPredicates) {
             firstPassPushedPredicates += p
+            // Simulate partial pushdown: pushed, but still returned for 
post-scan.
+            if (returnAcceptedPartitionPredicates) {
+              returned += p
+            }
           } else {
             returned += p
           }
@@ -166,4 +179,12 @@ object InMemoryEnhancedPartitionFilterTable {
    * mocking a data source that can evaluate this particular data predicate).
    */
   private[catalog] val AcceptDataPredicatesKey = "accept-data-predicates"
+
+  /**
+   * Table property: when "true", first-pass partition predicates that are 
accepted (pushed) are
+   * also returned for post-scan evaluation, simulating a partial pushdown 
(e.g. a Parquet
+   * row group filter). Used to verify that already-pushed predicates are not 
re-derived as
+   * PartitionPredicates in the second pass.
+   */
+  private[catalog] val ReturnAcceptedPartitionPredicatesKey = 
"return-accepted-partition-predicates"
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index c54cc98014d7..7df6ade32c48 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -112,19 +112,25 @@ object PushDownUtils extends Logging {
           }
         }
 
-        val rejectedFilters = r.pushPredicates(translatedFilters.toArray).map 
{ predicate =>
-          DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, 
translatedFilterToExpr)
-        }
+        val postScanPredicates = r.pushPredicates(translatedFilters.toArray)
 
-        val remainingFilters = (rejectedFilters ++ untranslatableExprs).toSeq
-        val postScanFilters =
+        val finalPostScanFilters =
           if (!partitionFields.exists(_.nonEmpty) || 
!r.supportsIterativePushdown) {
-            remainingFilters
+            rebuildExpressions(postScanPredicates.toSeq, 
translatedFilterToExpr) ++
+              untranslatableExprs
           } else {
-            pushPartitionPredicates(r, partitionFields.get, remainingFilters)
+            // Second pass: only filters that were not already pushed down 
(partially or fully)
+            // in the first pass (not in pushedPredicates) are eligible to be 
pushed down again.
+            // This avoids pushing the same filter down twice.
+            val (pushedPostScanFilters, notPushedPostScanFilters) =
+              
postScanPredicates.toSeq.partition(r.pushedPredicates().toSet.contains)
+            val candidates = rebuildExpressions(notPushedPostScanFilters, 
translatedFilterToExpr) ++
+              untranslatableExprs
+            pushPartitionPredicates(r, partitionFields.get, candidates) ++
+              rebuildExpressions(pushedPostScanFilters, translatedFilterToExpr)
           }
 
-        val orderedPostScanFilters = prioritizeFilters(postScanFilters,
+        val orderedPostScanFilters = prioritizeFilters(finalPostScanFilters,
           ExpressionSet(untranslatableExprs))
         (Right(r.pushedPredicates.toImmutableArraySeq), orderedPostScanFilters)
       case r: SupportsPushDownCatalystFilters =>
@@ -134,6 +140,18 @@ object PushDownUtils extends Logging {
     }
   }
 
+  /**
+   * Rebuilds the Catalyst [[Expression]]s for a sequence of data source 
[[Predicate]]s, using the
+   * mapping from translated data source predicates to their original Catalyst 
expressions.
+   */
+  private def rebuildExpressions(
+      predicates: Seq[Predicate],
+      translatedFilterToExpr: mutable.HashMap[Predicate, Expression]): 
Seq[Expression] = {
+    predicates.map { predicate =>
+      DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, 
translatedFilterToExpr)
+    }
+  }
+
   /**
    * Pushes runtime filters to a [[SupportsRuntimeV2Filtering]] scan. 
Translatable filters are
    * pushed first, followed by [[PartitionPredicate]] if the scan supports 
iterative filtering.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2EnhancedPartitionFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2EnhancedPartitionFilterSuite.scala
index 956a88406b0f..3fe928daf9e0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2EnhancedPartitionFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2EnhancedPartitionFilterSuite.scala
@@ -49,6 +49,8 @@ import org.apache.spark.sql.test.SharedSparkSession
  * 6. Untranslatable, Data Filter -> Post-Scan Filters
  * 7. Untranslatable, Partition Filter, 2nd Pass Returned -> Post-Scan Filters
  * 8. Untranslatable, Partition Filter, 2nd Pass Accepted -> Pushed Down
+ * 9. Translated, Partition Filter, 1st Pass Accepted AND Returned (partial 
pushdown) ->
+ *    Pushed Down in 1st pass, NOT re-derived in 2nd pass.
  */
 class DataSourceV2EnhancedPartitionFilterSuite
   extends SharedSparkSession with BeforeAndAfter with PredicateHelper {
@@ -220,6 +222,30 @@ class DataSourceV2EnhancedPartitionFilterSuite
     }
   }
 
+  test("case 9: partition filter pushed but returned in first pass is not 
re-pushed second pass") {
+    withTable(partFilterTableName) {
+      // The source accepts the partition predicate in the first pass (so it 
is reported by
+      // pushedPredicates and prunes partitions) but also returns it for 
post-scan evaluation,
+      // simulating a partial pushdown (e.g. a row group filter).
+      sql(s"CREATE TABLE $partFilterTableName (part_col string, data string) 
USING $v2Source " +
+        "PARTITIONED BY (part_col) " +
+        "TBLPROPERTIES('return-accepted-partition-predicates' = 'true')")
+      sql(s"INSERT INTO $partFilterTableName VALUES ('a', 'x'), ('b', 'y'), 
('c', 'z')")
+
+      // Translated, Partition Filter; 1st Pass Accepted AND Returned.
+      // The second pass derives PartitionPredicates only from filters that 
were NOT already
+      // pushed (not in pushedPredicates). Since this filter was pushed in the 
first pass, it must
+      // NOT be pushed again as a PartitionPredicate in the second pass.
+      val df = sql(s"SELECT * FROM $partFilterTableName WHERE part_col = 'b'")
+      checkAnswer(df, Seq(Row("b", "y")))
+      assertPushedPartitionPredicates(df, 0)
+      assertScanReturnsPartitionKeys(df, Set("b"))
+      // The returned filter is still evaluated after the scan.
+      assert(df.queryExecution.executedPlan.exists(_.isInstanceOf[FilterExec]),
+        "Partition filter returned in first pass should remain as a post-scan 
Filter")
+    }
+  }
+
   test("nested identity partition: second-pass PartitionPredicate with UDF on 
nested key") {
     withTable(partFilterTableName) {
       sql(s"CREATE TABLE $partFilterTableName " +
@@ -257,6 +283,34 @@ class DataSourceV2EnhancedPartitionFilterSuite
     }
   }
 
+  test("nested identity partition: case 9 partition filter pushed but returned 
in first pass " +
+    "is not re-pushed second pass") {
+    withTable(partFilterTableName) {
+      // The source accepts the partition predicate in the first pass (so it 
is reported by
+      // pushedPredicates and prunes partitions) but also returns it for 
post-scan evaluation,
+      // simulating a partial pushdown (e.g. a row group filter).
+      sql(s"CREATE TABLE $partFilterTableName " +
+        s"(s struct<tz: string, x: int>, data string) USING $v2Source " +
+        "PARTITIONED BY (s.tz) " +
+        "TBLPROPERTIES('return-accepted-partition-predicates' = 'true')")
+      sql(s"INSERT INTO $partFilterTableName VALUES " +
+        "(named_struct('tz', 'LA', 'x', 1), 'a'), " +
+        "(named_struct('tz', 'NY', 'x', 2), 'b')")
+
+      // Translated, Partition Filter; 1st Pass Accepted AND Returned.
+      // The second pass derives PartitionPredicates only from filters that 
were NOT already
+      // pushed (not in pushedPredicates). Since this filter was pushed in the 
first pass, it must
+      // NOT be pushed again as a PartitionPredicate in the second pass.
+      val df = sql(s"SELECT * FROM $partFilterTableName WHERE s.tz = 'LA'")
+      checkAnswer(df, Seq(Row(Row("LA", 1), "a")))
+      assertPushedPartitionPredicates(df, 0)
+      assertScanReturnsPartitionKeys(df, Set("LA"))
+      // The returned filter is still evaluated after the scan.
+      assert(df.queryExecution.executedPlan.exists(_.isInstanceOf[FilterExec]),
+        "Partition filter returned in first pass should remain as a post-scan 
Filter")
+    }
+  }
+
   test("nested identity partition: field name containing a dot") {
     withTable(partFilterTableName) {
       sql(s"CREATE TABLE $partFilterTableName " +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to