This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 3e28f338fad [SPARK-39447][SQL] Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast
3e28f338fad is described below
commit 3e28f338fad66393b6d2f7a2da6ce5eee60a626e
Author: ulysses-you <[email protected]>
AuthorDate: Tue Jul 5 11:31:02 2022 +0800
[SPARK-39447][SQL] Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast
### What changes were proposed in this pull request?
Change `currentPhysicalPlan` to `inputPlan` when we restore the broadcast exchange for DPP.
### Why are the changes needed?
`currentPhysicalPlan` can be wrapped in a broadcast query stage, so it is not safe to match it directly. For example, the broadcast exchange added by DPP may run before the normal broadcast exchange (e.g. one introduced by a join).
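To make the failure mode concrete, here is a minimal, self-contained sketch. It uses made-up stand-in classes (`Plan`, `Scan`, `BroadcastExchange`, `BroadcastQueryStage`, and `restoreBroadcast` are illustrative names, not Spark APIs) to mirror the shape of the match below; it is not Spark's actual AQE code path.

```scala
// Hypothetical, simplified model -- NOT Spark's real classes. It only illustrates
// why matching against the already-wrapped current plan is unsafe, while matching
// against the original input plan is stable.
object BroadcastRestoreSketch extends App {
  sealed trait Plan
  case class Scan(name: String) extends Plan
  case class BroadcastExchange(child: Plan) extends Plan                   // stand-in for BroadcastExchangeLike
  case class BroadcastQueryStage(exchange: BroadcastExchange) extends Plan // stand-in for BroadcastQueryStageExec

  // Mirrors the shape of the patched match: keep the broadcast exchange around the
  // re-optimized plan only when the plan we match against is the exchange itself.
  def restoreBroadcast(planToMatch: Plan, newPlan: Plan): Plan = planToMatch match {
    case b: BroadcastExchange if !newPlan.isInstanceOf[BroadcastExchange] =>
      b.copy(child = newPlan)
    case _ => newPlan
  }

  val input = BroadcastExchange(Scan("dpp_subquery"))
  val current = BroadcastQueryStage(input)          // the DPP broadcast has already become a query stage
  val reoptimized: Plan = Scan("reoptimized_dpp_subquery")

  // Matching the wrapped current plan falls through and silently drops the exchange:
  println(restoreBroadcast(current, reoptimized))   // Scan(reoptimized_dpp_subquery)
  // Matching the original input plan keeps it, which is what the fix relies on:
  println(restoreBroadcast(input, reoptimized))     // BroadcastExchange(Scan(reoptimized_dpp_subquery))
}
```

The point is only that once the exchange has been wrapped in a query stage, a match on the exchange type against the current plan no longer fires, whereas `inputPlan` still has the original shape.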
### Does this PR introduce _any_ user-facing change?
Yes, this is a bug fix.
### How was this patch tested?
Added a test.
Closes #36974 from ulysses-you/inputplan.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c320a5d51b2c8427fc5d6648984bfd266891b451)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +-
.../spark/sql/DynamicPartitionPruningSuite.scala | 19 +++++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 40d2e1a3a8f..6c9c0e1cda4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -659,7 +659,7 @@ case class AdaptiveSparkPlanExec(
     // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
     // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is
     // already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-    val finalPlan = currentPhysicalPlan match {
+    val finalPlan = inputPlan match {
       case b: BroadcastExchangeLike
         if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
       case _ => newPlan
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index cfdd2e08a79..d5498c469c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1694,6 +1694,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite
 
 class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
   with EnableAdaptiveExecutionSuite {
+  test("SPARK-39447: Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast") {
+    val df = sql(
+      """
+        |WITH empty_result AS (
+        |  SELECT * FROM fact_stats WHERE product_id < 0
+        |)
+        |SELECT *
+        |FROM (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id
+        |  FROM fact_sk
+        |  JOIN empty_result
+        |  ON fact_sk.product_id = empty_result.product_id) t2
+        |  JOIN empty_result
+        |  ON t2.store_id = empty_result.store_id
+      """.stripMargin)
+
+    checkPartitionPruningPredicate(df, false, false)
+    checkAnswer(df, Nil)
+  }
+
   test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use prepareExecutedPlan " +
     "rather than createSparkPlan to re-plan subquery") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",