This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f860af67db34 [SPARK-48155][FOLLOWUP][SQL] AQEPropagateEmptyRelation 
for left anti join should check if remain child is just BroadcastQueryStageExec
f860af67db34 is described below

commit f860af67db34c9ae68076a867d4d61caf574cbb8
Author: zml1206 <[email protected]>
AuthorDate: Thu Oct 17 01:23:16 2024 +0800

    [SPARK-48155][FOLLOWUP][SQL] AQEPropagateEmptyRelation for left anti join 
should check if remain child is just BroadcastQueryStageExec
    
    ### What changes were proposed in this pull request?
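    For a left anti join whose right side is empty (or whose join condition is
    always false), `PropagateEmptyRelationBase` now replaces the join with its
    left child only when `canExecuteWithoutJoin(p.left)` holds, consistent with
    the existing check for left outer / full outer joins.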
    
    ### Why are the changes needed?
    
    We encountered a BroadcastNestedLoopJoin LeftAnti BuildLeft whose right
    side is empty. It is the left child of a left outer BroadcastHashJoin. The
    actual case is more complicated; part of the initial plan is as follows:
    ```
                   :- Project (214)
                   :  +- BroadcastHashJoin LeftOuter BuildRight (213)
                   :     :- BroadcastNestedLoopJoin LeftAnti BuildLeft (211)
                   :     :  :- BroadcastExchange (187)
                   :     :  :  +- Project (186)
                   :     :  :     +- Filter (185)
                   :     :  :        +- Scan parquet  (31)
                   :     :  +- LocalLimit (210)
                   :     :     +- Project (209)
                   :     :        +- BroadcastHashJoin Inner BuildLeft (208)
                   :     :           :- BroadcastExchange (194)
                   :     :           :  +- Project (193)
                   :     :           :     +- BroadcastHashJoin LeftOuter 
BuildRight (192)
                   :     :           :        :- Project (189)
                   :     :           :        :  +- Filter (188)
                   :     :           :        :     +- Scan parquet  (37)
                   :     :           :        +- BroadcastExchange (191)
    ```
    After AQEPropagateEmptyRelation is applied, the query fails with the error
    "HashJoin should not take LeftOuter as the JoinType with building left side".
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added a new test case to AdaptiveQueryExecSuite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #48300 from zml1206/SPARK-48155-followup.
    
    Authored-by: zml1206 <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../optimizer/PropagateEmptyRelation.scala         |  3 +-
 .../adaptive/AdaptiveQueryExecSuite.scala          | 32 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 832af340c339..d23d43acc217 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -111,7 +111,8 @@ abstract class PropagateEmptyRelationBase extends 
Rule[LogicalPlan] with CastSup
           // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
           case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
           case LeftSemi if isRightEmpty | isFalseCondition => empty(p)
-          case LeftAnti if isRightEmpty | isFalseCondition => p.left
+          case LeftAnti if (isRightEmpty | isFalseCondition) && 
canExecuteWithoutJoin(p.left) =>
+            p.left
           case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
           case LeftOuter | FullOuter if isRightEmpty && 
canExecuteWithoutJoin(p.left) =>
             Project(p.left.output ++ nullValueProjectList(p.right), p.left)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index c5e64c96b2c8..4bf993f82495 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2829,6 +2829,38 @@ class AdaptiveQueryExecSuite
       assert(findTopLevelBroadcastNestedLoopJoin(adaptivePlan).size == 1)
       assert(findTopLevelUnion(adaptivePlan).size == 0)
     }
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "100") {
+      withTempView("t1", "t2", "t3", "t4") {
+        Seq(1).toDF().createOrReplaceTempView("t1")
+        spark.range(100).createOrReplaceTempView("t2")
+        spark.range(2).createOrReplaceTempView("t3")
+        spark.range(2).createOrReplaceTempView("t4")
+        val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+          """
+            |SELECT tt2.value
+            |FROM (
+            |  SELECT value
+            |  FROM t1
+            |  WHERE NOT EXISTS (
+            |      SELECT 1
+            |      FROM (
+            |        SELECT t2.id
+            |        FROM t2
+            |          JOIN t3 ON t2.id = t3.id
+            |        AND t2.id > 100
+            |      ) tt
+            |      WHERE t1.value = tt.id
+            |    )
+            |    AND t1.value = 1
+            |) tt2
+            |  LEFT JOIN t4 ON tt2.value = t4.id
+            |""".stripMargin
+        )
+        assert(findTopLevelBroadcastNestedLoopJoin(adaptivePlan).size == 1)
+      }
+    }
   }
 
   test("SPARK-39915: Dataset.repartition(N) may not create N partitions") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to