This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e27ceb464 Default parallelism also considers numShufflePartitions (#3218)
e27ceb464 is described below
commit e27ceb464244f5a0c2bfa2a7c6db649ca945212b
Author: Zhen Wang <[email protected]>
AuthorDate: Tue Apr 16 16:20:55 2024 +0800
Default parallelism also considers numShufflePartitions (#3218)
---
.../scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index da269d486..ab42a4317 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -130,7 +130,11 @@ case class PaimonSparkWriter(table: FileStoreTable) {
// Topology: input -> shuffle by special key & partition hash ->
bucket-assigner -> shuffle by partition & bucket
val numParallelism =
Option(table.coreOptions.dynamicBucketAssignerParallelism)
.map(_.toInt)
- .getOrElse(sparkSession.sparkContext.defaultParallelism)
+ .getOrElse {
+ val defaultParallelism =
sparkSession.sparkContext.defaultParallelism
+ val numShufflePartitions =
sparkSession.sessionState.conf.numShufflePartitions
+ Math.max(defaultParallelism, numShufflePartitions)
+ }
val numAssigners =
Option(table.coreOptions.dynamicBucketInitialBuckets)
.map(initialBuckets => Math.min(initialBuckets.toInt,
numParallelism))
.getOrElse(numParallelism)