This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ce5f24  [SPARK-35568][SQL] Add the BroadcastExchange after 
re-optimizing the physical plan to fix the UnsupportedOperationException when 
enabling both AQE and DPP
6ce5f24 is described below

commit 6ce5f2491c156e0bf4af713948e780d3215a992d
Author: Ke Jia <[email protected]>
AuthorDate: Fri Jun 4 13:29:36 2021 +0000

    [SPARK-35568][SQL] Add the BroadcastExchange after re-optimizing the 
physical plan to fix the UnsupportedOperationException when enabling both AQE 
and DPP
    
    ### What changes were proposed in this pull request?
    This PR is to fix the `UnsupportedOperationException` described in 
[PR#32705](https://github.com/apache/spark/pull/32705).
    When AQE and DPP are turned on at the same time, the 
`BroadcastExchange` included in the DPP filter is not added through the 
`EnsureRequirements` rule. Therefore, when AQE optimizes the DPP filter, there 
is no way to add `BroadcastExchange` through the `EnsureRequirements` rule in 
the `reOptimize` method, which eventually leads to the loss of `BroadcastExchange` 
in the final physical plan. This PR adds the `BroadcastExchange` node in the 
`reOptimize` method if the current plan is a DPP filter.
    
    ### Why are the changes needed?
    bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a new unit test.
    
    Closes #32741 from JkSelf/fixDPP+AQEbug.
    
    Authored-by: Ke Jia <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala   | 20 ++++++++++++++++++--
 .../spark/sql/DynamicPartitionPruningSuite.scala     | 16 ++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index ebff790..b8d0953 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -312,7 +312,9 @@ case class AdaptiveSparkPlanExec(
   }
 
   override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    getFinalPhysicalPlan().doExecuteBroadcast()
+    val finalPlan = getFinalPhysicalPlan()
+    assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
+    finalPlan.doExecuteBroadcast()
   }
 
   protected override def stringArgs: Iterator[Any] = 
Iterator(s"isFinalPlan=$isFinalPlan")
@@ -600,7 +602,21 @@ case class AdaptiveSparkPlanExec(
       sparkPlan,
       preprocessingRules ++ queryStagePreparationRules,
       Some((planChangeLogger, "AQE Replanning")))
-    (newPlan, optimized)
+
+    // When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` 
rule will
+    // add the `BroadcastExchangeExec` node manually in the DPP subquery,
+    // not through `EnsureRequirements` rule. Therefore, when the DPP subquery 
is complicated
+    // and need to be re-optimized, AQE also need to manually insert the 
`BroadcastExchangeExec`
+    // node to prevent the loss of the `BroadcastExchangeExec` node in DPP 
subquery.
+    // Here, we also need to avoid to insert the `BroadcastExchangeExec` node 
when the newPlan
+    // is already the `BroadcastExchangeExec` plan after apply the 
`LogicalQueryStageStrategy` rule.
+    val finalPlan = currentPhysicalPlan match {
+      case b: BroadcastExchangeLike
+        if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => 
b.withNewChildren(Seq(newPlan))
+      case _ => newPlan
+    }
+
+    (finalPlan, optimized)
   }
 
   /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f5ed511..f3928ed 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1505,6 +1505,22 @@ abstract class DynamicPartitionPruningSuiteBase
       checkAnswer(df, Row(15, 15) :: Nil)
     }
   }
+
+  test("SPARK-35568: Fix UnsupportedOperationException when enabling both AQE 
and DPP") {
+    val df = sql(
+      """
+        |SELECT s.store_id, f.product_id
+        |FROM (SELECT DISTINCT * FROM fact_sk) f
+        |  JOIN (SELECT
+        |          *,
+        |          ROW_NUMBER() OVER (PARTITION BY store_id ORDER BY 
state_province DESC) AS rn
+        |        FROM dim_store) s
+        |   ON f.store_id = s.store_id
+        |WHERE s.country = 'DE' AND s.rn = 1
+        |""".stripMargin)
+
+    checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil)
+  }
 }
 
 class DynamicPartitionPruningSuiteAEOff extends 
DynamicPartitionPruningSuiteBase

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to