This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 46b7f17 [SPARK-31037][SQL] refine AQE config names
46b7f17 is described below
commit 46b7f1796bd0b96977ce9b473601033f397a3b18
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Mar 6 00:46:34 2020 +0800
[SPARK-31037][SQL] refine AQE config names
When introducing AQE to others, I find the config names somewhat
incoherent and hard to use.
This PR refines the config names:
1. Remove the "shuffle" prefix. AQE is all about shuffle, so we don't need
to repeat the "shuffle" prefix everywhere.
2. `targetPostShuffleInputSize` is obscure; rename it to
`advisoryPartitionSizeInBytes`.
3. `reducePostShufflePartitions` doesn't match the actual optimization;
rename it to `coalesceShufflePartitions`.
4. `minNumPostShufflePartitions` is obscure; rename it to `minPartitionNum`
under the `coalescePartitions` namespace.
5. `maxNumPostShufflePartitions` is confusing with the word "max"; rename
it to `initialPartitionNum`.
6. `skewedJoinOptimization` is too verbose; skew join is well-known
terminology in the database area, so we can just say `skewJoin`. A sketch
of the renamed configs in use follows this list.
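For illustration only (not part of this patch), here is a minimal sketch
that exercises the renamed configs through the public SparkSession builder;
the app name, master, and values are placeholders, not recommendations:

    // Hypothetical demo: old names in comments, new names in code.
    import org.apache.spark.sql.SparkSession

    object AqeConfigDemo extends App {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("aqe-config-demo")
        .config("spark.sql.adaptive.enabled", "true")
        // was spark.sql.adaptive.shuffle.reducePostShufflePartitions
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        // was spark.sql.adaptive.shuffle.minNumPostShufflePartitions
        .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
        // was spark.sql.adaptive.shuffle.maxNumPostShufflePartitions
        .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")
        // was spark.sql.adaptive.shuffle.targetPostShuffleInputSize
        .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
        // was spark.sql.adaptive.skewedJoinOptimization.enabled
        .config("spark.sql.adaptive.skewJoin.enabled", "true")
        .getOrCreate()

      spark.stop()
    }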
Make the config names easy to understand.
Deprecates the config spark.sql.adaptive.shuffle.targetPostShuffleInputSize;
the old key still takes effect as a fallback for the new one (see the sketch below).
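A minimal migration sketch (assuming an active SparkSession named `spark`):
the deprecated key still takes effect because the new entry falls back to it
via .fallbackConf, so migrating is a one-line change:

    // Old key: still honored through the fallback (Spark should log a deprecation warning).
    spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "128m")
    // New key: the preferred spelling going forward.
    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")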
N/A
Closes #27793 from cloud-fan/aqe.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 142 +++++++++++----------
.../spark/sql/execution/ShuffledRowRDD.scala | 2 +-
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +-
...tions.scala => CoalesceShufflePartitions.scala} | 14 +-
.../execution/adaptive/OptimizeSkewedJoin.scala | 14 +-
.../execution/exchange/EnsureRequirements.scala | 4 +-
....scala => CoalesceShufflePartitionsSuite.scala} | 12 +-
.../adaptive/AdaptiveQueryExecSuite.scala | 8 +-
.../apache/spark/sql/internal/SQLConfSuite.scala | 26 ++--
.../spark/sql/sources/BucketedReadSuite.scala | 2 +-
10 files changed, 117 insertions(+), 109 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3dbfc65..b2b3d12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -356,8 +356,16 @@ object SQLConf {
.checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
.createWithDefault(200)
+ val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
+ buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
+ .internal()
+ .doc("(Deprecated since Spark 3.0)")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("64MB")
+
val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
- .doc("When true, enable adaptive query execution.")
+ .doc("When true, enable adaptive query execution, which re-optimizes the
query plan in the " +
+ "middle of query execution, based on accurate runtime statistics.")
.booleanConf
.createWithDefault(false)
@@ -365,90 +373,90 @@ object SQLConf {
.internal()
.doc("Adaptive query execution is skipped when the query does not have
exchanges or " +
"sub-queries. By setting this config to true (together with " +
- s"'${ADAPTIVE_EXECUTION_ENABLED.key}' enabled), Spark will force apply
adaptive query " +
+ s"'${ADAPTIVE_EXECUTION_ENABLED.key}' set to true), Spark will force
apply adaptive query " +
"execution for all supported queries.")
.booleanConf
.createWithDefault(false)
- val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
- buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions")
- .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled,
this enables reducing " +
- "the number of post-shuffle partitions based on map output
statistics.")
- .booleanConf
- .createWithDefault(true)
+ val ADVISORY_PARTITION_SIZE_IN_BYTES =
+ buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes")
+ .doc("The advisory size in bytes of the shuffle partition during
adaptive optimization " +
+ s"(when ${ADAPTIVE_EXECUTION_ENABLED.key} is true). It takes effect
when Spark " +
+ "coalesces small shuffle partitions or splits skewed shuffle
partition.")
+ .fallbackConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
- val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
- buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch")
- .doc("Whether to fetch the continuous shuffle blocks in batch. Instead
of fetching blocks " +
- "one by one, fetching continuous shuffle blocks for the same map task
in batch can " +
- "reduce IO and improve performance. Note, multiple continuous blocks
exist in single " +
- s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}'
and " +
- s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled, this
feature also depends " +
- "on a relocatable serializer, the concatenation support codec in use
and the new version " +
- "shuffle fetch protocol.")
+ val COALESCE_PARTITIONS_ENABLED =
+ buildConf("spark.sql.adaptive.coalescePartitions.enabled")
+ .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark
will coalesce " +
+ "contiguous shuffle partitions according to the target size (specified
by " +
+ s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), to avoid too many small
tasks.")
.booleanConf
.createWithDefault(true)
- val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
- buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
- .doc("The advisory minimum number of post-shuffle partitions used when "
+
- s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
- s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
+ val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
+ buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
+ .doc("The minimum number of shuffle partitions after coalescing. This
configuration only " +
+ s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+ s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
.intConf
- .checkValue(_ > 0, "The minimum shuffle partition number " +
- "must be a positive integer.")
+ .checkValue(_ > 0, "The minimum number of partitions must be positive.")
.createWithDefault(1)
- val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
- buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
- .doc("The target post-shuffle input size in bytes of a task. This
configuration only has " +
- s"an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
- s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("64MB")
-
- val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
- buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
- .doc("The advisory maximum number of post-shuffle partitions used in
adaptive execution. " +
- "This is used as the initial number of pre-shuffle partitions. By
default it equals to " +
- "spark.sql.shuffle.partitions. This configuration only has an effect
when " +
- s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
- s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
+ val COALESCE_PARTITIONS_INITIAL_PARTITION_NUM =
+ buildConf("spark.sql.adaptive.coalescePartitions.initialPartitionNum")
+ .doc("The initial number of shuffle partitions before coalescing. By
default it equals to " +
+ s"${SHUFFLE_PARTITIONS.key}. This configuration only has an effect
when " +
+ s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and
'${COALESCE_PARTITIONS_ENABLED.key}' " +
+ "are both true.")
.intConf
- .checkValue(_ > 0, "The maximum shuffle partition number " +
- "must be a positive integer.")
+ .checkValue(_ > 0, "The initial number of partitions must be positive.")
.createOptional
+ val FETCH_SHUFFLE_BLOCKS_IN_BATCH =
+ buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch")
+ .internal()
+ .doc("Whether to fetch the contiguous shuffle blocks in batch. Instead
of fetching blocks " +
+ "one by one, fetching contiguous shuffle blocks for the same map task
in batch can " +
+ "reduce IO and improve performance. Note, multiple contiguous blocks
exist in single " +
+ s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}'
and " +
+ s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature
also depends " +
+ "on a relocatable serializer, the concatenation support codec in use
and the new version " +
+ "shuffle fetch protocol.")
+ .booleanConf
+ .createWithDefault(true)
+
val LOCAL_SHUFFLE_READER_ENABLED =
- buildConf("spark.sql.adaptive.shuffle.localShuffleReader.enabled")
- .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this
enables the " +
- "optimization of converting the shuffle reader to local shuffle reader
for the shuffle " +
- "exchange of the broadcast hash join in probe side.")
- .booleanConf
- .createWithDefault(true)
+ buildConf("spark.sql.adaptive.localShuffleReader.enabled")
+ .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark
tries to use local " +
+ "shuffle reader to read the shuffle data when the shuffle partitioning
is not needed, " +
+ "for example, after converting sort-merge join to broadcast-hash
join.")
+ .booleanConf
+ .createWithDefault(true)
- val ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED =
- buildConf("spark.sql.adaptive.skewedJoinOptimization.enabled")
- .doc("When true and adaptive execution is enabled, a skewed join is
automatically handled at " +
- "runtime.")
- .booleanConf
- .createWithDefault(true)
+ val SKEW_JOIN_ENABLED =
+ buildConf("spark.sql.adaptive.skewJoin.enabled")
+ .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark
dynamically " +
+ "handles skew in sort-merge join by splitting (and replicating if
needed) skewed " +
+ "partitions.")
+ .booleanConf
+ .createWithDefault(true)
- val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR =
- buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionFactor")
- .doc("A partition is considered as a skewed partition if its size is
larger than" +
- " this factor multiple the median partition size and also larger than
" +
- s" ${SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key}")
+ val SKEW_JOIN_SKEWED_PARTITION_FACTOR =
+ buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
+ .doc("A partition is considered as skewed if its size is larger than
this factor " +
+ "multiplying the median partition size and also larger than " +
+ s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'")
.intConf
.checkValue(_ > 0, "The skew factor must be positive.")
.createWithDefault(10)
val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
+ .internal()
.doc("The relation with a non-empty partition ratio lower than this
config will not be " +
"considered as the build side of a broadcast-hash join in adaptive
execution regardless " +
"of its size.This configuration only has an effect when " +
- s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
+ s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is true.")
.doubleConf
.checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
.createWithDefault(0.2)
@@ -2257,7 +2265,9 @@ object SQLConf {
DeprecatedConfig(ARROW_EXECUTION_ENABLED.key, "3.0",
s"Use '${ARROW_PYSPARK_EXECUTION_ENABLED.key}' instead of it."),
DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
- s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it.")
+ s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it."),
+ DeprecatedConfig(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "3.0",
+ s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it.")
)
Map(configs.map { cfg => cfg.key -> cfg } : _*)
@@ -2418,19 +2428,17 @@ class SQLConf extends Serializable with Logging {
def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
- def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
-
- def fetchShuffleBlocksInBatchEnabled: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
+ def fetchShuffleBlocksInBatch: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH)
def nonEmptyPartitionRatioForBroadcastJoin: Double =
getConf(NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN)
- def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+ def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)
- def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+ def minShufflePartitionNum: Int = getConf(COALESCE_PARTITIONS_MIN_PARTITION_NUM)
- def maxNumPostShufflePartitions: Int =
- getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
+ def initialShufflePartitionNum: Int =
+ getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)
def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index eb02259..53ab049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -127,7 +127,7 @@ class ShuffledRowRDD(
Array.tabulate(dependency.partitioner.numPartitions)(i => CoalescedPartitionSpec(i, i + 1)))
}
- if (SQLConf.get.fetchShuffleBlocksInBatchEnabled) {
+ if (SQLConf.get.fetchShuffleBlocksInBatch) {
dependency.rdd.context.setLocalProperty(
SortShuffleManager.FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY, "true")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index f2ebe1a..b74401e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -91,7 +91,7 @@ case class AdaptiveSparkPlanExec(
// before 'ReduceNumShufflePartitions', as the skewed partition handled
// in 'OptimizeSkewedJoin' rule, should be omitted in 'ReduceNumShufflePartitions'.
OptimizeSkewedJoin(conf),
- ReduceNumShufflePartitions(conf),
+ CoalesceShufflePartitions(conf),
// The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices'
// in 'ReduceNumShufflePartitions' rule. So it must be after 'ReduceNumShufflePartitions' rule.
OptimizeLocalShuffleReader(conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
similarity index 90%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 767a4b2..a8e2d8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf
/**
- * A rule to reduce the post shuffle partitions based on the map output statistics, which can
+ * A rule to coalesce the shuffle partitions based on the map output statistics, which can
* avoid many small reduce tasks that hurt performance.
*/
-case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
- import ReduceNumShufflePartitions._
+case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
+ import CoalesceShufflePartitions._
override def apply(plan: SparkPlan): SparkPlan = {
- if (!conf.reducePostShufflePartitionsEnabled) {
+ if (!conf.coalesceShufflePartitionsEnabled) {
return plan
}
if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])
@@ -70,8 +70,8 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
validMetrics.toArray,
firstPartitionIndex = 0,
lastPartitionIndex = distinctNumPreShufflePartitions.head,
- advisoryTargetSize = conf.targetPostShuffleInputSize,
- minNumPartitions = conf.minNumPostShufflePartitions)
+ advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
+ minNumPartitions = conf.minShufflePartitionNum)
// This transformation adds new nodes, so we must use `transformUp` here.
val stageIds = shuffleStages.map(_.id).toSet
plan.transformUp {
@@ -88,6 +88,6 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
}
}
-object ReduceNumShufflePartitions {
+object CoalesceShufflePartitions {
val COALESCED_SHUFFLE_READER_DESCRIPTION = "coalesced"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 979fee1..2e8adcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -64,11 +64,11 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
/**
* A partition is considered as a skewed partition if its size is larger than the median
* partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
- * SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.
+ * ADVISORY_PARTITION_SIZE_IN_BYTES.
*/
private def isSkewed(size: Long, medianSize: Long): Boolean = {
- size > medianSize * conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
- size > conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+ size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
+ size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
}
private def medianSize(stats: MapOutputStatistics): Long = {
@@ -87,7 +87,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
* target post-shuffle partition size if avg size is smaller than it.
*/
private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = {
- val targetPostShuffleSize = conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+ val targetPostShuffleSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
val nonSkewSizes = stats.bytesByPartitionId.filterNot(isSkewed(_, medianSize))
// It's impossible that all the partitions are skewed, as we use median size to define skew.
assert(nonSkewSizes.nonEmpty)
@@ -271,7 +271,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
rightStats: MapOutputStatistics,
nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = {
assert(nonSkewPartitionIndices.nonEmpty)
- val shouldCoalesce = conf.getConf(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+ val shouldCoalesce = conf.getConf(SQLConf.COALESCE_PARTITIONS_ENABLED)
if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1))
} else {
@@ -280,7 +280,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
firstPartitionIndex = nonSkewPartitionIndices.head,
// `lastPartitionIndex` is exclusive.
lastPartitionIndex = nonSkewPartitionIndices.last + 1,
- advisoryTargetSize = conf.targetPostShuffleInputSize)
+ advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES))
}
}
@@ -300,7 +300,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
}
override def apply(plan: SparkPlan): SparkPlan = {
- if (!conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED)) {
+ if (!conf.getConf(SQLConf.SKEW_JOIN_ENABLED)) {
return plan
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index ab4176c..28ef793 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -36,8 +36,8 @@ import org.apache.spark.sql.internal.SQLConf
*/
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
private def defaultNumPreShufflePartitions: Int =
- if (conf.adaptiveExecutionEnabled && conf.reducePostShufflePartitionsEnabled) {
- conf.maxNumPostShufflePartitions
+ if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
+ conf.initialShufflePartitionNum
} else {
conf.numShufflePartitions
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
similarity index 96%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index a32b684..9e77f61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -23,13 +23,13 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql._
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
-import org.apache.spark.sql.execution.adaptive.ReduceNumShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
-class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
+class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
private var originalActiveSparkSession: Option[SparkSession] = _
private var originalInstantiatedSparkSession: Option[SparkSession] = _
@@ -65,17 +65,17 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
.setAppName("test")
.set(UI_ENABLED, false)
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
- .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
+ .set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
.set(
- SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key,
targetPostShuffleInputSize.toString)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
- sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, numPartitions.toString)
+ sparkConf.set(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, numPartitions.toString)
case None =>
- sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
+ sparkConf.set(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, "1")
}
val spark = SparkSession.builder()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 500b6cc..a7fa63d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -135,7 +135,7 @@ class AdaptiveQueryExecSuite
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
- SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "10") {
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "10") {
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
val smj = findTopLevelSortMergeJoin(plan)
@@ -167,7 +167,7 @@ class AdaptiveQueryExecSuite
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
- SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "false") {
+ SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
val smj = findTopLevelSortMergeJoin(plan)
@@ -584,7 +584,7 @@ class AdaptiveQueryExecSuite
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "700") {
withTempView("skewData1", "skewData2") {
spark
.range(0, 1000, 1, 10)
@@ -609,7 +609,7 @@ class AdaptiveQueryExecSuite
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") {
withTempView("skewData1", "skewData2") {
spark
.range(0, 1000, 1, 10)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 48be211..10ea948 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -170,33 +170,33 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10")
}
- test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") {
+ test("Test ADVISORY_PARTITION_SIZE_IN_BYTES's method") {
spark.sessionState.conf.clear()
- spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100")
- assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 100)
+ spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "100")
+ assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 100)
- spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k")
- assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 1024)
+ spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1k")
+ assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1024)
- spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M")
- assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 1048576)
+ spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1M")
+ assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1048576)
- spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g")
- assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 1073741824)
+ spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
+ assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824)
- spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1")
- assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === -1)
+ spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
+ assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === -1)
// Test overflow exception
intercept[IllegalArgumentException] {
// This value exceeds Long.MaxValue
- spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "90000000000g")
+ spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "90000000000g")
}
intercept[IllegalArgumentException] {
// This value less than Long.MinValue
- spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
+ spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-90000000000g")
}
spark.sessionState.conf.clear()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index b54f4f2..57bbf20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -821,7 +821,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
withSQLConf(
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
- SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "7") {
+ SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
Seq(false, true).foreach { enableAdaptive =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]