This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f376d24 [SPARK-31280][SQL] Perform propagating empty relation after
RewritePredicateSubquery
f376d24 is described below
commit f376d24ea1f40740864d38ceb424713372e7e6ce
Author: Kent Yao <[email protected]>
AuthorDate: Sun Mar 29 11:32:22 2020 -0700
[SPARK-31280][SQL] Perform propagating empty relation after
RewritePredicateSubquery
### What changes were proposed in this pull request?
```sql
scala> spark.sql(" select * from values(1), (2) t(key) where key in (select
1 as key where 1=0)").queryExecution
res15: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
'Project [*]
+- 'Filter 'key IN (list#39 [])
: +- Project [1 AS key#38]
: +- Filter (1 = 0)
: +- OneRowRelation
+- 'SubqueryAlias t
+- 'UnresolvedInlineTable [key], [List(1), List(2)]
== Analyzed Logical Plan ==
key: int
Project [key#40]
+- Filter key#40 IN (list#39 [])
: +- Project [1 AS key#38]
: +- Filter (1 = 0)
: +- OneRowRelation
+- SubqueryAlias t
+- LocalRelation [key#40]
== Optimized Logical Plan ==
Join LeftSemi, (key#40 = key#38)
:- LocalRelation [key#40]
+- LocalRelation <empty>, [key#38]
== Physical Plan ==
*(1) BroadcastHashJoin [key#40], [key#38], LeftSemi, BuildRight
:- *(1) LocalTableScan [key#40]
+- Br...
```
`LocalRelation <empty>` should be able to propagate after subqueries are
lifted up to joins
### Why are the changes needed?
optimize query
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
add new tests
Closes #28043 from yaooqinn/SPARK-31280.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 2 ++
.../catalyst/optimizer/RewriteSubquerySuite.scala | 17 +++++++---
.../apache/spark/sql/execution/PlannerSuite.scala | 36 ++++++++++++++++++++++
3 files changed, 51 insertions(+), 4 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index da147dd..827f528 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -198,6 +198,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
CheckCartesianProducts) :+
Batch("RewriteSubquery", Once,
RewritePredicateSubquery,
+ ConvertToLocalRelation,
+ PropagateEmptyRelation,
ColumnPruning,
CollapseProject,
RemoveNoopOperators) :+
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
index f00d22e..2238afd 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
@@ -22,17 +22,17 @@ import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.ListQuery
import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
class RewriteSubquerySuite extends PlanTest {
- object Optimize extends RuleExecutor[LogicalPlan] {
+ case class Optimize(addOn: Rule[LogicalPlan]) extends
RuleExecutor[LogicalPlan] {
val batches =
Batch("Column Pruning", FixedPoint(100), ColumnPruning) ::
Batch("Rewrite Subquery", FixedPoint(1),
RewritePredicateSubquery,
- ColumnPruning,
+ addOn,
CollapseProject,
RemoveNoopOperators) :: Nil
}
@@ -43,7 +43,7 @@ class RewriteSubquerySuite extends PlanTest {
val query =
relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
- val optimized = Optimize.execute(query.analyze)
+ val optimized = Optimize(ColumnPruning).execute(query.analyze)
val correctAnswer = relation
.select('a)
.join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
@@ -52,4 +52,13 @@ class RewriteSubquerySuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("SPARK-31280: Perform propagating empty relation after
RewritePredicateSubquery") {
+ val relation = LocalRelation('a.int)
+ val relInSubquery = LocalRelation('x.int)
+
+ val query =
relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
+
+ val plan = Optimize(PropagateEmptyRelation).execute(query.analyze)
+ comparePlans(plan, relation)
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index dfa8046..5251365 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -994,6 +994,42 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
}
}
}
+
+ test("SPARK-31280: Perform propagating empty relation after
RewritePredicateSubquery") {
+ val df1 = sql(
+ s"""
+ |SELECT *
+ |FROM VALUES(1), (2) t1(key)
+ |WHERE key IN
+ | (SELECT key FROM VALUES(1) t2(key) WHERE 1=0)
+ """.stripMargin)
+ assert(df1.queryExecution.optimizedPlan.isInstanceOf[LocalRelation])
+ assert(df1.queryExecution.sparkPlan.isInstanceOf[LocalTableScanExec])
+
+ val df2 = sql(
+ s"""
+ |SELECT *
+ |FROM VALUES(1), (2) t1(key)
+ |WHERE key NOT IN
+ | (SELECT key FROM VALUES(1) t2(key) WHERE 1=0)
+ """.stripMargin)
+
+ assert(df2.queryExecution.optimizedPlan.isInstanceOf[LocalRelation])
+ assert(df2.queryExecution.sparkPlan.isInstanceOf[LocalTableScanExec])
+
+ // Because RewriteNonCorrelatedExists rewrites non-correlated exists
subqueries to
+ // scalar expressions early, this only takes effect on correlated
exists subqueries
+ val df3 = sql(
+ s"""
+ |SELECT *
+ |FROM VALUES(1), (2) t1(key)
+ |WHERE EXISTS
+ | (SELECT key FROM VALUES(1) t2(key) WHERE t1.key = 1 AND 1=0)
+ """.stripMargin)
+
+ assert(df3.queryExecution.optimizedPlan.isInstanceOf[LocalRelation])
+ assert(df3.queryExecution.sparkPlan.isInstanceOf[LocalTableScanExec])
+ }
}
// Used for unit-testing EnsureRequirements
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]