This is an automated email from the ASF dual-hosted git repository.
tsreaper pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f9d70f22c1 [spark] skip clustering in writing phase for postpone table (#7782)
f9d70f22c1 is described below
commit f9d70f22c1a93fc961926996e51d9e183b4bcde9
Author: LsomeYeah <[email protected]>
AuthorDate: Fri May 8 13:45:42 2026 +0800
[spark] skip clustering in writing phase for postpone table (#7782)
---
.../paimon/spark/commands/PaimonSparkWriter.scala | 1 +
.../paimon/spark/sql/PostponeBucketTableTest.scala | 42 ++++++++++++++++++++++
2 files changed, 43 insertions(+)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 1a84123215..f2d0135580 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -305,6 +305,7 @@ case class PaimonSparkWriter(
}
val clusteringColumns = coreOptions.clusteringColumns()
if (
+ table.bucketMode() != POSTPONE_MODE &&
(!coreOptions.clusteringIncrementalEnabled() || coreOptions
.clusteringIncrementalOptimizeWrite()) &&
(!clusteringColumns.isEmpty)
) {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
index 07fe32308e..d984f1385c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
@@ -291,4 +291,46 @@ class PostponeBucketTableTest extends PaimonSparkTestBase {
checkAnswer(sql("SELECT count(1) FROM `t$snapshots`"), Seq(Row(5)))
}
}
+
+ test("Postpone bucket table: skip clustering in writing phase") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (
+ | k INT,
+ | v STRING
+ |) TBLPROPERTIES (
+ | 'primary-key' = 'k',
+ | 'bucket' = '-2',
+ | 'postpone.batch-write-fixed-bucket' = 'false',
+ | 'clustering.columns' = 'k',
+ | 'clustering.strategy' = 'order'
+ |)
+ |""".stripMargin)
+
+ val before = System.currentTimeMillis()
+
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(4) */
+ |id AS k,
+ |CAST(id AS STRING) AS v
+ |FROM range (0, 100)
+ |""".stripMargin)
+
+ // Verify no Sort operator in the plan (clustering is skipped)
+ val executions = spark.sharedState.statusStore.executionsList()
+ val hasSort = executions.exists {
+ e =>
+ e.submissionTime > before &&
+ e.physicalPlanDescription != null &&
+ e.physicalPlanDescription.toLowerCase.contains("sort")
+ }
+ assert(!hasSort, "Postpone table should skip clustering (no sort in plan)")
+
+ // Verify data was written to postpone directory (bucket=-2)
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets`"),
+ Seq(Row(-2))
+ )
+ }
+ }
}