This is an automated email from the ASF dual-hosted git repository.
szehon-ho pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new e77dd06b39dc [SPARK-57139][SQL] Skip deriving PartitionPredicate for partially pushed down filters
e77dd06b39dc is described below
commit e77dd06b39dcdf69c47fb21c9d889f99acf66298
Author: Szehon Ho <[email protected]>
AuthorDate: Fri May 29 13:55:07 2026 -0700
[SPARK-57139][SQL] Skip deriving PartitionPredicate for partially pushed down filters
### What changes were proposed in this pull request?
In `PushDownUtils.pushFilters`, for scans implementing
`SupportsPushDownV2Filters` with iterative pushdown
(`supportsIterativePushdown() == true`), a second pass derives
`PartitionPredicate`s from filters left over after the first pass and pushes
them down.
Previously, the candidate filters for this second pass were taken from the
predicates **returned** by `pushPredicates()` (the post-scan filters). Per the
`SupportsPushDownV2Filters` contract, that return value contains both:
- non-pushable predicates, and
- pushable predicates that were accepted but still need post-scan
evaluation (partial pushdown, e.g. a Parquet row group filter).
The latter are also reported by `pushedPredicates()`. Using the returned
predicates as candidates therefore re-derived `PartitionPredicate`s from
filters that had **already been pushed** in the first pass, so the same filter
was pushed down twice.
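A minimal sketch of this overlap, written in Scala to match the patch. `ToyScan` is a hypothetical stand-in for a `SupportsPushDownV2Filters` scan, not Spark's API: it uses plain strings instead of Spark's `Predicate`, and a prefix match instead of real partition-column resolution. Its `returnAcceptedPartitionPredicates` flag loosely mirrors the `return-accepted-partition-predicates` test property added by this patch.

```scala
// Hypothetical toy model of the SupportsPushDownV2Filters contract (not Spark's API).
// Predicates are strings such as "part_col = 'b'"; a predicate counts as a partition
// predicate when it starts with a partition column name followed by a space.
final class ToyScan(
    partitionCols: Set[String],
    returnAcceptedPartitionPredicates: Boolean) {

  private var pushed: Seq[String] = Seq.empty

  // First pass: accept partition predicates and return the ones Spark must still
  // evaluate after the scan (the "post-scan" predicates).
  def pushPredicates(predicates: Seq[String]): Seq[String] = {
    val (partitionPreds, otherPreds) =
      predicates.partition(p => partitionCols.exists(c => p.startsWith(c + " ")))
    pushed = partitionPreds
    // Partial pushdown: accepted predicates are also handed back for post-scan evaluation.
    if (returnAcceptedPartitionPredicates) otherPreds ++ partitionPreds else otherPreds
  }

  // Predicates the scan accepted, fully or partially.
  def pushedPredicates(): Seq[String] = pushed
}

val scan = new ToyScan(Set("part_col"), returnAcceptedPartitionPredicates = true)
val returned = scan.pushPredicates(Seq("part_col = 'b'", "data = 'y'"))
// Treating every returned predicate as a second-pass candidate would include this
// already-pushed predicate, so it would be derived into a PartitionPredicate and pushed again.
val overlap = returned.filter(p => scan.pushedPredicates().contains(p))
println(overlap) // List(part_col = 'b')
```

With the flag off, `returned` is just `List(data = 'y')` and the overlap is empty, which is why the double push only shows up for partially pushing sources.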
This PR changes the second-pass candidate selection to only use filters
that were **not** already pushed down in the first pass (i.e. not in
`pushedPredicates()`). Filters that were pushed but still need post-scan
evaluation remain in the post-scan set, but are no longer re-derived as
`PartitionPredicate`s. This mirrors the existing runtime-filter path
(`pushRuntimeFilters`), which already excludes already-pushed predicates.
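A simplified sketch of the new candidate selection, with strings standing in for `Predicate` and Catalyst expressions. `SecondPassSelection.finalPostScanFilters` is a hypothetical reduction of the second-pass branch in `PushDownUtils.pushFilters`; its `pushSecondPass` parameter stands in for `pushPartitionPredicates` and is assumed to return the candidates that still need post-scan evaluation.

```scala
// Hypothetical, simplified model of the second-pass candidate selection (not Spark's code).
object SecondPassSelection {
  def finalPostScanFilters(
      returned: Seq[String],       // what the first-pass pushPredicates() returned
      pushed: Seq[String],         // what pushedPredicates() reports
      untranslatable: Seq[String], // filters that could not be translated at all
      pushSecondPass: Seq[String] => Seq[String]): Seq[String] = {
    val pushedSet: Set[String] = pushed.toSet
    // Split the post-scan predicates by whether the first pass already pushed them.
    val (pushedPostScan, notPushedPostScan) = returned.partition(pushedSet.contains)
    // Only predicates the source has not seen yet are second-pass candidates.
    val candidates = notPushedPostScan ++ untranslatable
    // Already-pushed predicates stay post-scan but are never re-derived.
    pushSecondPass(candidates) ++ pushedPostScan
  }
}

// Usage: the hypothetical second pass "accepts" only the UDF-on-partition-column filter.
var secondPassCandidates = Seq.empty[String]
val postScan = SecondPassSelection.finalPostScanFilters(
  returned = Seq("data = 'y'", "part_col = 'b'"),
  pushed = Seq("part_col = 'b'"),
  untranslatable = Seq("my_udf(part_col)"),
  pushSecondPass = { candidates =>
    secondPassCandidates = candidates
    candidates.filterNot(_.startsWith("my_udf("))
  })
println(secondPassCandidates) // List(data = 'y', my_udf(part_col))
println(postScan)             // List(data = 'y', part_col = 'b')
```

The `Set` lookup corresponds to the patch's `partition(r.pushedPredicates().toSet.contains)`. Because already-pushed predicates are appended after the second-pass result, they are still evaluated after the scan but are never offered to the source a second time.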
### Why are the changes needed?
The previous behavior pushed the same filter to the data source twice (once
as the original predicate in the first pass, and again as a
`PartitionPredicate` in the second pass) whenever a data source partially
pushes a partition filter (accepts it but also returns it for post-scan
evaluation). This is redundant work and inconsistent with the documented
contract and the runtime-filter path.
### Does this PR introduce _any_ user-facing change?
No. This affects the in-progress iterative `PartitionPredicate` pushdown
path (SPARK-55596) and is not part of a released Spark version.
### How was this patch tested?
Added unit tests to `DataSourceV2EnhancedPartitionFilterSuite` (case 9,
plus a nested-partition variant) covering a partition filter that is accepted
and also returned in the first pass. The tests assert that the filter prunes
partitions in the first pass, is kept as a post-scan filter, and is not
re-pushed as a `PartitionPredicate` in the second pass. A new
`return-accepted-partition-predicates` property was added to
`InMemoryEnhancedPartitionFilterTable` to simulate partial pushdown.
All 28 tests in the suite pass.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4.8)
Closes #56195 from szehon-ho/fix_partition_predicate_pushdown.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Szehon Ho <[email protected]>
(cherry picked from commit c97b33d8d35c6c2623d53a9f02eb4300575a6b9b)
Signed-off-by: Szehon Ho <[email protected]>
---
.../InMemoryEnhancedPartitionFilterTable.scala | 21 +++++++++
.../execution/datasources/v2/PushDownUtils.scala | 34 ++++++++++----
.../DataSourceV2EnhancedPartitionFilterSuite.scala | 54 ++++++++++++++++++++++
3 files changed, 101 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryEnhancedPartitionFilterTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryEnhancedPartitionFilterTable.scala
index 4cfca8a62f57..979cf1fded74 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryEnhancedPartitionFilterTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryEnhancedPartitionFilterTable.scala
@@ -73,6 +73,15 @@ class InMemoryEnhancedPartitionFilterTable(
InMemoryEnhancedPartitionFilterTable.AcceptDataPredicatesKey, "false")
.toBoolean
+ // Default false. When true, first-pass partition predicates that are accepted (pushed) are
+ // also returned for post-scan evaluation, simulating a partial pushdown (e.g. a Parquet
+ // row group filter). Such predicates are reported by pushedPredicates() but still appear in
+ // the pushPredicates() return value.
+ private val returnAcceptedPartitionPredicates =
+ InMemoryEnhancedPartitionFilterTable.this.properties.getOrDefault(
+ InMemoryEnhancedPartitionFilterTable.ReturnAcceptedPartitionPredicatesKey, "false")
+ .toBoolean
+
override def supportsIterativePushdown(): Boolean = true
override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
@@ -95,6 +104,10 @@ class InMemoryEnhancedPartitionFilterTable(
InMemoryTableWithV2Filter.supportsPredicates(Array(p)) =>
if (acceptPartitionPredicates) {
firstPassPushedPredicates += p
+ // Simulate partial pushdown: pushed, but still returned for post-scan.
+ if (returnAcceptedPartitionPredicates) {
+ returned += p
+ }
} else {
returned += p
}
@@ -166,4 +179,12 @@ object InMemoryEnhancedPartitionFilterTable {
* mocking a data source that can evaluate this particular data predicate).
*/
private[catalog] val AcceptDataPredicatesKey = "accept-data-predicates"
+
+ /**
+ * Table property: when "true", first-pass partition predicates that are accepted (pushed) are
+ * also returned for post-scan evaluation, simulating a partial pushdown (e.g. a Parquet
+ * row group filter). Used to verify that already-pushed predicates are not re-derived as
+ * PartitionPredicates in the second pass.
+ */
+ private[catalog] val ReturnAcceptedPartitionPredicatesKey = "return-accepted-partition-predicates"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index c54cc98014d7..7df6ade32c48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -112,19 +112,25 @@ object PushDownUtils extends Logging {
}
}
- val rejectedFilters = r.pushPredicates(translatedFilters.toArray).map { predicate =>
- DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, translatedFilterToExpr)
- }
+ val postScanPredicates = r.pushPredicates(translatedFilters.toArray)
- val remainingFilters = (rejectedFilters ++ untranslatableExprs).toSeq
- val postScanFilters =
+ val finalPostScanFilters =
if (!partitionFields.exists(_.nonEmpty) || !r.supportsIterativePushdown) {
- remainingFilters
+ rebuildExpressions(postScanPredicates.toSeq, translatedFilterToExpr) ++
+ untranslatableExprs
} else {
- pushPartitionPredicates(r, partitionFields.get, remainingFilters)
+ // Second pass: only filters that were not already pushed down (partially or fully)
+ // in the first pass (not in pushedPredicates) are eligible to be pushed down again.
+ // This avoids pushing the same filter down twice.
+ val (pushedPostScanFilters, notPushedPostScanFilters) =
+ postScanPredicates.toSeq.partition(r.pushedPredicates().toSet.contains)
+ val candidates = rebuildExpressions(notPushedPostScanFilters, translatedFilterToExpr) ++
+ untranslatableExprs
+ pushPartitionPredicates(r, partitionFields.get, candidates) ++
+ rebuildExpressions(pushedPostScanFilters, translatedFilterToExpr)
}
- val orderedPostScanFilters = prioritizeFilters(postScanFilters,
+ val orderedPostScanFilters = prioritizeFilters(finalPostScanFilters,
ExpressionSet(untranslatableExprs))
(Right(r.pushedPredicates.toImmutableArraySeq), orderedPostScanFilters)
case r: SupportsPushDownCatalystFilters =>
@@ -134,6 +140,18 @@ object PushDownUtils extends Logging {
}
}
+ /**
+ * Rebuilds the Catalyst [[Expression]]s for a sequence of data source [[Predicate]]s, using the
+ * mapping from translated data source predicates to their original Catalyst expressions.
+ */
+ private def rebuildExpressions(
+ predicates: Seq[Predicate],
+ translatedFilterToExpr: mutable.HashMap[Predicate, Expression]): Seq[Expression] = {
+ predicates.map { predicate =>
+ DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, translatedFilterToExpr)
+ }
+ }
+
/**
* Pushes runtime filters to a [[SupportsRuntimeV2Filtering]] scan. Translatable filters are
* pushed first, followed by [[PartitionPredicate]] if the scan supports iterative filtering.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2EnhancedPartitionFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2EnhancedPartitionFilterSuite.scala
index 956a88406b0f..3fe928daf9e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2EnhancedPartitionFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2EnhancedPartitionFilterSuite.scala
@@ -49,6 +49,8 @@ import org.apache.spark.sql.test.SharedSparkSession
* 6. Untranslatable, Data Filter -> Post-Scan Filters
* 7. Untranslatable, Partition Filter, 2nd Pass Returned -> Post-Scan Filters
* 8. Untranslatable, Partition Filter, 2nd Pass Accepted -> Pushed Down
+ * 9. Translated, Partition Filter, 1st Pass Accepted AND Returned (partial pushdown) ->
+ * Pushed Down in 1st pass, NOT re-derived in 2nd pass.
*/
class DataSourceV2EnhancedPartitionFilterSuite
extends SharedSparkSession with BeforeAndAfter with PredicateHelper {
@@ -220,6 +222,30 @@ class DataSourceV2EnhancedPartitionFilterSuite
}
}
+ test("case 9: partition filter pushed but returned in first pass is not re-pushed second pass") {
+ withTable(partFilterTableName) {
+ // The source accepts the partition predicate in the first pass (so it is reported by
+ // pushedPredicates and prunes partitions) but also returns it for post-scan evaluation,
+ // simulating a partial pushdown (e.g. a row group filter).
+ sql(s"CREATE TABLE $partFilterTableName (part_col string, data string) USING $v2Source " +
+ "PARTITIONED BY (part_col) " +
+ "TBLPROPERTIES('return-accepted-partition-predicates' = 'true')")
+ sql(s"INSERT INTO $partFilterTableName VALUES ('a', 'x'), ('b', 'y'), ('c', 'z')")
+
+ // Translated, Partition Filter; 1st Pass Accepted AND Returned.
+ // The second pass derives PartitionPredicates only from filters that were NOT already
+ // pushed (not in pushedPredicates). Since this filter was pushed in the first pass, it must
+ // NOT be pushed again as a PartitionPredicate in the second pass.
+ val df = sql(s"SELECT * FROM $partFilterTableName WHERE part_col = 'b'")
+ checkAnswer(df, Seq(Row("b", "y")))
+ assertPushedPartitionPredicates(df, 0)
+ assertScanReturnsPartitionKeys(df, Set("b"))
+ // The returned filter is still evaluated after the scan.
+ assert(df.queryExecution.executedPlan.exists(_.isInstanceOf[FilterExec]),
+ "Partition filter returned in first pass should remain as a post-scan Filter")
+ }
+ }
+
test("nested identity partition: second-pass PartitionPredicate with UDF on nested key") {
withTable(partFilterTableName) {
sql(s"CREATE TABLE $partFilterTableName " +
@@ -257,6 +283,34 @@ class DataSourceV2EnhancedPartitionFilterSuite
}
}
+ test("nested identity partition: case 9 partition filter pushed but returned in first pass " +
+ "is not re-pushed second pass") {
+ withTable(partFilterTableName) {
+ // The source accepts the partition predicate in the first pass (so it is reported by
+ // pushedPredicates and prunes partitions) but also returns it for post-scan evaluation,
+ // simulating a partial pushdown (e.g. a row group filter).
+ sql(s"CREATE TABLE $partFilterTableName " +
+ s"(s struct<tz: string, x: int>, data string) USING $v2Source " +
+ "PARTITIONED BY (s.tz) " +
+ "TBLPROPERTIES('return-accepted-partition-predicates' = 'true')")
+ sql(s"INSERT INTO $partFilterTableName VALUES " +
+ "(named_struct('tz', 'LA', 'x', 1), 'a'), " +
+ "(named_struct('tz', 'NY', 'x', 2), 'b')")
+
+ // Translated, Partition Filter; 1st Pass Accepted AND Returned.
+ // The second pass derives PartitionPredicates only from filters that were NOT already
+ // pushed (not in pushedPredicates). Since this filter was pushed in the first pass, it must
+ // NOT be pushed again as a PartitionPredicate in the second pass.
+ val df = sql(s"SELECT * FROM $partFilterTableName WHERE s.tz = 'LA'")
+ checkAnswer(df, Seq(Row(Row("LA", 1), "a")))
+ assertPushedPartitionPredicates(df, 0)
+ assertScanReturnsPartitionKeys(df, Set("LA"))
+ // The returned filter is still evaluated after the scan.
+ assert(df.queryExecution.executedPlan.exists(_.isInstanceOf[FilterExec]),
+ "Partition filter returned in first pass should remain as a post-scan Filter")
+ }
+ }
+
test("nested identity partition: field name containing a dot") {
withTable(partFilterTableName) {
sql(s"CREATE TABLE $partFilterTableName " +