This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 908318f  [SPARK-28220][SQL] Improve PropagateEmptyRelation to support 
join with false condition
908318f is described below

commit 908318f30d445db9b122db464dca373120935922
Author: Yuming Wang <[email protected]>
AuthorDate: Sat Mar 20 22:57:02 2021 +0800

    [SPARK-28220][SQL] Improve PropagateEmptyRelation to support join with 
false condition
    
    ### What changes were proposed in this pull request?
    
    Improve `PropagateEmptyRelation` to support join with false condition. For 
example:
    ```sql
    SELECT * FROM t1 LEFT JOIN t2 ON false
    ```
    
    Before this pr:
    ```
    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- BroadcastNestedLoopJoin BuildRight, LeftOuter, false
       :- FileScan parquet default.t1[a#4L]
       +- BroadcastExchange IdentityBroadcastMode, [id=#40]
          +- FileScan parquet default.t2[b#5L]
    ```
    
    After this pr:
    ```
    == Physical Plan ==
    *(1) Project [a#4L, null AS b#5L]
    +- *(1) ColumnarToRow
       +- FileScan parquet default.t1[a#4L]
    ```
    
    ### Why are the changes needed?
    
    Avoid `BroadcastNestedLoopJoin` to improve query performance.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #31857 from wangyum/SPARK-28220.
    
    Authored-by: Yuming Wang <[email protected]>
    Signed-off-by: Yuming Wang <[email protected]>
---
 .../optimizer/PropagateEmptyRelation.scala         | 18 +++++++++++++----
 .../optimizer/PropagateEmptyRelationSuite.scala    | 23 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 15d4561..05b6163 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.rules._
  *    - Join with one or two empty children (including Intersect/Except).
  * 2. Unary-node Logical Plans
  *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
+ *    - Join with false condition.
  *    - Aggregate with all empty children and at least one grouping expression.
  *    - Generate(Explode) with all empty children. Others like Hive UDTF may 
return results.
  */
@@ -71,24 +73,32 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] 
with PredicateHelper wit
     // Joins on empty LocalRelations generated from streaming sources are not 
eliminated
     // as stateful streaming joins need to perform other state management 
operations other than
     // just processing the input data.
-    case p @ Join(_, _, joinType, _, _)
+    case p @ Join(_, _, joinType, conditionOpt, _)
         if !p.children.exists(_.isStreaming) =>
       val isLeftEmpty = isEmptyLocalRelation(p.left)
       val isRightEmpty = isEmptyLocalRelation(p.right)
-      if (isLeftEmpty || isRightEmpty) {
+      val isFalseCondition = conditionOpt match {
+        case Some(FalseLiteral) => true
+        case _ => false
+      }
+      if (isLeftEmpty || isRightEmpty || isFalseCondition) {
         joinType match {
           case _: InnerLike => empty(p)
           // Intersect is handled as LeftSemi by 
`ReplaceIntersectWithSemiJoin` rule.
           // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
           case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
-          case LeftSemi if isRightEmpty => empty(p)
-          case LeftAnti if isRightEmpty => p.left
+          case LeftSemi if isRightEmpty | isFalseCondition => empty(p)
+          case LeftAnti if isRightEmpty | isFalseCondition => p.left
           case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
           case LeftOuter | FullOuter if isRightEmpty =>
             Project(p.left.output ++ nullValueProjectList(p.right), p.left)
           case RightOuter if isRightEmpty => empty(p)
           case RightOuter | FullOuter if isLeftEmpty =>
             Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
+          case LeftOuter if isFalseCondition =>
+            Project(p.left.output ++ nullValueProjectList(p.right), p.left)
+          case RightOuter if isFalseCondition =>
+            Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
           case _ => p
         }
       } else {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index 54c692c..b5dcb8a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, 
InternalRow}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -155,6 +156,28 @@ class PropagateEmptyRelationSuite extends PlanTest {
     }
   }
 
+  test("SPARK-28220: Propagate empty relation through Join if condition is 
FalseLiteral") {
+    val testcases = Seq(
+      (Inner, Some(LocalRelation('a.int, 'b.int))),
+      (Cross, Some(LocalRelation('a.int, 'b.int))),
+      (LeftOuter,
+        Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), 
testRelation1).analyze)),
+      (RightOuter,
+        Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), 
testRelation2).analyze)),
+      (FullOuter, None),
+      (LeftAnti, Some(testRelation1)),
+      (LeftSemi, Some(LocalRelation('a.int)))
+    )
+
+    testcases.foreach { case (jt, answer) =>
+      val query = testRelation1.join(testRelation2, joinType = jt, condition = 
Some(FalseLiteral))
+      val optimized = Optimize.execute(query.analyze)
+      val correctAnswer =
+        
answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
   test("propagate empty relation through UnaryNode") {
     val query = testRelation1
       .where(false)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to