This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 601fac2  [SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
601fac2 is described below

commit 601fac2cb30b7dc2d033e7be5bbd2f6c3a3c5fc1
Author: francis0407 <[email protected]>
AuthorDate: Tue Apr 9 21:45:46 2019 +0800

    [SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
    
    ## What changes were proposed in this pull request?
    
    In DataSourceV2Strategy, filters that contain subqueries are dropped by mistake
    when the filters are normalized, so they are never added back to the post-scan filters.
    For example, consider a SQL query with a scalar subquery:
    
    ``` scala
    val plan = spark.sql("select * from t2 where t2a > (select max(t1a) from t1)")
    plan.explain(true)
    ```
    
    And we get the log info of DataSourceV2Strategy:
    ```
    Pushing operators to csv:examples/src/main/resources/t2.txt
    Pushed Filters:
    Post-Scan Filters: isnotnull(t2a#30)
    Output: t2a#30, t2b#31
    ```
    
    The `Post-Scan Filters` should contain the scalar subquery predicate, but it is
    eliminated by mistake, as the physical plan below shows:
    ```
    == Parsed Logical Plan ==
    'Project [*]
    +- 'Filter ('t2a > scalar-subquery#56 [])
       :  +- 'Project [unresolvedalias('max('t1a), None)]
       :     +- 'UnresolvedRelation `t1`
       +- 'UnresolvedRelation `t2`
    
    == Analyzed Logical Plan ==
    t2a: string, t2b: string
    Project [t2a#30, t2b#31]
    +- Filter (t2a#30 > scalar-subquery#56 [])
       :  +- Aggregate [max(t1a#13) AS max(t1a)#63]
       :     +- SubqueryAlias `t1`
       :        +- RelationV2[t1a#13, t1b#14] 
csv:examples/src/main/resources/t1.txt
       +- SubqueryAlias `t2`
          +- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt
    
    == Optimized Logical Plan ==
    Filter (isnotnull(t2a#30) && (t2a#30 > scalar-subquery#56 []))
    :  +- Aggregate [max(t1a#13) AS max(t1a)#63]
    :     +- Project [t1a#13]
    :        +- RelationV2[t1a#13, t1b#14] 
csv:examples/src/main/resources/t1.txt
    +- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt
    
    == Physical Plan ==
    *(1) Project [t2a#30, t2b#31]
    +- *(1) Filter isnotnull(t2a#30)
       +- *(1) BatchScan[t2a#30, t2b#31] class 
org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
    ```
    ## How was this patch tested?
    
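    Added a unit test to DataSourceV2Suite ("SPARK-27411: DataSourceV2Strategy should not eliminate subquery").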
    
    Closes #24321 from francis0407/SPARK-27411.
    
    Authored-by: francis0407 <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/execution/datasources/v2/DataSourceV2Strategy.scala |  7 +++++--
 .../org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala | 13 +++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f8c7e2c..7681dc8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -106,13 +106,16 @@ object DataSourceV2Strategy extends Strategy with 
PredicateHelper {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
 
+      val (withSubquery, withoutSubquery) = 
filters.partition(SubqueryExpression.hasSubquery)
       val normalizedFilters = DataSourceStrategy.normalizeFilters(
-        filters.filterNot(SubqueryExpression.hasSubquery), relation.output)
+        withoutSubquery, relation.output)
 
       // `pushedFilters` will be pushed down and evaluated in the underlying 
data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet 
row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, 
normalizedFilters)
+      val (pushedFilters, postScanFiltersWithoutSubquery) =
+        pushFilters(scanBuilder, normalizedFilters)
+      val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery
       val (scan, output) = pruneColumns(scanBuilder, relation, project ++ 
postScanFilters)
       logInfo(
         s"""
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 587cfa9..4e071c5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -392,6 +392,19 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-27411: DataSourceV2Strategy should not eliminate subquery") {
+    withTempView("t1") {
+      val t2 = spark.read.format(classOf[SimpleDataSourceV2].getName).load()
+      Seq(2, 3).toDF("a").createTempView("t1")
+      val df = t2.where("i < (select max(a) from t1)").select('i)
+      val subqueries = df.queryExecution.executedPlan.collect {
+        case p => p.subqueries
+      }.flatten
+      assert(subqueries.length == 1)
+      checkAnswer(df, (0 until 3).map(i => Row(i)))
+    }
+  }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to