This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a2a8dd55d0 [SPARK-41986][SQL] Introduce shuffle on SinglePartition
7a2a8dd55d0 is described below

commit 7a2a8dd55d08e8235693a1d8a5da532f6f133988
Author: Yuming Wang <[email protected]>
AuthorDate: Thu Jan 12 23:10:09 2023 -0800

    [SPARK-41986][SQL] Introduce shuffle on SinglePartition
    
    ### What changes were proposed in this pull request?
    
    This PR introduces shuffle on SinglePartition if its physical size is greater
than `spark.sql.maxSinglePartitionBytes`.
    
    ### Why are the changes needed?
    
    Improve parallelism. For example:
    ```scala
    spark.range(100000000L).selectExpr("id as a", "id as 
b").write.saveAsTable("t1")
    
    sql(
      """
        |WITH base
        |     AS (select *, ROW_NUMBER() OVER(ORDER BY a) AS new_a FROM t1)
        |SELECT * FROM base t1 JOIN base t2 ON t1.a = t2.b
        |""".stripMargin).explain()
    ```
    
    Before this PR:
    ```
    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- SortMergeJoin [a#10L], [b#26L], Inner
       :- Filter isnotnull(a#10L)
       :  +- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
new_a#8], [a#10L ASC NULLS FIRST]
       :     +- Sort [a#10L ASC NULLS FIRST], false, 0
       :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=50]
       :           +- FileScan parquet spark_catalog.default.t1[a#10L,b#11L]
       +- Sort [b#26L ASC NULLS FIRST], false, 0
          +- Filter isnotnull(b#26L)
             +- Window [row_number() windowspecdefinition(a#25L ASC NULLS 
FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
new_a#27], [a#25L ASC NULLS FIRST]
                +- Sort [a#25L ASC NULLS FIRST], false, 0
                   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, 
[plan_id=54]
                      +- FileScan parquet spark_catalog.default.t1[a#25L,b#26L]
    ```
    
    After this PR:
    ```
    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- SortMergeJoin [a#10L], [b#26L], Inner
       :- Sort [a#10L ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(a#10L, 5), ENSURE_REQUIREMENTS, 
[plan_id=60]
       :     +- Filter isnotnull(a#10L)
       :        +- Window [row_number() windowspecdefinition(a#10L ASC NULLS 
FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
new_a#8], [a#10L ASC NULLS FIRST]
       :           +- Sort [a#10L ASC NULLS FIRST], false, 0
       :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, 
[plan_id=50]
       :                 +- FileScan parquet 
spark_catalog.default.t1[a#10L,b#11L]
       +- Sort [b#26L ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(b#26L, 5), ENSURE_REQUIREMENTS, 
[plan_id=61]
             +- Filter isnotnull(b#26L)
                +- Window [row_number() windowspecdefinition(a#25L ASC NULLS 
FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
new_a#27], [a#25L ASC NULLS FIRST]
                   +- Sort [a#25L ASC NULLS FIRST], false, 0
                      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, 
[plan_id=54]
                         +- FileScan parquet 
spark_catalog.default.t1[a#25L,b#26L]
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #39512 from wangyum/SPARK-41986.
    
    Authored-by: Yuming Wang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  7 ++++
 .../execution/exchange/EnsureRequirements.scala    | 12 ++++---
 .../exchange/EnsureRequirementsSuite.scala         | 37 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fa69ab0ce38..3abd2578d4a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -513,6 +513,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val MAX_SINGLE_PARTITION_BYTES = 
buildConf("spark.sql.maxSinglePartitionBytes")
+    .doc("The maximum number of bytes allowed for a single partition. 
Otherwise, The planner " +
+      "will introduce shuffle to improve parallelism.")
+    .version("3.4.0")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefault(Long.MaxValue)
+
   val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort")
     .internal()
     .doc("When true, enable use of radix sort when possible. Radix sort is 
much faster but " +
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 808b92e966a..7706b26af70 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -76,13 +76,17 @@ case class EnsureRequirements(
       case _ => false
     }.map(_._2)
 
-    // Special case: if all sides of the join are single partition
-    val allSinglePartition =
-      childrenIndexes.forall(children(_).outputPartitioning == SinglePartition)
+    // Special case: if all sides of the join are single partition and it's 
physical size less than
+    // or equal spark.sql.maxSinglePartitionBytes.
+    val preferSinglePartition = childrenIndexes.forall { i =>
+      children(i).outputPartitioning == SinglePartition &&
+        children(i).logicalLink
+          .forall(_.stats.sizeInBytes <= 
conf.getConf(SQLConf.MAX_SINGLE_PARTITION_BYTES))
+    }
 
     // If there are more than one children, we'll need to check partitioning & 
distribution of them
     // and see if extra shuffles are necessary.
-    if (childrenIndexes.length > 1 && !allSinglePartition) {
+    if (childrenIndexes.length > 1 && !preferSinglePartition) {
       val specs = childrenIndexes.map(i => {
         val requiredDist = requiredChildDistributions(i)
         assert(requiredDist.isInstanceOf[ClusteredDistribution],
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index a24d4465b35..7cfa00b4168 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.exchange
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.statsEstimation.StatsTestPlan
 import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.execution.{DummySparkPlan, SortExec}
 import org.apache.spark.sql.execution.SparkPlan
@@ -731,6 +732,42 @@ class EnsureRequirementsSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-41986: Introduce shuffle on SinglePartition") {
+    val filesMaxPartitionBytes = conf.filesMaxPartitionBytes
+    withSQLConf(SQLConf.MAX_SINGLE_PARTITION_BYTES.key -> 
filesMaxPartitionBytes.toString) {
+      Seq(filesMaxPartitionBytes, filesMaxPartitionBytes + 1).foreach { size =>
+        val logicalPlan = StatsTestPlan(Nil, 1L, AttributeMap.empty, 
Some(size))
+        val left = DummySparkPlan(outputPartitioning = SinglePartition)
+        left.setLogicalLink(logicalPlan)
+        val right = DummySparkPlan(outputPartitioning = SinglePartition)
+        right.setLogicalLink(logicalPlan)
+        val smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, 
None, left, right)
+
+        if (size <= filesMaxPartitionBytes) {
+          EnsureRequirements.apply(smjExec) match {
+            case SortMergeJoinExec(leftKeys, rightKeys, _, _,
+            SortExec(_, _, _: DummySparkPlan, _),
+            SortExec(_, _, _: DummySparkPlan, _), _) =>
+              assert(leftKeys === Seq(exprA))
+              assert(rightKeys === Seq(exprC))
+            case other => fail(other.toString)
+          }
+        } else {
+          EnsureRequirements.apply(smjExec) match {
+            case SortMergeJoinExec(leftKeys, rightKeys, _, _,
+            SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), 
_),
+            SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), 
_), _) =>
+              assert(leftKeys === Seq(exprA))
+              assert(rightKeys === Seq(exprC))
+              assert(left.numPartitions == 5)
+              assert(right.numPartitions == 5)
+            case other => fail(other.toString)
+          }
+        }
+      }
+    }
+  }
+
   test("Check with KeyGroupedPartitioning") {
     // simplest case: identity transforms
     var plan1 = DummySparkPlan(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to