[ https://issues.apache.org/jira/browse/SPARK-32056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-32056.
---------------------------------
Fix Version/s: 3.1.0
Resolution: Fixed
Issue resolved by pull request 28900
[https://github.com/apache/spark/pull/28900]
> Repartition by key should support partition coalesce for AQE
> ------------------------------------------------------------
>
> Key: SPARK-32056
> URL: https://issues.apache.org/jira/browse/SPARK-32056
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Environment: spark release 3.0.0
> Reporter: koert kuipers
> Priority: Minor
> Fix For: 3.1.0
>
>
> when adaptive query execution is enabled the following expression should
> support coalescing of partitions:
> {code:java}
> dataframe.repartition(col("somecolumn")) {code}
> currently it does not because it simply calls the repartition implementation
> where number of partitions is specified:
> {code:java}
> def repartition(partitionExprs: Column*): Dataset[T] = {
>   repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
> }{code}
> and repartition with the number of partitions specified does not allow for
> coalescing of partitions (since this would break the user's expectation that
> it will have the number of partitions specified).
> for more context see the discussion here:
> [https://github.com/apache/spark/pull/27986]
> a simple test to confirm that repartition by key does not support coalescing
> of partitions can be added in AdaptiveQueryExecSuite like this (it currently
> fails):
> {code:java}
> test("SPARK-32056 repartition has less partitions for small data when adaptiveExecutionEnabled") {
>   Seq(true, false).foreach { enableAQE =>
>     withSQLConf(
>       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
>       SQLConf.SHUFFLE_PARTITIONS.key -> "50",
>       SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50") {
>       val partitionsNum = (1 to 10).toDF.repartition($"value")
>         .rdd.collectPartitions().length
>       // with AQE on, the small input should be coalesced below the configured 50
>       if (enableAQE) {
>         assert(partitionsNum < 50)
>       } else {
>         assert(partitionsNum === 50)
>       }
>     }
>   }
> }
> {code}
>