This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6ce5f24 [SPARK-35568][SQL] Add the BroadcastExchange after
re-optimizing the physical plan to fix the UnsupportedOperationException when
enabling both AQE and DPP
6ce5f24 is described below
commit 6ce5f2491c156e0bf4af713948e780d3215a992d
Author: Ke Jia <[email protected]>
AuthorDate: Fri Jun 4 13:29:36 2021 +0000
[SPARK-35568][SQL] Add the BroadcastExchange after re-optimizing the
physical plan to fix the UnsupportedOperationException when enabling both AQE
and DPP
### What changes were proposed in this pull request?
This PR is to fix the `UnsupportedOperationException` described in
[PR#32705](https://github.com/apache/spark/pull/32705).
When AQE and DPP are turned on at the same time, the `BroadcastExchange`
included in the DPP filter is not added through the `EnsureRequirements`
rule. Therefore, when AQE re-optimizes the DPP filter, there is no way to
add the `BroadcastExchange` through the `EnsureRequirements` rule in the
`reOptimize` method, which eventually leads to the loss of the
`BroadcastExchange` in the final physical plan. This PR adds a
`BroadcastExchange` node in the `reOptimize` method if the current plan is
a DPP filter.
### Why are the changes needed?
bug fix
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a new unit test.
Closes #32741 from JkSelf/fixDPP+AQEbug.
Authored-by: Ke Jia <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 20 ++++++++++++++++++--
.../spark/sql/DynamicPartitionPruningSuite.scala | 16 ++++++++++++++++
2 files changed, 34 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index ebff790..b8d0953 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -312,7 +312,9 @@ case class AdaptiveSparkPlanExec(
}
override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
- getFinalPhysicalPlan().doExecuteBroadcast()
+ val finalPlan = getFinalPhysicalPlan()
+ assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
+ finalPlan.doExecuteBroadcast()
}
protected override def stringArgs: Iterator[Any] =
Iterator(s"isFinalPlan=$isFinalPlan")
@@ -600,7 +602,21 @@ case class AdaptiveSparkPlanExec(
sparkPlan,
preprocessingRules ++ queryStagePreparationRules,
Some((planChangeLogger, "AQE Replanning")))
- (newPlan, optimized)
+
+ // When both AQE and DPP are enabled, the
`PlanAdaptiveDynamicPruningFilters` rule
+ // adds the `BroadcastExchangeExec` node manually in the DPP subquery,
+ // not through the `EnsureRequirements` rule. Therefore, when the DPP
subquery is complicated
+ // and needs to be re-optimized, AQE also needs to manually insert the
`BroadcastExchangeExec`
+ // node to prevent the loss of the `BroadcastExchangeExec` node in the DPP
subquery.
+ // Here, we also need to avoid inserting the `BroadcastExchangeExec` node
when the newPlan
+ // is already a `BroadcastExchangeExec` plan after applying the
+ // `LogicalQueryStageStrategy` rule.
+ val finalPlan = currentPhysicalPlan match {
+ case b: BroadcastExchangeLike
+ if (!newPlan.isInstanceOf[BroadcastExchangeLike]) =>
b.withNewChildren(Seq(newPlan))
+ case _ => newPlan
+ }
+
+ (finalPlan, optimized)
}
/**
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f5ed511..f3928ed 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1505,6 +1505,22 @@ abstract class DynamicPartitionPruningSuiteBase
checkAnswer(df, Row(15, 15) :: Nil)
}
}
+
+ test("SPARK-35568: Fix UnsupportedOperationException when enabling both AQE
and DPP") {
+ val df = sql(
+ """
+ |SELECT s.store_id, f.product_id
+ |FROM (SELECT DISTINCT * FROM fact_sk) f
+ | JOIN (SELECT
+ | *,
+ | ROW_NUMBER() OVER (PARTITION BY store_id ORDER BY
state_province DESC) AS rn
+ | FROM dim_store) s
+ | ON f.store_id = s.store_id
+ |WHERE s.country = 'DE' AND s.rn = 1
+ |""".stripMargin)
+
+ checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil)
+ }
}
class DynamicPartitionPruningSuiteAEOff extends
DynamicPartitionPruningSuiteBase
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]