This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f860af67db34 [SPARK-48155][FOLLOWUP][SQL] AQEPropagateEmptyRelation
for left anti join should check if remain child is just BroadcastQueryStageExec
f860af67db34 is described below
commit f860af67db34c9ae68076a867d4d61caf574cbb8
Author: zml1206 <[email protected]>
AuthorDate: Thu Oct 17 01:23:16 2024 +0800
[SPARK-48155][FOLLOWUP][SQL] AQEPropagateEmptyRelation for left anti join
should check if remain child is just BroadcastQueryStageExec
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
We encountered a BroadcastNestedLoopJoin LeftAnti BuildLeft whose right side
is empty and which is the left child of a left outer BroadcastHashJoin. The case
is fairly complicated; part of the Initial Plan is as follows:
```
:- Project (214)
: +- BroadcastHashJoin LeftOuter BuildRight (213)
: :- BroadcastNestedLoopJoin LeftAnti BuildLeft (211)
: : :- BroadcastExchange (187)
: : : +- Project (186)
: : : +- Filter (185)
: : : +- Scan parquet (31)
: : +- LocalLimit (210)
: : +- Project (209)
: : +- BroadcastHashJoin Inner BuildLeft (208)
: : :- BroadcastExchange (194)
: : : +- Project (193)
: : : +- BroadcastHashJoin LeftOuter BuildRight (192)
: : : :- Project (189)
: : : : +- Filter (188)
: : : : +- Scan parquet (37)
: : : +- BroadcastExchange (191)
```
After AQEPropagateEmptyRelation replaces the left anti join with its left child,
the query fails with the error "HashJoin should not take
LeftOuter as the JoinType with building left side".
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a test case to AdaptiveQueryExecSuite.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48300 from zml1206/SPARK-48155-followup.
Authored-by: zml1206 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../optimizer/PropagateEmptyRelation.scala | 3 +-
.../adaptive/AdaptiveQueryExecSuite.scala | 32 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 832af340c339..d23d43acc217 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -111,7 +111,8 @@ abstract class PropagateEmptyRelationBase extends
Rule[LogicalPlan] with CastSup
// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
case LeftSemi if isRightEmpty | isFalseCondition => empty(p)
- case LeftAnti if isRightEmpty | isFalseCondition => p.left
+ case LeftAnti if (isRightEmpty | isFalseCondition) &&
canExecuteWithoutJoin(p.left) =>
+ p.left
case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
case LeftOuter | FullOuter if isRightEmpty &&
canExecuteWithoutJoin(p.left) =>
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index c5e64c96b2c8..4bf993f82495 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2829,6 +2829,38 @@ class AdaptiveQueryExecSuite
assert(findTopLevelBroadcastNestedLoopJoin(adaptivePlan).size == 1)
assert(findTopLevelUnion(adaptivePlan).size == 0)
}
+
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "100") {
+ withTempView("t1", "t2", "t3", "t4") {
+ Seq(1).toDF().createOrReplaceTempView("t1")
+ spark.range(100).createOrReplaceTempView("t2")
+ spark.range(2).createOrReplaceTempView("t3")
+ spark.range(2).createOrReplaceTempView("t4")
+ val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+ """
+ |SELECT tt2.value
+ |FROM (
+ | SELECT value
+ | FROM t1
+ | WHERE NOT EXISTS (
+ | SELECT 1
+ | FROM (
+ | SELECT t2.id
+ | FROM t2
+ | JOIN t3 ON t2.id = t3.id
+ | AND t2.id > 100
+ | ) tt
+ | WHERE t1.value = tt.id
+ | )
+ | AND t1.value = 1
+ |) tt2
+ | LEFT JOIN t4 ON tt2.value = t4.id
+ |""".stripMargin
+ )
+ assert(findTopLevelBroadcastNestedLoopJoin(adaptivePlan).size == 1)
+ }
+ }
}
test("SPARK-39915: Dataset.repartition(N) may not create N partitions") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]