This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 97ffce0374 [spark] Apply partition.sink-strategy to spark write (#5856)
97ffce0374 is described below
commit 97ffce0374ac1212ebcfb86709fe40c44d567cc5
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Jul 9 16:59:51 2025 +0800
[spark] Apply partition.sink-strategy to spark write (#5856)
---
.../paimon/spark/commands/PaimonSparkWriter.scala | 21 ++++++--
.../spark/write/PaimonWriteRequirement.scala | 8 ++-
.../paimon/spark/sql/WriteDistributeModeTest.scala | 57 ++++++++++++++++++++++
3 files changed, 80 insertions(+), 6 deletions(-)
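Context for the patch below: with `partition.sink-strategy` set to `hash`, a Spark insert into a partitioned, bucket -1 (unaware) table now gets an extra repartition by the partition columns before the write, so rows of the same partition land in the same task. A minimal usage sketch, mirroring the new test; it assumes a SparkSession `spark` with a Paimon catalog configured, and the table/column names are illustrative:

    // Hypothetical sketch (not part of this patch): enable the strategy through
    // spark.paimon.* session confs, the same way WriteDistributeModeTest does.
    spark.sql("SET spark.paimon.write.use-v2-write=true")
    spark.sql("SET spark.paimon.partition.sink-strategy=hash")
    spark.sql("CREATE TABLE t (id INT, pt STRING) PARTITIONED BY (pt)")
    // With 'hash', the insert plan contains one shuffle on the partition column;
    // with 'none' (the default), no shuffle is required.
    spark.sql("INSERT INTO t VALUES (1, 'p1'), (2, 'p2')")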
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index e8c72ccff2..d3ea09eb4e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions
-import org.apache.paimon.CoreOptions.WRITE_ONLY
+import org.apache.paimon.CoreOptions.{PartitionSinkStrategy, WRITE_ONLY}
import org.apache.paimon.codegen.CodeGenUtils
import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
import org.apache.paimon.data.serializer.InternalSerializers
@@ -42,7 +42,7 @@ import org.apache.paimon.utils.SerializationUtils
import org.apache.spark.{Partitioner, TaskContext}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import java.io.IOException
@@ -233,7 +233,15 @@ case class PaimonSparkWriter(table: FileStoreTable) extends WriteHelper {
        }
      case BUCKET_UNAWARE | POSTPONE_MODE =>
-        writeWithoutBucket(data)
+        if (
+          coreOptions.partitionSinkStrategy().equals(PartitionSinkStrategy.HASH) && !tableSchema
+            .partitionKeys()
+            .isEmpty
+        ) {
+          writeWithoutBucket(data.repartition(partitionCols(data): _*))
+        } else {
+          writeWithoutBucket(data)
+        }
      case HASH_FIXED =>
        if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
@@ -410,14 +418,17 @@ case class PaimonSparkWriter(table: FileStoreTable) extends WriteHelper {
  }
  private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = {
+    df.repartition(partitionCols(df) ++ Seq(col(BUCKET_COL)): _*)
+  }
+
+  def partitionCols(df: DataFrame): Seq[Column] = {
    val inputSchema = df.schema
-    val partitionCols = tableSchema
+    tableSchema
      .partitionKeys()
      .asScala
      .map(tableSchema.fieldNames().indexOf(_))
      .map(x => col(inputSchema.fieldNames(x)))
      .toSeq
-    df.repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
  }
  private def deserializeCommitMessage(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
index 097dbf7907..1f95c19146 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.write
+import org.apache.paimon.CoreOptions.PartitionSinkStrategy
import org.apache.paimon.spark.commands.BucketExpression.quote
import org.apache.paimon.table.BucketMode._
import org.apache.paimon.table.FileStoreTable
@@ -56,7 +57,12 @@ object PaimonWriteRequirement {
    val clusteringExpressions =
      (partitionTransforms ++
        bucketTransforms).map(identity[Expression]).toArray
-    if (clusteringExpressions.isEmpty) {
+    if (
+      clusteringExpressions.isEmpty || (bucketTransforms.isEmpty && table
+        .coreOptions()
+        .partitionSinkStrategy()
+        .equals(PartitionSinkStrategy.NONE))
+    ) {
      EMPTY
    } else {
      val distribution: ClusteredDistribution =
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteDistributeModeTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteDistributeModeTest.scala
new file mode 100644
index 0000000000..23b8602431
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteDistributeModeTest.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+
+class WriteDistributeModeTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelper {
+
+  test("Write distribute mode: write partitioned bucket -1 table") {
+    for (distributeMode <- Seq("none", "hash")) {
+      withTable("t") {
+        sql(
+          "CREATE TABLE t (id INT, pt STRING) partitioned by (pt) TBLPROPERTIES ('file.format'='avro')")
+        val query = "INSERT INTO t VALUES (1, 'p1'), (2, 'p2')"
+
+        withSparkSQLConf(
+          "spark.paimon.write.use-v2-write" -> "true",
+          "spark.paimon.partition.sink-strategy" -> distributeMode) {
+          val df = spark.sql(query)
+          val shuffleNodes = collect(
+            df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan) {
+            case shuffle: ShuffleExchangeLike => shuffle
+          }
+
+          if (distributeMode == "none") {
+            assert(shuffleNodes.isEmpty)
+          } else {
+            assert(shuffleNodes.size == 1)
+          }
+
+          checkAnswer(spark.sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, "p1"), Row(2, "p2")))
+        }
+      }
+    }
+  }
+}