This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 24d588c8587 [SPARK-38950][SQL] Return Array of Predicate for
SupportsPushDownCatalystFilters.pushedFilters
24d588c8587 is described below
commit 24d588c8587f84f7e8c1c9f665d55eb14869b707
Author: huaxingao <[email protected]>
AuthorDate: Thu Apr 21 23:16:22 2022 +0800
[SPARK-38950][SQL] Return Array of Predicate for
SupportsPushDownCatalystFilters.pushedFilters
### What changes were proposed in this pull request?
in `SupportsPushDownCatalystFilters`, change
```
def pushedFilters: Array[Filter]
```
to
```
def pushedFilters: Array[Predicate]
```
### Why are the changes needed?
To use the DS V2 `Predicate` (v2 filter) API in DataSource V2 instead of the v1 `sources.Filter` API.
### Does this PR introduce _any_ user-facing change?
Yes. `SupportsPushDownCatalystFilters.pushedFilters` now returns `Array[Predicate]` instead of `Array[Filter]`.
### How was this patch tested?
existing tests
Closes #36264 from huaxingao/V2Filter.
Authored-by: huaxingao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 7221d754075656ce41edacb0fccc1cf89a62fc77)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/internal/connector/SupportsPushDownCatalystFilters.scala | 6 +++---
.../apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala | 3 ++-
.../apache/spark/sql/execution/datasources/v2/PushDownUtils.scala | 2 +-
.../sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala | 3 ++-
4 files changed, 8 insertions(+), 6 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
index 9c2a4ac78a2..99590480220 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.internal.connector
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.connector.expressions.filter.Predicate
/**
* A mix-in interface for {@link FileScanBuilder}. File sources can implement
this interface to
@@ -35,7 +35,7 @@ trait SupportsPushDownCatalystFilters {
/**
* Returns the data filters that are pushed to the data source via
- * {@link #pushFilters(Expression[])}.
+ * {@link #pushFilters(Predicate[])}.
*/
- def pushedFilters: Array[Filter]
+ def pushedFilters: Array[Predicate]
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
index 2dc4137d6f9..ae82eecd313 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
@@ -20,6 +20,7 @@ import scala.collection.mutable
import org.apache.spark.sql.{sources, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{ScanBuilder,
SupportsPushDownRequiredColumns}
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy,
DataSourceUtils, PartitioningAwareFileIndex, PartitioningUtils}
import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
@@ -84,7 +85,7 @@ abstract class FileScanBuilder(
dataFilters
}
- override def pushedFilters: Array[Filter] = pushedDataFilters
+ override def pushedFilters: Array[Predicate] = pushedDataFilters.map(_.toV2)
/*
* Push down data filters to the file source, so the data filters can be
evaluated there to
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index 8ac91e02579..0ebfed2fe9e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -98,7 +98,7 @@ object PushDownUtils extends PredicateHelper {
case f: FileScanBuilder =>
val postScanFilters = f.pushFilters(filters)
- (Left(f.pushedFilters), postScanFilters)
+ (Right(f.pushedFilters), postScanFilters)
case _ => (Left(Nil), filters)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 1f2f75aebd7..2093f4a16ef 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
+import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownAggregates}
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters,
SparkToParquetSchemaConverter}
@@ -84,7 +85,7 @@ case class ParquetScanBuilder(
// Note: for Parquet, the actual filter push down happens in
[[ParquetPartitionReaderFactory]].
// It requires the Parquet physical schema to determine whether a filter is
convertible.
// All filters that can be converted to Parquet are pushed down.
- override def pushedFilters(): Array[Filter] = pushedParquetFilters
+ override def pushedFilters: Array[Predicate] =
pushedParquetFilters.map(_.toV2)
override def pushAggregation(aggregation: Aggregation): Boolean = {
if (!sparkSession.sessionState.conf.parquetAggregatePushDown) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]