koert kuipers created SPARK-32056:
-------------------------------------
Summary: Repartition by key should support partition coalesce for AQE
Key: SPARK-32056
URL: https://issues.apache.org/jira/browse/SPARK-32056
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.0.0
Environment: spark release 3.0.0
Reporter: koert kuipers
when adaptive query execution is enabled the following expression should
support coalescing of partitions:
{code:java}
dataframe.repartition(col("somecolumn"))
{code}
currently it does not because it simply calls the repartition implementation
where number of partitions is specified:
{code:java}
def repartition(partitionExprs: Column*): Dataset[T] = {
  repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
}
{code}
and repartition with the number of partitions specified does not allow for
coalescing of partitions (since this would break the user's expectation that the
result has exactly the number of partitions specified).
for more context see the discussion here:
[https://github.com/apache/spark/pull/27986]
a simple test to confirm that repartition by key does not support coalescing of
partitions can be added in AdaptiveQueryExecSuite like this (it currently
fails):
{code:java}
test("SPARK-????? repartition has less partitions for small data when adaptiveExecutionEnabled") {
  Seq(true, false).foreach { enableAQE =>
    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
      SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50",
      SQLConf.SHUFFLE_PARTITIONS.key -> "50") {
      val partitionsNum = (1 to 10).toDF.repartition($"value")
        .rdd.collectPartitions().length
      if (enableAQE) {
        // with AQE the 10 small rows should be coalesced into fewer partitions
        assert(partitionsNum < 50)
      } else {
        // without AQE the shuffle keeps the configured 50 partitions
        assert(partitionsNum === 50)
      }
    }
  }
}
{code}
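for anyone who wants to check this outside the spark test suite, here is a minimal standalone sketch that uses only public APIs. it is an illustration, not part of the proposed test: the object name RepartitionByKeyCheck, the app name and the local[2] master are arbitrary choices, and it assumes a spark 3.0.0 build on the classpath. per the behavior described in this issue it should report the configured 50 partitions until repartition by key supports coalescing.
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// hypothetical standalone check, names chosen for illustration only
object RepartitionByKeyCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("repartition-by-key-check")
      // enable adaptive query execution
      .config("spark.sql.adaptive.enabled", "true")
      // default number of shuffle partitions used by repartition(col)
      .config("spark.sql.shuffle.partitions", "50")
      .getOrCreate()
    import spark.implicits._

    // 10 tiny rows repartitioned by key, no explicit partition count given
    val numPartitions = (1 to 10).toDF("value")
      .repartition(col("value"))
      .rdd
      .getNumPartitions

    println(s"partitions after repartition by key: $numPartitions")
    spark.stop()
  }
}
{code}
a printed value below 50 would indicate that the shuffle partitions were coalesced.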