This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 601fac2 [SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
601fac2 is described below
commit 601fac2cb30b7dc2d033e7be5bbd2f6c3a3c5fc1
Author: francis0407 <[email protected]>
AuthorDate: Tue Apr 9 21:45:46 2019 +0800
[SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
## What changes were proposed in this pull request?
DataSourceV2Strategy mistakenly drops filters that contain subqueries: it
excludes them before normalizing the filters and never adds them back to the
post-scan filters.
For example, take a SQL query with a scalar subquery:
``` scala
val plan = spark.sql("select * from t2 where t2a > (select max(t1a) from t1)")
plan.explain(true)
```
DataSourceV2Strategy then logs the following:
```
Pushing operators to csv:examples/src/main/resources/t2.txt
Pushed Filters:
Post-Scan Filters: isnotnull(t2a#30)
Output: t2a#30, t2b#31
```
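`Post-Scan Filters` should include the predicate with the scalar subquery, but
it has been dropped. The plans confirm this: the optimized logical plan still
has the subquery predicate, while the physical plan's `Filter` keeps only
`isnotnull(t2a#30)`.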
```
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('t2a > scalar-subquery#56 [])
: +- 'Project [unresolvedalias('max('t1a), None)]
: +- 'UnresolvedRelation `t1`
+- 'UnresolvedRelation `t2`
== Analyzed Logical Plan ==
t2a: string, t2b: string
Project [t2a#30, t2b#31]
+- Filter (t2a#30 > scalar-subquery#56 [])
: +- Aggregate [max(t1a#13) AS max(t1a)#63]
: +- SubqueryAlias `t1`
: +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
+- SubqueryAlias `t2`
+- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt
== Optimized Logical Plan ==
Filter (isnotnull(t2a#30) && (t2a#30 > scalar-subquery#56 []))
: +- Aggregate [max(t1a#13) AS max(t1a)#63]
: +- Project [t1a#13]
: +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
+- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt
== Physical Plan ==
*(1) Project [t2a#30, t2b#31]
+- *(1) Filter isnotnull(t2a#30)
+- *(1) BatchScan[t2a#30, t2b#31] class org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
```
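The effect of the change can be sketched without Spark. The snippet below is only an illustration: the `Expr` types, `hasSubquery`, and `pushFilters` are hypothetical stand-ins for the Catalyst classes and the strategy's helpers, and `pushFilters` is assumed to push nothing down. It contrasts dropping subquery filters with `filterNot` against splitting them off with `partition` and appending them to the post-scan filters.

```scala
// Toy stand-ins for Catalyst expressions; these are hypothetical types,
// not Spark's real classes.
sealed trait Expr
case class IsNotNull(col: String) extends Expr
case class GreaterThan(col: String, value: Expr) extends Expr
case class ScalarSubquery(sql: String) extends Expr

object SubqueryFilterSketch {
  // Rough analogue of SubqueryExpression.hasSubquery for the toy types.
  def hasSubquery(e: Expr): Boolean = e match {
    case _: ScalarSubquery => true
    case GreaterThan(_, v) => hasSubquery(v)
    case _                 => false
  }

  // Assumed behaviour: the source accepts no filters, so every candidate
  // filter has to be evaluated after the scan.
  def pushFilters(filters: Seq[Expr]): (Seq[Expr], Seq[Expr]) =
    (Seq.empty, filters)

  // Old behaviour: subquery filters are removed and never re-added.
  def postScanBefore(filters: Seq[Expr]): Seq[Expr] =
    pushFilters(filters.filterNot(hasSubquery))._2

  // New behaviour: subquery filters skip pushdown but stay post-scan.
  def postScanAfter(filters: Seq[Expr]): Seq[Expr] = {
    val (withSubquery, withoutSubquery) = filters.partition(hasSubquery)
    val (_, postScanWithoutSubquery) = pushFilters(withoutSubquery)
    postScanWithoutSubquery ++ withSubquery
  }

  // The two predicates from the optimized plan of the example query.
  val filters: Seq[Expr] = Seq(
    IsNotNull("t2a"),
    GreaterThan("t2a", ScalarSubquery("select max(t1a) from t1")))
}
```

With these toy definitions, `postScanBefore(filters)` keeps only the `IsNotNull` predicate, while `postScanAfter(filters)` keeps both predicates.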
## How was this patch tested?
Added a unit test to DataSourceV2Suite.
Closes #24321 from francis0407/SPARK-27411.
Authored-by: francis0407 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 7 +++++--
.../org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala | 13 +++++++++++++
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f8c7e2c..7681dc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -106,13 +106,16 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.newScanBuilder()
+ val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery)
val normalizedFilters = DataSourceStrategy.normalizeFilters(
- filters.filterNot(SubqueryExpression.hasSubquery), relation.output)
+ withoutSubquery, relation.output)
// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
- val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters)
+ val (pushedFilters, postScanFiltersWithoutSubquery) =
+ pushFilters(scanBuilder, normalizedFilters)
+ val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery
val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
logInfo(
s"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 587cfa9..4e071c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -392,6 +392,19 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}
}
}
+
+ test("SPARK-27411: DataSourceV2Strategy should not eliminate subquery") {
+ withTempView("t1") {
+ val t2 = spark.read.format(classOf[SimpleDataSourceV2].getName).load()
+ Seq(2, 3).toDF("a").createTempView("t1")
+ val df = t2.where("i < (select max(a) from t1)").select('i)
+ val subqueries = df.queryExecution.executedPlan.collect {
+ case p => p.subqueries
+ }.flatten
+ assert(subqueries.length == 1)
+ checkAnswer(df, (0 until 3).map(i => Row(i)))
+ }
+ }
}