This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7201e60bb3e8 [SPARK-53512][SQL] Better unification of DSv2 PushDownUtils
7201e60bb3e8 is described below

commit 7201e60bb3e898f65db5045ec293c650e14cfa95
Author: yhuang-db <itisyuch...@gmail.com>
AuthorDate: Wed Sep 10 20:26:32 2025 -0700

    [SPARK-53512][SQL] Better unification of DSv2 PushDownUtils
    
    ### What changes were proposed in this pull request?
    
    This PR makes SupportsPushDownCatalystFilters a trait that extends
    ScanBuilder, and adds it to the pattern matching of PushDownUtils.
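
    For illustration, a minimal connector-side sketch of what this enables,
    assuming the trait's existing pushFilters/pushedFilters signatures.
    MyCustomScanBuilder and its body are hypothetical, not part of this PR:

    ```
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.connector.expressions.filter.Predicate
    import org.apache.spark.sql.connector.read.Scan
    import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters

    // A non-file scan builder can now mix in the trait directly, without
    // extending FileScanBuilder, and still be picked up by PushDownUtils.
    class MyCustomScanBuilder extends SupportsPushDownCatalystFilters {
      private val pushed: Array[Predicate] = Array.empty

      // Record whatever the source can evaluate and return the remainder,
      // which Spark re-evaluates after the scan. This toy version pushes
      // nothing and returns every filter as a post-scan filter.
      override def pushFilters(filters: Seq[Expression]): Seq[Expression] = filters

      override def pushedFilters: Array[Predicate] = pushed

      override def build(): Scan = ??? // construct the source-specific Scan
    }
    ```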
    
    ### Why are the changes needed?
    
    Currently, PushDownUtils has the following matching cases:
    
    ```
    object PushDownUtils {
      def pushFilters(scanBuilder: ScanBuilder, filters: Seq[Expression])
        : (Either[Seq[sources.Filter], Seq[Predicate]], Seq[Expression]) = {
        scanBuilder match {
          case r: SupportsPushDownFilters => ...    // public interface, extends ScanBuilder
          case r: SupportsPushDownV2Filters => ...  // public interface, extends ScanBuilder
          case f: FileScanBuilder => ...            // mixes in SupportsPushDownCatalystFilters
          case _ => (Left(Nil), filters)
        }
      }
    }
    ```
    
    As a result, a new scanBuilder that wants to implement
    SupportsPushDownCatalystFilters (but doesn't want to extend FileScanBuilder)
    will not be picked up by PushDownUtils. To better unify these filter
    pushdown interfaces, the matching cases should instead look like the
    following:
    
    ```
    scanBuilder match {
      case r: SupportsPushDownFilters => ...         // public interface, extends ScanBuilder
      case r: SupportsPushDownV2Filters => ...       // public interface, extends ScanBuilder
      case r: SupportsPushDownCatalystFilters => ... // trait, extends ScanBuilder
      case _ => (Left(Nil), filters)
    }
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52257 from yhuang-db/support_pushdown_catalyst_filter.
    
    Authored-by: yhuang-db <itisyuch...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../sql/internal/connector/SupportsPushDownCatalystFilters.scala   | 7 ++++---
 .../apache/spark/sql/execution/datasources/v2/PushDownUtils.scala  | 7 ++++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
index 4641a06ba3e1..2dc9651adf77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
@@ -18,14 +18,15 @@ package org.apache.spark.sql.internal.connector
 
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.ScanBuilder
 
 /**
- * A mix-in interface for {@link FileScanBuilder}. File sources can implement this interface to
- * push down filters to the file source. The pushed down filters will be separated into partition
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
+ * push down filters to the data source. The pushed down filters will be separated into partition
  * filters and data filters. Partition filters are used for partition pruning and data filters are
  * used to reduce the size of the data to be read.
  */
-trait SupportsPushDownCatalystFilters {
+trait SupportsPushDownCatalystFilters extends ScanBuilder {
 
   /**
   * Pushes down catalyst Expression filters (which will be separated into partition filters and
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index 34a1adcb6e09..7c7b6d5488d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset, SupportsPushDownRequiredColumns, SupportsPushDownTableSample, SupportsPushDownTopN, SupportsPushDownV2Filters}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ArrayImplicits._
@@ -107,9 +108,9 @@ object PushDownUtils {
         (Right(r.pushedPredicates.toImmutableArraySeq),
           (postScanFilters ++ untranslatableExprs).toImmutableArraySeq)
 
-      case f: FileScanBuilder =>
-        val postScanFilters = f.pushFilters(filters)
-        (Right(f.pushedFilters.toImmutableArraySeq), postScanFilters)
+      case r: SupportsPushDownCatalystFilters =>
+        val postScanFilters = r.pushFilters(filters)
+        (Right(r.pushedFilters.toImmutableArraySeq), postScanFilters)
       case _ => (Left(Nil), filters)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
