This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c320a5d51b2 [SPARK-39447][SQL] Avoid AssertionError in
AdaptiveSparkPlanExec.doExecuteBroadcast
c320a5d51b2 is described below
commit c320a5d51b2c8427fc5d6648984bfd266891b451
Author: ulysses-you <[email protected]>
AuthorDate: Tue Jul 5 11:31:02 2022 +0800
[SPARK-39447][SQL] Avoid AssertionError in
AdaptiveSparkPlanExec.doExecuteBroadcast
### What changes were proposed in this pull request?
Change `currentPhysicalPlan` to `inputPlan` when we restore the broadcast
exchange for DPP.
### Why are the changes needed?
The `currentPhysicalPlan` can be wrapped with a broadcast query stage, so it is
not safe to match it. For example:
The broadcast exchange which is added by DPP runs earlier than the
normal broadcast exchange (e.g. one introduced by a join).
### Does this PR introduce _any_ user-facing change?
Yes, this is a bug fix.
### How was this patch tested?
add test
Closes #36974 from ulysses-you/inputplan.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +-
.../spark/sql/DynamicPartitionPruningSuite.scala | 19 +++++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index fc7cc109787..44a5ba4a547 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -662,7 +662,7 @@ case class AdaptiveSparkPlanExec(
// node to prevent the loss of the `BroadcastExchangeExec` node in DPP
subquery.
// Here, we also need to avoid to insert the `BroadcastExchangeExec`
node when the newPlan is
// already the `BroadcastExchangeExec` plan after apply the
`LogicalQueryStageStrategy` rule.
- val finalPlan = currentPhysicalPlan match {
+ val finalPlan = inputPlan match {
case b: BroadcastExchangeLike
if (!newPlan.isInstanceOf[BroadcastExchangeLike]) =>
b.withNewChildren(Seq(newPlan))
case _ => newPlan
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index b1b9ed04568..366120fb66c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1754,6 +1754,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends
DynamicPartitionPruningV1Suite
class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
with EnableAdaptiveExecutionSuite {
+ test("SPARK-39447: Avoid AssertionError in
AdaptiveSparkPlanExec.doExecuteBroadcast") {
+ val df = sql(
+ """
+ |WITH empty_result AS (
+ | SELECT * FROM fact_stats WHERE product_id < 0
+ |)
+ |SELECT *
+ |FROM (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id
+ | FROM fact_sk
+ | JOIN empty_result
+ | ON fact_sk.product_id = empty_result.product_id) t2
+ | JOIN empty_result
+ | ON t2.store_id = empty_result.store_id
+ """.stripMargin)
+
+ checkPartitionPruningPredicate(df, false, false)
+ checkAnswer(df, Nil)
+ }
+
test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use
prepareExecutedPlan " +
"rather than createSparkPlan to re-plan subquery") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]