This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7221d754075 [SPARK-38950][SQL] Return Array of Predicate for SupportsPushDownCatalystFilters.pushedFilters
7221d754075 is described below

commit 7221d754075656ce41edacb0fccc1cf89a62fc77
Author: huaxingao <[email protected]>
AuthorDate: Thu Apr 21 23:16:22 2022 +0800

    [SPARK-38950][SQL] Return Array of Predicate for SupportsPushDownCatalystFilters.pushedFilters
    
    ### What changes were proposed in this pull request?
    In `SupportsPushDownCatalystFilters`, change
    ```
    def pushedFilters: Array[Filter]
    ```
    to
    
    ```
    def pushedFilters: Array[Predicate]
    ```
    
    ### Why are the changes needed?
    Use the V2 `Predicate` API (v2Filter) in DS V2, so that file sources report their pushed filters as V2 predicates rather than V1 `Filter`s.
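
    For example (an illustrative sketch, not part of this patch), the V1 filter `sources.EqualTo("id", 1)` corresponds to a V2 `Predicate` built roughly like this:

    ```
    import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, LiteralValue}
    import org.apache.spark.sql.connector.expressions.filter.Predicate
    import org.apache.spark.sql.types.IntegerType

    // A V2 Predicate is a named expression over column references and literals;
    // this mirrors the V1 filter sources.EqualTo("id", 1).
    val v2EqualTo = new Predicate("=",
      Array[Expression](FieldReference("id"), LiteralValue(1, IntegerType)))
    ```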
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. `SupportsPushDownCatalystFilters.pushedFilters` now returns `Array[Predicate]` instead of `Array[Filter]`.
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #36264 from huaxingao/V2Filter.
    
    Authored-by: huaxingao <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/internal/connector/SupportsPushDownCatalystFilters.scala    | 6 +++---
 .../apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala | 3 ++-
 .../apache/spark/sql/execution/datasources/v2/PushDownUtils.scala   | 2 +-
 .../sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala   | 3 ++-
 4 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
index 9c2a4ac78a2..99590480220 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.internal.connector
 
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.connector.expressions.filter.Predicate
 
 /**
  * A mix-in interface for {@link FileScanBuilder}. File sources can implement this interface to
@@ -35,7 +35,7 @@ trait SupportsPushDownCatalystFilters {
 
   /**
    * Returns the data filters that are pushed to the data source via
-   * {@link #pushFilters(Expression[])}.
+   * {@link #pushFilters(Predicate[])}.
    */
-  def pushedFilters: Array[Filter]
+  def pushedFilters: Array[Predicate]
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
index 2dc4137d6f9..ae82eecd313 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
@@ -20,6 +20,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.{sources, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, DataSourceUtils, PartitioningAwareFileIndex, PartitioningUtils}
 import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
@@ -84,7 +85,7 @@ abstract class FileScanBuilder(
     dataFilters
   }
 
-  override def pushedFilters: Array[Filter] = pushedDataFilters
+  override def pushedFilters: Array[Predicate] = pushedDataFilters.map(_.toV2)
 
   /*
   * Push down data filters to the file source, so the data filters can be evaluated there to
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index f0f7f37192b..492db45626a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -98,7 +98,7 @@ object PushDownUtils extends PredicateHelper {
 
       case f: FileScanBuilder =>
         val postScanFilters = f.pushFilters(filters)
-        (Left(f.pushedFilters), postScanFilters)
+        (Right(f.pushedFilters), postScanFilters)
       case _ => (Left(Nil), filters)
     }
   }
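
The switch from `Left` to `Right` matches the `Either` result of `pushFilters`: the left side carries V1 `sources.Filter`s, the right side carries V2 `Predicate`s, so file sources now report on the V2 side. A hedged caller-side sketch (`describePushed` is a hypothetical helper; it assumes the `Either[Seq[Filter], Seq[Predicate]]` shape):

```
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.sources.Filter

// Hypothetical helper: report whichever side of the Either was populated.
def describePushed(pushed: Either[Seq[Filter], Seq[Predicate]]): String =
  pushed match {
    case Left(v1)  => s"V1 filters: ${v1.mkString(", ")}"    // legacy V1 path
    case Right(v2) => s"V2 predicates: ${v2.mkString(", ")}" // FileScanBuilder now lands here
  }
```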
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 1f2f75aebd7..2093f4a16ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
+import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownAggregates}
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitioningAwareFileIndex}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
@@ -84,7 +85,7 @@ case class ParquetScanBuilder(
   // Note: for Parquet, the actual filter push down happens in [[ParquetPartitionReaderFactory]].
   // It requires the Parquet physical schema to determine whether a filter is convertible.
   // All filters that can be converted to Parquet are pushed down.
-  override def pushedFilters(): Array[Filter] = pushedParquetFilters
+  override def pushedFilters: Array[Predicate] = pushedParquetFilters.map(_.toV2)
 
   override def pushAggregation(aggregation: Aggregation): Boolean = {
     if (!sparkSession.sessionState.conf.parquetAggregatePushDown) {

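As a hedged sketch of the downstream effect (class and field names here are hypothetical, not part of this patch), a scan builder mixing in `SupportsPushDownCatalystFilters` now exposes V2 predicates:

```
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters

// Minimal illustrative implementation: pushedFilters surfaces V2 Predicates
// instead of V1 sources.Filter values.
class MyScanBuilder extends SupportsPushDownCatalystFilters {
  private var pushed: Array[Predicate] = Array.empty[Predicate]

  override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
    // A real source would translate convertible filters to V2 Predicates and
    // record them in `pushed`; this sketch keeps everything post-scan.
    filters
  }

  override def pushedFilters: Array[Predicate] = pushed
}
```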

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
