This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7a2a8dd55d0 [SPARK-41986][SQL] Introduce shuffle on SinglePartition
7a2a8dd55d0 is described below
commit 7a2a8dd55d08e8235693a1d8a5da532f6f133988
Author: Yuming Wang <[email protected]>
AuthorDate: Thu Jan 12 23:10:09 2023 -0800
[SPARK-41986][SQL] Introduce shuffle on SinglePartition
### What changes were proposed in this pull request?
This PR introduces shuffle on SinglePartition if its physical size is greater
than `spark.sql.maxSinglePartitionBytes`.
### Why are the changes needed?
Improve parallelism. For example:
```scala
spark.range(100000000L).selectExpr("id as a", "id as
b").write.saveAsTable("t1")
sql(
"""
|WITH base
| AS (select *, ROW_NUMBER() OVER(ORDER BY a) AS new_a FROM t1)
|SELECT * FROM base t1 JOIN base t2 ON t1.a = t2.b
|""".stripMargin).explain()
```
Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [a#10L], [b#26L], Inner
:- Filter isnotnull(a#10L)
: +- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST,
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS
new_a#8], [a#10L ASC NULLS FIRST]
: +- Sort [a#10L ASC NULLS FIRST], false, 0
: +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=50]
: +- FileScan parquet spark_catalog.default.t1[a#10L,b#11L]
+- Sort [b#26L ASC NULLS FIRST], false, 0
+- Filter isnotnull(b#26L)
+- Window [row_number() windowspecdefinition(a#25L ASC NULLS
FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS
new_a#27], [a#25L ASC NULLS FIRST]
+- Sort [a#25L ASC NULLS FIRST], false, 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS,
[plan_id=54]
+- FileScan parquet spark_catalog.default.t1[a#25L,b#26L]
```
After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [a#10L], [b#26L], Inner
:- Sort [a#10L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(a#10L, 5), ENSURE_REQUIREMENTS,
[plan_id=60]
: +- Filter isnotnull(a#10L)
: +- Window [row_number() windowspecdefinition(a#10L ASC NULLS
FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS
new_a#8], [a#10L ASC NULLS FIRST]
: +- Sort [a#10L ASC NULLS FIRST], false, 0
: +- Exchange SinglePartition, ENSURE_REQUIREMENTS,
[plan_id=50]
: +- FileScan parquet
spark_catalog.default.t1[a#10L,b#11L]
+- Sort [b#26L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(b#26L, 5), ENSURE_REQUIREMENTS,
[plan_id=61]
+- Filter isnotnull(b#26L)
+- Window [row_number() windowspecdefinition(a#25L ASC NULLS
FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS
new_a#27], [a#25L ASC NULLS FIRST]
+- Sort [a#25L ASC NULLS FIRST], false, 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS,
[plan_id=54]
+- FileScan parquet
spark_catalog.default.t1[a#25L,b#26L]
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #39512 from wangyum/SPARK-41986.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 7 ++++
.../execution/exchange/EnsureRequirements.scala | 12 ++++---
.../exchange/EnsureRequirementsSuite.scala | 37 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 4 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fa69ab0ce38..3abd2578d4a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -513,6 +513,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val MAX_SINGLE_PARTITION_BYTES =
buildConf("spark.sql.maxSinglePartitionBytes")
+ .doc("The maximum number of bytes allowed for a single partition.
Otherwise, the planner " +
+ "will introduce shuffle to improve parallelism.")
+ .version("3.4.0")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefault(Long.MaxValue)
+
val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort")
.internal()
.doc("When true, enable use of radix sort when possible. Radix sort is
much faster but " +
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 808b92e966a..7706b26af70 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -76,13 +76,17 @@ case class EnsureRequirements(
case _ => false
}.map(_._2)
- // Special case: if all sides of the join are single partition
- val allSinglePartition =
- childrenIndexes.forall(children(_).outputPartitioning == SinglePartition)
+ // Special case: if all sides of the join are single partition and its
physical size is less than
+ // or equal to spark.sql.maxSinglePartitionBytes.
+ val preferSinglePartition = childrenIndexes.forall { i =>
+ children(i).outputPartitioning == SinglePartition &&
+ children(i).logicalLink
+ .forall(_.stats.sizeInBytes <=
conf.getConf(SQLConf.MAX_SINGLE_PARTITION_BYTES))
+ }
// If there are more than one children, we'll need to check partitioning &
distribution of them
// and see if extra shuffles are necessary.
- if (childrenIndexes.length > 1 && !allSinglePartition) {
+ if (childrenIndexes.length > 1 && !preferSinglePartition) {
val specs = childrenIndexes.map(i => {
val requiredDist = requiredChildDistributions(i)
assert(requiredDist.isInstanceOf[ClusteredDistribution],
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index a24d4465b35..7cfa00b4168 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.exchange
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.statsEstimation.StatsTestPlan
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.execution.{DummySparkPlan, SortExec}
import org.apache.spark.sql.execution.SparkPlan
@@ -731,6 +732,42 @@ class EnsureRequirementsSuite extends SharedSparkSession {
}
}
+ test("SPARK-41986: Introduce shuffle on SinglePartition") {
+ val filesMaxPartitionBytes = conf.filesMaxPartitionBytes
+ withSQLConf(SQLConf.MAX_SINGLE_PARTITION_BYTES.key ->
filesMaxPartitionBytes.toString) {
+ Seq(filesMaxPartitionBytes, filesMaxPartitionBytes + 1).foreach { size =>
+ val logicalPlan = StatsTestPlan(Nil, 1L, AttributeMap.empty,
Some(size))
+ val left = DummySparkPlan(outputPartitioning = SinglePartition)
+ left.setLogicalLink(logicalPlan)
+ val right = DummySparkPlan(outputPartitioning = SinglePartition)
+ right.setLogicalLink(logicalPlan)
+ val smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner,
None, left, right)
+
+ if (size <= filesMaxPartitionBytes) {
+ EnsureRequirements.apply(smjExec) match {
+ case SortMergeJoinExec(leftKeys, rightKeys, _, _,
+ SortExec(_, _, _: DummySparkPlan, _),
+ SortExec(_, _, _: DummySparkPlan, _), _) =>
+ assert(leftKeys === Seq(exprA))
+ assert(rightKeys === Seq(exprC))
+ case other => fail(other.toString)
+ }
+ } else {
+ EnsureRequirements.apply(smjExec) match {
+ case SortMergeJoinExec(leftKeys, rightKeys, _, _,
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _),
_),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _),
_), _) =>
+ assert(leftKeys === Seq(exprA))
+ assert(rightKeys === Seq(exprC))
+ assert(left.numPartitions == 5)
+ assert(right.numPartitions == 5)
+ case other => fail(other.toString)
+ }
+ }
+ }
+ }
+ }
+
test("Check with KeyGroupedPartitioning") {
// simplest case: identity transforms
var plan1 = DummySparkPlan(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]