This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 908318f [SPARK-28220][SQL] Improve PropagateEmptyRelation to support
join with false condition
908318f is described below
commit 908318f30d445db9b122db464dca373120935922
Author: Yuming Wang <[email protected]>
AuthorDate: Sat Mar 20 22:57:02 2021 +0800
[SPARK-28220][SQL] Improve PropagateEmptyRelation to support join with
false condition
### What changes were proposed in this pull request?
Improve `PropagateEmptyRelation` to support join with false condition. For
example:
```sql
SELECT * FROM t1 LEFT JOIN t2 ON false
```
Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, false
:- FileScan parquet default.t1[a#4L]
+- BroadcastExchange IdentityBroadcastMode, [id=#40]
+- FileScan parquet default.t2[b#5L]
```
After this PR:
```
== Physical Plan ==
*(1) Project [a#4L, null AS b#5L]
+- *(1) ColumnarToRow
+- FileScan parquet default.t1[a#4L]
```
### Why are the changes needed?
Avoid `BroadcastNestedLoopJoin` to improve query performance.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #31857 from wangyum/SPARK-28220.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
---
.../optimizer/PropagateEmptyRelation.scala | 18 +++++++++++++----
.../optimizer/PropagateEmptyRelationSuite.scala | 23 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 4 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 15d4561..05b6163 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.rules._
* - Join with one or two empty children (including Intersect/Except).
* 2. Unary-node Logical Plans
* - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
+ * - Join with false condition.
* - Aggregate with all empty children and at least one grouping expression.
* - Generate(Explode) with all empty children. Others like Hive UDTF may
return results.
*/
@@ -71,24 +73,32 @@ object PropagateEmptyRelation extends Rule[LogicalPlan]
with PredicateHelper wit
// Joins on empty LocalRelations generated from streaming sources are not
eliminated
// as stateful streaming joins need to perform other state management
operations other than
// just processing the input data.
- case p @ Join(_, _, joinType, _, _)
+ case p @ Join(_, _, joinType, conditionOpt, _)
if !p.children.exists(_.isStreaming) =>
val isLeftEmpty = isEmptyLocalRelation(p.left)
val isRightEmpty = isEmptyLocalRelation(p.right)
- if (isLeftEmpty || isRightEmpty) {
+ val isFalseCondition = conditionOpt match {
+ case Some(FalseLiteral) => true
+ case _ => false
+ }
+ if (isLeftEmpty || isRightEmpty || isFalseCondition) {
joinType match {
case _: InnerLike => empty(p)
// Intersect is handled as LeftSemi by
`ReplaceIntersectWithSemiJoin` rule.
// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
- case LeftSemi if isRightEmpty => empty(p)
- case LeftAnti if isRightEmpty => p.left
+ case LeftSemi if isRightEmpty | isFalseCondition => empty(p)
+ case LeftAnti if isRightEmpty | isFalseCondition => p.left
case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
case LeftOuter | FullOuter if isRightEmpty =>
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
case RightOuter if isRightEmpty => empty(p)
case RightOuter | FullOuter if isLeftEmpty =>
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
+ case LeftOuter if isFalseCondition =>
+ Project(p.left.output ++ nullValueProjectList(p.right), p.left)
+ case RightOuter if isFalseCondition =>
+ Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
case _ => p
}
} else {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index 54c692c..b5dcb8a 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters,
InternalRow}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation,
LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -155,6 +156,28 @@ class PropagateEmptyRelationSuite extends PlanTest {
}
}
+ test("SPARK-28220: Propagate empty relation through Join if condition is
FalseLiteral") {
+ val testcases = Seq(
+ (Inner, Some(LocalRelation('a.int, 'b.int))),
+ (Cross, Some(LocalRelation('a.int, 'b.int))),
+ (LeftOuter,
+ Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)),
testRelation1).analyze)),
+ (RightOuter,
+ Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b),
testRelation2).analyze)),
+ (FullOuter, None),
+ (LeftAnti, Some(testRelation1)),
+ (LeftSemi, Some(LocalRelation('a.int)))
+ )
+
+ testcases.foreach { case (jt, answer) =>
+ val query = testRelation1.join(testRelation2, joinType = jt, condition =
Some(FalseLiteral))
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer =
+
answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
+ comparePlans(optimized, correctAnswer)
+ }
+ }
+
test("propagate empty relation through UnaryNode") {
val query = testRelation1
.where(false)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]