This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 6403a84b6854 [SPARK-46590][SQL] Fix coalesce failing with unexpected partition indices
6403a84b6854 is described below

commit 6403a84b6854214a4ed7d5c0c800e877e0748964
Author: jackylee-ch <lijunq...@baidu.com>
AuthorDate: Tue Jan 23 16:10:37 2024 +0800

    [SPARK-46590][SQL] Fix coalesce failing with unexpected partition indices
    
    ### What changes were proposed in this pull request?
    As outlined in JIRA issue [SPARK-46590](https://issues.apache.org/jira/browse/SPARK-46590),
    when a broadcast join follows a union within the same stage, the
    [collectCoalesceGroups](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala#L144)
    method indiscriminately traverses all sub-plans and aggregates them into a single coalesce
    group, which is not expected. This PR makes `collectCoalesceGroups` recurse into each side
    of broadcast (and cartesian-product) joins independently, and changes
    `ShufflePartitionsUtil.coalescePartitions` to log a warning and skip coalescing, instead of
    failing an assertion, when it encounters unexpected partition indices.
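
    For illustration, a minimal sketch (hypothetical; condensed from the new unit test added
    below) of a query shape that hits this problem. In the real test, skewed data plus the
    skew-join configs make the two union branches produce different partition specs, which is
    what tripped the old assertion. `spark` is assumed to be an active `SparkSession`:

    ```scala
    // Keep the threshold tiny so the final join is planned as a broadcast join.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1KB")

    val left = spark.range(0, 1000).selectExpr("id as key", "id as value")
    val right = spark.range(0, 1000).selectExpr("id as key", "id as value")

    // A union of two shuffle joins: each branch shuffles both of its inputs.
    val unioned = left.join(right, Seq("key", "value"), "left_outer")
      .union(right.join(left, Seq("key", "value"), "left_outer"))

    // The small side is broadcast; its query stage must not be pulled into the
    // same coalesce group as the shuffle stages under the union.
    spark.range(0, 10).selectExpr("id as key", "id as value")
      .join(unioned, Seq("key", "value"))
      .write.format("noop").mode("overwrite").save()
    ```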
    
    ### Why are the changes needed?
    In fact, for a broadcast join we do not expect the broadcast exchange to have the same
    partition number as the shuffle side. Therefore, we can safely disregard the broadcast join
    itself and continue traversing its sub-plans, as sketched below.
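
    Concretely, `collectCoalesceGroups` now recurses into each side of these joins independently
    instead of letting all of their leaves fall into a single coalesce group. The new cases,
    quoted from the diff below:

    ```scala
    // None of these joins require their children to share a partition number,
    // so each child can safely form its own coalesce group.
    case join: CartesianProductExec => join.children.flatMap(collectCoalesceGroups)
    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
    ```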
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Newly added unit tests; they would fail without this PR.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44661 from jackylee-ch/fix_coalesce_problem_with_broadcastjoin_and_union.
    
    Authored-by: jackylee-ch <lijunq...@baidu.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit de0c4ad3947f1188f02aaa612df8278d1c7c3ce5)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../adaptive/CoalesceShufflePartitions.scala       | 10 ++--
 .../execution/adaptive/ShufflePartitionsUtil.scala |  6 ++-
 .../execution/CoalesceShufflePartitionsSuite.scala | 61 ++++++++++++++++++++++
 .../sql/execution/ShufflePartitionsUtilSuite.scala | 31 +++++------
 4 files changed, 86 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 34399001c726..26e5ac649dbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan, UnaryExecNode, UnionExec}
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, ShuffleExchangeLike, ShuffleOrigin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, CartesianProductExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
@@ -146,13 +147,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
+    case join: CartesianProductExec => join.children.flatMap(collectCoalesceGroups)
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
       val shuffleStages = collectShuffleStageInfos(p)
       // ShuffleExchanges introduced by repartition do not support partition number change.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index dbed66683b01..9370b3d8d1d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -128,8 +128,10 @@ object ShufflePartitionsUtil extends Logging {
 
     // There should be no unexpected partition specs and the start indices should be identical
     // across all different shuffles.
-    assert(partitionIndicesSeq.distinct.length == 1 && partitionIndicesSeq.head.forall(_ >= 0),
-      s"Invalid shuffle partition specs: $inputPartitionSpecs")
+    if (partitionIndicesSeq.distinct.length > 1 || partitionIndicesSeq.head.exists(_ < 0)) {
+      logWarning(s"Could not apply partition coalescing because of unexpected partition indices.")
+      return Seq.empty
+    }
 
     // The indices may look like [0, 1, 2, 2, 2, 3, 4, 4, 5], and the repeated `2` and `4` mean
     // skewed partitions.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index 24a98dd83f33..e11191da6a95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -310,6 +310,67 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-46590 adaptive query execution works correctly with broadcast 
join and union") {
+    val test: SparkSession => Unit = { spark: SparkSession =>
+      import spark.implicits._
+      spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1KB")
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "10KB")
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+      val df00 = spark.range(0, 1000, 2)
+        .selectExpr("id as key", "id as value")
+        .union(Seq.fill(100000)((600, 600)).toDF("key", "value"))
+      val df01 = spark.range(0, 1000, 3)
+        .selectExpr("id as key", "id as value")
+      val df10 = spark.range(0, 1000, 5)
+        .selectExpr("id as key", "id as value")
+        .union(Seq.fill(500000)((600, 600)).toDF("key", "value"))
+      val df11 = spark.range(0, 1000, 7)
+        .selectExpr("id as key", "id as value")
+      val df20 = spark.range(0, 10).selectExpr("id as key", "id as value")
+
+      df20.join(df00.join(df01, Array("key", "value"), "left_outer")
+        .union(df10.join(df11, Array("key", "value"), "left_outer")))
+        .write
+        .format("noop")
+        .mode("overwrite")
+        .save()
+    }
+    withSparkSession(test, 12000, None)
+  }
+
+  test("SPARK-46590 adaptive query execution works correctly with cartesian 
join and union") {
+    val test: SparkSession => Unit = { spark: SparkSession =>
+      import spark.implicits._
+      spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "100B")
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+      val df00 = spark.range(0, 10, 2)
+        .selectExpr("id as key", "id as value")
+        .union(Seq.fill(1000)((600, 600)).toDF("key", "value"))
+      val df01 = spark.range(0, 10, 3)
+        .selectExpr("id as key", "id as value")
+      val df10 = spark.range(0, 10, 5)
+        .selectExpr("id as key", "id as value")
+        .union(Seq.fill(5000)((600, 600)).toDF("key", "value"))
+      val df11 = spark.range(0, 10, 7)
+        .selectExpr("id as key", "id as value")
+      val df20 = spark.range(0, 10)
+        .selectExpr("id as key", "id as value")
+        .union(Seq.fill(1000)((11, 11)).toDF("key", "value"))
+      val df21 = spark.range(0, 10)
+        .selectExpr("id as key", "id as value")
+
+      df20.join(df21.hint("shuffle_hash"), Array("key", "value"), "left_outer")
+        .join(df00.join(df01.hint("shuffle_hash"), Array("key", "value"), "left_outer")
+          .union(df10.join(df11.hint("shuffle_hash"), Array("key", "value"), "left_outer")))
+        .write
+        .format("noop")
+        .mode("overwrite")
+        .save()
+    }
+    withSparkSession(test, 100, None)
+  }
+
   test("SPARK-24705 adaptive query execution works correctly when exchange 
reuse enabled") {
     val test: SparkSession => Unit = { spark: SparkSession =>
       spark.sql("SET spark.sql.exchange.reuse=true")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
index da05373125d3..f8b796436847 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
@@ -567,14 +567,13 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
     }
 
     {
-      // Assertion error if shuffle partition specs contain `CoalescedShuffleSpec` that has
-      // `end` - `start` > 1.
+      // If shuffle partition specs contain `CoalescedShuffleSpec` that has
+      // `end` - `start` > 1, return empty result.
       val bytesByPartitionId1 = Array[Long](10, 10, 10, 10, 10)
       val bytesByPartitionId2 = Array[Long](10, 10, 10, 10, 10)
       val specs1 = Seq(CoalescedPartitionSpec(0, 1), CoalescedPartitionSpec(1, 5))
       val specs2 = specs1
-      intercept[AssertionError] {
-        ShufflePartitionsUtil.coalescePartitions(
+      val coalesced = ShufflePartitionsUtil.coalescePartitions(
           Array(
             Some(new MapOutputStatistics(0, bytesByPartitionId1)),
             Some(new MapOutputStatistics(1, bytesByPartitionId2))),
@@ -582,17 +581,16 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
             Some(specs1),
             Some(specs2)),
           targetSize, 1, 0)
-      }
+      assert(coalesced.isEmpty)
     }
 
     {
-      // Assertion error if shuffle partition specs contain `PartialMapperShuffleSpec`.
+      // If shuffle partition specs contain `PartialMapperShuffleSpec`, return empty result.
       val bytesByPartitionId1 = Array[Long](10, 10, 10, 10, 10)
       val bytesByPartitionId2 = Array[Long](10, 10, 10, 10, 10)
       val specs1 = Seq(CoalescedPartitionSpec(0, 1), PartialMapperPartitionSpec(1, 0, 1))
       val specs2 = specs1
-      intercept[AssertionError] {
-        ShufflePartitionsUtil.coalescePartitions(
+      val coalesced = ShufflePartitionsUtil.coalescePartitions(
           Array(
             Some(new MapOutputStatistics(0, bytesByPartitionId1)),
             Some(new MapOutputStatistics(1, bytesByPartitionId2))),
@@ -600,18 +598,17 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
             Some(specs1),
             Some(specs2)),
           targetSize, 1, 0)
-      }
+      assert(coalesced.isEmpty)
     }
 
     {
-      // Assertion error if partition specs of different shuffles have different lengths.
+      // If partition specs of different shuffles have different lengths, return empty result.
       val bytesByPartitionId1 = Array[Long](10, 10, 10, 10, 10)
       val bytesByPartitionId2 = Array[Long](10, 10, 10, 10, 10)
       val specs1 = Seq.tabulate(4)(i => CoalescedPartitionSpec(i, i + 1)) ++
         Seq.tabulate(2)(i => PartialReducerPartitionSpec(4, i, i + 1, 10L))
       val specs2 = Seq.tabulate(5)(i => CoalescedPartitionSpec(i, i + 1))
-      intercept[AssertionError] {
-        ShufflePartitionsUtil.coalescePartitions(
+      val coalesced = ShufflePartitionsUtil.coalescePartitions(
           Array(
             Some(new MapOutputStatistics(0, bytesByPartitionId1)),
             Some(new MapOutputStatistics(1, bytesByPartitionId2))),
@@ -619,11 +616,12 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
             Some(specs1),
             Some(specs2)),
           targetSize, 1, 0)
-      }
+      assert(coalesced.isEmpty)
     }
 
     {
-      // Assertion error if start indices of partition specs are not identical among all shuffles.
+      // If start indices of partition specs are not identical among all shuffles,
+      // return empty result.
       val bytesByPartitionId1 = Array[Long](10, 10, 10, 10, 10)
       val bytesByPartitionId2 = Array[Long](10, 10, 10, 10, 10)
       val specs1 = Seq.tabulate(4)(i => CoalescedPartitionSpec(i, i + 1)) ++
@@ -631,8 +629,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
       val specs2 = Seq.tabulate(2)(i => CoalescedPartitionSpec(i, i + 1)) ++
         Seq.tabulate(2)(i => PartialReducerPartitionSpec(2, i, i + 1, 10L)) ++
         Seq.tabulate(2)(i => CoalescedPartitionSpec(i + 3, i + 4))
-      intercept[AssertionError] {
-        ShufflePartitionsUtil.coalescePartitions(
+      val coalesced = ShufflePartitionsUtil.coalescePartitions(
           Array(
             Some(new MapOutputStatistics(0, bytesByPartitionId1)),
             Some(new MapOutputStatistics(1, bytesByPartitionId2))),
@@ -640,7 +637,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
             Some(specs1),
             Some(specs2)),
           targetSize, 1, 0)
-      }
+      assert(coalesced.isEmpty)
     }
 
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
