This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7201e60bb3e8 [SPARK-53512][SQL] Better unification of DSv2 PushDownUtils

7201e60bb3e8 is described below

commit 7201e60bb3e898f65db5045ec293c650e14cfa95
Author: yhuang-db <itisyuch...@gmail.com>
AuthorDate: Wed Sep 10 20:26:32 2025 -0700

    [SPARK-53512][SQL] Better unification of DSv2 PushDownUtils

### What changes were proposed in this pull request?

This PR makes SupportsPushDownCatalystFilters a trait that extends ScanBuilder, and adds it to the pattern matching of PushDownUtils.

### Why are the changes needed?

Currently, PushDownUtils has the following matching cases:

```
object PushDownUtils {
  def pushFilters(scanBuilder: ScanBuilder, filters: Seq[Expression])
      : (Either[Seq[sources.Filter], Seq[Predicate]], Seq[Expression]) = {
    scanBuilder match {
      case r: SupportsPushDownFilters => ...   // public interface, extends ScanBuilder
      case r: SupportsPushDownV2Filters => ... // public interface, extends ScanBuilder
      case f: FileScanBuilder => ...           // mixes in SupportsPushDownCatalystFilters
      case _ => (Left(Nil), filters)
    }
  }
}
```

As a result, a new scan builder that wants to mix in SupportsPushDownCatalystFilters (but does not want to extend FileScanBuilder) is not picked up by PushDownUtils. To better unify these filter pushdown interfaces, the matching cases should instead look like the following:

```
scanBuilder match {
  case r: SupportsPushDownFilters => ...         // public interface, extends ScanBuilder
  case r: SupportsPushDownV2Filters => ...       // public interface, extends ScanBuilder
  case r: SupportsPushDownCatalystFilters => ... // trait, extends ScanBuilder
  case _ => (Left(Nil), filters)
}
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52257 from yhuang-db/support_pushdown_catalyst_filter.

Authored-by: yhuang-db <itisyuch...@gmail.com>
Signed-off-by: Gengliang Wang <gengli...@apache.org>
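[Editor's note] To make the motivation concrete, here is a minimal, hypothetical sketch, not part of this patch, of a non-file scan builder that mixes in SupportsPushDownCatalystFilters directly. The class name InMemoryCatalystFilterScanBuilder is invented for illustration, the Scan construction is omitted, and the sketch assumes access to the Spark-internal helper DataSourceV2Strategy.translateFilterV2 (the same translation utility PushDownUtils uses), which is why it sits in a Spark-internal package:

```
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters

// Hypothetical: a scan builder for a non-file source that still wants to
// receive catalyst Expression filters. After this patch, mixing in the
// trait is enough for PushDownUtils to dispatch to it; extending
// FileScanBuilder is no longer required.
class InMemoryCatalystFilterScanBuilder extends SupportsPushDownCatalystFilters {

  private var pushedPredicates: Array[Predicate] = Array.empty

  override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
    // Try to translate each catalyst Expression into a DSv2 Predicate,
    // using the same internal helper PushDownUtils uses for V2 filters.
    val translated = filters.map(e => e -> DataSourceV2Strategy.translateFilterV2(e))
    pushedPredicates = translated.collect { case (_, Some(predicate)) => predicate }.toArray
    // Anything that could not be translated must be evaluated after the scan.
    translated.collect { case (expr, None) => expr }
  }

  override def pushedFilters: Array[Predicate] = pushedPredicates

  // Building the actual Scan is source-specific and out of scope for this sketch.
  override def build(): Scan =
    throw new UnsupportedOperationException("sketch only")
}
```

Before this change, such a builder fell through to the `case _` arm of PushDownUtils.pushFilters and got no filter pushdown at all; with SupportsPushDownCatalystFilters extending ScanBuilder, the new match arm in the diff below handles it.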
---
 .../sql/internal/connector/SupportsPushDownCatalystFilters.scala  | 7 ++++---
 .../apache/spark/sql/execution/datasources/v2/PushDownUtils.scala | 7 ++++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
index 4641a06ba3e1..2dc9651adf77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
@@ -18,14 +18,15 @@ package org.apache.spark.sql.internal.connector
 
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.ScanBuilder
 
 /**
- * A mix-in interface for {@link FileScanBuilder}. File sources can implement this interface to
- * push down filters to the file source. The pushed down filters will be separated into partition
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
+ * push down filters to the data source. The pushed down filters will be separated into partition
  * filters and data filters. Partition filters are used for partition pruning and data filters are
  * used to reduce the size of the data to be read.
  */
-trait SupportsPushDownCatalystFilters {
+trait SupportsPushDownCatalystFilters extends ScanBuilder {
 
   /**
    * Pushes down catalyst Expression filters (which will be separated into partition filters and
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index 34a1adcb6e09..7c7b6d5488d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset, SupportsPushDownRequiredColumns, SupportsPushDownTableSample, SupportsPushDownTopN, SupportsPushDownV2Filters}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ArrayImplicits._
@@ -107,9 +108,9 @@ object PushDownUtils {
       (Right(r.pushedPredicates.toImmutableArraySeq),
         (postScanFilters ++ untranslatableExprs).toImmutableArraySeq)
 
-    case f: FileScanBuilder =>
-      val postScanFilters = f.pushFilters(filters)
-      (Right(f.pushedFilters.toImmutableArraySeq), postScanFilters)
+    case r: SupportsPushDownCatalystFilters =>
+      val postScanFilters = r.pushFilters(filters)
+      (Right(r.pushedFilters.toImmutableArraySeq), postScanFilters)
     case _ => (Left(Nil), filters)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org