This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f376d24  [SPARK-31280][SQL] Perform propagating empty relation after 
RewritePredicateSubquery
f376d24 is described below

commit f376d24ea1f40740864d38ceb424713372e7e6ce
Author: Kent Yao <[email protected]>
AuthorDate: Sun Mar 29 11:32:22 2020 -0700

    [SPARK-31280][SQL] Perform propagating empty relation after 
RewritePredicateSubquery
    
    ### What changes were proposed in this pull request?
    ```sql
    scala> spark.sql(" select * from values(1), (2) t(key) where key in (select 
1 as key where 1=0)").queryExecution
    res15: org.apache.spark.sql.execution.QueryExecution =
    == Parsed Logical Plan ==
    'Project [*]
    +- 'Filter 'key IN (list#39 [])
       :  +- Project [1 AS key#38]
       :     +- Filter (1 = 0)
       :        +- OneRowRelation
       +- 'SubqueryAlias t
          +- 'UnresolvedInlineTable [key], [List(1), List(2)]
    
    == Analyzed Logical Plan ==
    key: int
    Project [key#40]
    +- Filter key#40 IN (list#39 [])
       :  +- Project [1 AS key#38]
       :     +- Filter (1 = 0)
       :        +- OneRowRelation
       +- SubqueryAlias t
          +- LocalRelation [key#40]
    
    == Optimized Logical Plan ==
    Join LeftSemi, (key#40 = key#38)
    :- LocalRelation [key#40]
    +- LocalRelation <empty>, [key#38]
    
    == Physical Plan ==
    *(1) BroadcastHashJoin [key#40], [key#38], LeftSemi, BuildRight
    :- *(1) LocalTableScan [key#40]
    +- Br...
    ```
    
    `LocalRelation <empty>` should be able to propagate after subqueries are
lifted up to joins
    
    ### Why are the changes needed?
    
    optimize query
    
    ### Does this PR introduce any user-facing change?
    
    no

    ### How was this patch tested?
    
    add new tests
    
    Closes #28043 from yaooqinn/SPARK-31280.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 ++
 .../catalyst/optimizer/RewriteSubquerySuite.scala  | 17 +++++++---
 .../apache/spark/sql/execution/PlannerSuite.scala  | 36 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index da147dd..827f528 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -198,6 +198,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       CheckCartesianProducts) :+
     Batch("RewriteSubquery", Once,
       RewritePredicateSubquery,
+      ConvertToLocalRelation,
+      PropagateEmptyRelation,
       ColumnPruning,
       CollapseProject,
       RemoveNoopOperators) :+
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
index f00d22e..2238afd 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
@@ -22,17 +22,17 @@ import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.ListQuery
 import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 
 
 class RewriteSubquerySuite extends PlanTest {
 
-  object Optimize extends RuleExecutor[LogicalPlan] {
+  case class Optimize(addOn: Rule[LogicalPlan]) extends 
RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Column Pruning", FixedPoint(100), ColumnPruning) ::
       Batch("Rewrite Subquery", FixedPoint(1),
         RewritePredicateSubquery,
-        ColumnPruning,
+        addOn,
         CollapseProject,
         RemoveNoopOperators) :: Nil
   }
@@ -43,7 +43,7 @@ class RewriteSubquerySuite extends PlanTest {
 
     val query = 
relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
 
-    val optimized = Optimize.execute(query.analyze)
+    val optimized = Optimize(ColumnPruning).execute(query.analyze)
     val correctAnswer = relation
       .select('a)
       .join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
@@ -52,4 +52,13 @@ class RewriteSubquerySuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("SPARK-31280: Perform propagating empty relation after 
RewritePredicateSubquery") {
+    val relation = LocalRelation('a.int)
+    val relInSubquery = LocalRelation('x.int)
+
+    val query = 
relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
+
+    val plan = Optimize(PropagateEmptyRelation).execute(query.analyze)
+    comparePlans(plan, relation)
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index dfa8046..5251365 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -994,6 +994,42 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("SPARK-31280: Perform propagating empty relation after 
RewritePredicateSubquery") {
+    val df1 = sql(
+      s"""
+         |SELECT *
+         |FROM VALUES(1), (2) t1(key)
+         |WHERE key IN
+         |  (SELECT key FROM VALUES(1) t2(key) WHERE 1=0)
+       """.stripMargin)
+    assert(df1.queryExecution.optimizedPlan.isInstanceOf[LocalRelation])
+    assert(df1.queryExecution.sparkPlan.isInstanceOf[LocalTableScanExec])
+
+    val df2 = sql(
+      s"""
+         |SELECT *
+         |FROM VALUES(1), (2) t1(key)
+         |WHERE key NOT IN
+         |  (SELECT key FROM VALUES(1) t2(key) WHERE 1=0)
+       """.stripMargin)
+
+    assert(df2.queryExecution.optimizedPlan.isInstanceOf[LocalRelation])
+    assert(df2.queryExecution.sparkPlan.isInstanceOf[LocalTableScanExec])
+
+    // Because RewriteNonCorrelatedExists will rewrite non-correlated exists 
subqueries to
+    // scalar expressions early, this only takes effect on correlated 
exists subqueries
+    val df3 = sql(
+      s"""
+         |SELECT *
+         |FROM VALUES(1), (2) t1(key)
+         |WHERE EXISTS
+         |  (SELECT key FROM VALUES(1) t2(key) WHERE t1.key = 1 AND 1=0)
+       """.stripMargin)
+
+    assert(df3.queryExecution.optimizedPlan.isInstanceOf[LocalRelation])
+    assert(df3.queryExecution.sparkPlan.isInstanceOf[LocalTableScanExec])
+  }
 }
 
 // Used for unit-testing EnsureRequirements


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to