This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3e28f338fad [SPARK-39447][SQL] Avoid AssertionError in 
AdaptiveSparkPlanExec.doExecuteBroadcast
3e28f338fad is described below

commit 3e28f338fad66393b6d2f7a2da6ce5eee60a626e
Author: ulysses-you <[email protected]>
AuthorDate: Tue Jul 5 11:31:02 2022 +0800

    [SPARK-39447][SQL] Avoid AssertionError in 
AdaptiveSparkPlanExec.doExecuteBroadcast
    
    ### What changes were proposed in this pull request?
    
    Change `currentPhysicalPlan` to `inputPlan` when we restore the broadcast 
exchange for DPP.
    
    ### Why are the changes needed?
    
    The `currentPhysicalPlan` can be wrapped with a broadcast query stage, so it is 
not safe to match on it. For example,
     this happens when the broadcast exchange added by DPP runs before the 
normal broadcast exchange (e.g. one introduced by a join).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this is a bug fix.
    
    ### How was this patch tested?
    
    Added a test to `DynamicPartitionPruningSuite`.
    
    Closes #36974 from ulysses-you/inputplan.
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit c320a5d51b2c8427fc5d6648984bfd266891b451)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala    |  2 +-
 .../spark/sql/DynamicPartitionPruningSuite.scala      | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 40d2e1a3a8f..6c9c0e1cda4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -659,7 +659,7 @@ case class AdaptiveSparkPlanExec(
       // node to prevent the loss of the `BroadcastExchangeExec` node in DPP 
subquery.
       // Here, we also need to avoid to insert the `BroadcastExchangeExec` 
node when the newPlan is
       // already the `BroadcastExchangeExec` plan after apply the 
`LogicalQueryStageStrategy` rule.
-      val finalPlan = currentPhysicalPlan match {
+      val finalPlan = inputPlan match {
         case b: BroadcastExchangeLike
           if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => 
b.withNewChildren(Seq(newPlan))
         case _ => newPlan
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index cfdd2e08a79..d5498c469c5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1694,6 +1694,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends 
DynamicPartitionPruningV1Suite
 class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
   with EnableAdaptiveExecutionSuite {
 
+  test("SPARK-39447: Avoid AssertionError in 
AdaptiveSparkPlanExec.doExecuteBroadcast") {
+    val df = sql(
+      """
+        |WITH empty_result AS (
+        |  SELECT * FROM fact_stats WHERE product_id < 0
+        |)
+        |SELECT *
+        |FROM   (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id
+        |        FROM   fact_sk
+        |               JOIN empty_result
+        |                 ON fact_sk.product_id = empty_result.product_id) t2
+        |       JOIN empty_result
+        |         ON t2.store_id = empty_result.store_id
+      """.stripMargin)
+
+    checkPartitionPruningPredicate(df, false, false)
+    checkAnswer(df, Nil)
+  }
+
   test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use 
prepareExecutedPlan " +
     "rather than createSparkPlan to re-plan subquery") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to