This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new db5236e7c1 [spark] Local-sort mode is supported in spark for
incremental clustering (#7587)
db5236e7c1 is described below
commit db5236e7c1ae190794e61ca92242a41f08d60df5
Author: sanshi <[email protected]>
AuthorDate: Sun May 24 10:31:44 2026 +0800
[spark] Local-sort mode is supported in spark for incremental clustering
(#7587)
---
.../cluster/IncrementalClusterManagerTest.java | 32 ++++
.../paimon/spark/procedure/CompactProcedure.java | 10 +-
.../apache/paimon/spark/sort/HilbertSorter.java | 8 +
.../org/apache/paimon/spark/sort/OrderSorter.java | 6 +
.../org/apache/paimon/spark/sort/TableSorter.java | 7 +
.../org/apache/paimon/spark/sort/ZorderSorter.java | 8 +
.../spark/procedure/CompactProcedureTestBase.scala | 181 +++++++++++++++++++++
7 files changed, 251 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index cf81cdf85f..b38cf3af99 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -146,6 +146,38 @@ public class IncrementalClusterManagerTest {
}
}
+ @Test
+ public void testClusteringIncrementalModeDefault() throws Exception {
+ // Test default mode is GLOBAL_SORT
+ Map<String, String> options = new HashMap<>();
+ FileStoreTable table = createTable(options, Collections.emptyList());
+ IncrementalClusterManager manager = new
IncrementalClusterManager(table);
+ assertThat(manager.clusteringIncrementalMode())
+ .isEqualTo(CoreOptions.ClusteringIncrementalMode.GLOBAL_SORT);
+ }
+
+ @Test
+ public void testClusteringIncrementalModeLocalSort() throws Exception {
+ // Test local-sort mode
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.CLUSTERING_INCREMENTAL_MODE.key(),
"local-sort");
+ FileStoreTable table = createTable(options, Collections.emptyList());
+ IncrementalClusterManager manager = new
IncrementalClusterManager(table);
+ assertThat(manager.clusteringIncrementalMode())
+ .isEqualTo(CoreOptions.ClusteringIncrementalMode.LOCAL_SORT);
+ }
+
+ @Test
+ public void testClusteringIncrementalModeGlobalSort() throws Exception {
+ // Test explicit global-sort mode
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.CLUSTERING_INCREMENTAL_MODE.key(),
"global-sort");
+ FileStoreTable table = createTable(options, Collections.emptyList());
+ IncrementalClusterManager manager = new
IncrementalClusterManager(table);
+ assertThat(manager.clusteringIncrementalMode())
+ .isEqualTo(CoreOptions.ClusteringIncrementalMode.GLOBAL_SORT);
+ }
+
@Test
public void testHistoryPartitionAutoClustering() throws Exception {
Map<String, String> options = new HashMap<>();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 274080cd7a..b07fbd26be 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -668,6 +668,9 @@ public class CompactProcedure extends BaseProcedure {
incrementalClusterManager.clusterCurve(),
incrementalClusterManager.clusterKeys());
+ CoreOptions.ClusteringIncrementalMode mode =
+ incrementalClusterManager.clusteringIncrementalMode();
+
Dataset<Row> datasetForWrite =
partitionSplits.values().stream()
.map(Pair::getKey)
@@ -679,7 +682,12 @@ public class CompactProcedure extends BaseProcedure {
ScanPlanHelper$.MODULE$.createNewScanPlan(
splits.toArray(new
DataSplit[0]),
relation));
- return sorter.sort(dataset);
+ // Use sortLocal() for LOCAL_SORT, sort()
for GLOBAL_SORT
+ if (mode ==
CoreOptions.ClusteringIncrementalMode.LOCAL_SORT) {
+ return sorter.sortLocal(dataset);
+ } else {
+ return sorter.sort(dataset);
+ }
})
.reduce(Dataset::union)
.orElse(null);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
index 1f30077131..681104fb19 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
@@ -49,6 +49,14 @@ public class HilbertSorter extends TableSorter {
return sortedDF.drop(H_COLUMN);
}
+ @Override
+ public Dataset<Row> sortLocal(Dataset<Row> df) {
+ Column hilbertColumn = hilbertValue(df);
+ Dataset<Row> hilbertValueDF = df.withColumn(H_COLUMN, hilbertColumn);
+ Dataset<Row> sortedDF =
hilbertValueDF.sortWithinPartitions(hilbertValueDF.col(H_COLUMN));
+ return sortedDF.drop(H_COLUMN);
+ }
+
private Column hilbertValue(Dataset<Row> df) {
SparkHilbertUDF hilbertUDF = new SparkHilbertUDF();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java
index e2fc18f696..fcee0cf97c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java
@@ -39,4 +39,10 @@ public class OrderSorter extends TableSorter {
Column[] sortColumns =
orderColNames.stream().map(input::col).toArray(Column[]::new);
return
input.repartitionByRange(sortColumns).sortWithinPartitions(sortColumns);
}
+
+ @Override
+ public Dataset<Row> sortLocal(Dataset<Row> input) {
+ Column[] sortColumns =
orderColNames.stream().map(input::col).toArray(Column[]::new);
+ return input.sortWithinPartitions(sortColumns);
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
index a96724fad6..de58381fad 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
@@ -62,6 +62,8 @@ public abstract class TableSorter {
public abstract Dataset<Row> sort(Dataset<Row> input);
+ public abstract Dataset<Row> sortLocal(Dataset<Row> input);
+
public static TableSorter getSorter(
FileStoreTable table, OrderType orderType, List<String>
orderColumns) {
switch (orderType) {
@@ -77,6 +79,11 @@ public abstract class TableSorter {
public Dataset<Row> sort(Dataset<Row> input) {
return input;
}
+
+ @Override
+ public Dataset<Row> sortLocal(Dataset<Row> input) {
+ return input;
+ }
};
default:
throw new IllegalArgumentException("cannot match order type: "
+ orderType);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
index 2a6d1b2ceb..32d2e468ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
@@ -48,6 +48,14 @@ public class ZorderSorter extends TableSorter {
return sortedDF.drop(Z_COLUMN);
}
+ @Override
+ public Dataset<Row> sortLocal(Dataset<Row> df) {
+ Column zColumn = zValue(df);
+ Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zColumn);
+ Dataset<Row> sortedDF =
zValueDF.sortWithinPartitions(zValueDF.col(Z_COLUMN));
+ return sortedDF.drop(Z_COLUMN);
+ }
+
private Column zValue(Dataset<Row> df) {
SparkZOrderUDF zOrderUDF =
new SparkZOrderUDF(
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 2bea2144a3..7cbe2b6659 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -1339,6 +1339,187 @@ abstract class CompactProcedureTestBase extends
PaimonSparkTestBase with StreamT
}
}
+ test("Paimon Procedure: incremental clustering with local-sort mode") {
+ withTable("T") {
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b INT, c STRING)
+ |TBLPROPERTIES (
+ | 'bucket'='-1',
+ | 'num-levels'='4',
+ | 'num-sorted-run.compaction-trigger'='2',
+ | 'clustering.columns'='a',
+ | 'clustering.incremental'='true',
+ | 'clustering.incremental.mode'='local-sort'
+ |)
+ |""".stripMargin)
+
+ // Insert data in multiple batches to create multiple files
+ spark.sql("INSERT INTO T VALUES (5, 1, 'a')")
+ spark.sql("INSERT INTO T VALUES (3, 2, 'b')")
+ spark.sql("INSERT INTO T VALUES (1, 3, 'c')")
+ spark.sql("INSERT INTO T VALUES (2, 3, 'd')")
+ spark.sql(
+ "INSERT INTO T SELECT /*+ COALESCE(1) */ col1, col2, col3 FROM VALUES
(8, 3, 'e'), (1, 4, 'f'), (2, 5, 'g') AS t(col1, col2, col3);")
+
+ checkAnswer(
+ spark.sql("SELECT a, b, c FROM T"),
+ Seq(
+ Row(5, 1, "a"),
+ Row(3, 2, "b"),
+ Row(1, 3, "c"),
+ Row(2, 3, "d"),
+ Row(8, 3, "e"),
+ Row(1, 4, "f"),
+ Row(2, 5, "g"))
+ )
+
+ val filesBeforeCompact = spark.sql("SELECT count(*) FROM
`T$files`").collect()(0).getLong(0)
+ Assertions.assertThat(filesBeforeCompact).isEqualTo(5)
+
+ // Run compaction to trigger incremental clustering
+ checkAnswer(spark.sql("CALL sys.compact(table => 'T')"), Row(true) ::
Nil)
+
+ // Verify data integrity
+ checkAnswer(
+ spark.sql("SELECT a, b, c FROM T"),
+ Seq(
+ Row(5, 1, "a"),
+ Row(3, 2, "b"),
+ Row(1, 3, "c"),
+ Row(2, 3, "d"),
+ Row(1, 4, "f"),
+ Row(2, 5, "g"),
+ Row(8, 3, "e"))
+ )
+
+ // Verify files are clustered (level >= 1)
+ val files = spark.sql("SELECT level FROM `T$files`").collect()
+ files.foreach(row =>
Assertions.assertThat(row.getInt(0)).isGreaterThanOrEqualTo(1))
+ val filesAfterCompact = spark.sql("SELECT count(*) FROM
`T$files`").collect()(0).getLong(0)
+
Assertions.assertThat(filesAfterCompact).isLessThanOrEqualTo(filesBeforeCompact)
+
+ // Verify local-sort effect: within each file, rows are physically
sorted by column 'a'.
+ // file_path in T$files is an absolute path (bucketPath + "/" +
fileName), so we can
+ // read each file directly as parquet (bypassing Paimon's scan) to check
physical row order.
+ val fileRows =
+ spark.sql("SELECT file_path, record_count FROM `T$files`").collect()
+ fileRows.foreach {
+ row =>
+ val filePath = row.getString(0)
+ val recordCount = row.getLong(1)
+ if (recordCount > 1) {
+ // For multi-row files, verify rows are physically sorted by 'a'
+ val aValues =
+
spark.read.format("parquet").load(filePath).select("a").collect().map(_.getInt(0))
+ for (i <- 1 until aValues.length) {
+ Assertions
+ .assertThat(aValues(i))
+ .as(
+ s"File $filePath: row $i (a=${aValues(i)}) should be >= row
${i - 1} (a=${aValues(i - 1)})")
+ .isGreaterThanOrEqualTo(aValues(i - 1))
+ }
+ }
+ }
+
+ val table = loadTable("T").asInstanceOf[FileStoreTable]
+ checkSnapshot(table)
+ }
+ }
+
+ test("Paimon Procedure: incremental clustering local-sort vs global-sort") {
+ withTable("T_local", "T_global") {
+ // Create table with local-sort
+ spark.sql(s"""
+ |CREATE TABLE T_local (a INT, b INT)
+ |TBLPROPERTIES (
+ | 'bucket'='-1',
+ | 'num-levels'='4',
+ | 'num-sorted-run.compaction-trigger'='2',
+ | 'clustering.columns'='a',
+ | 'clustering.incremental'='true',
+ | 'clustering.incremental.mode'='local-sort'
+ |)
+ |""".stripMargin)
+
+ // Create table with global-sort (default)
+ spark.sql(s"""
+ |CREATE TABLE T_global (a INT, b INT)
+ |TBLPROPERTIES (
+ | 'bucket'='-1',
+ | 'num-levels'='4',
+ | 'num-sorted-run.compaction-trigger'='2',
+ | 'clustering.columns'='a',
+ | 'clustering.incremental'='true',
+ | 'clustering.incremental.mode'='global-sort'
+ |)
+ |""".stripMargin)
+
+ // Insert same data into both tables
+ for (table <- Seq("T_local", "T_global")) {
+ spark.sql(s"INSERT INTO $table VALUES (5, 1), (3, 2)")
+ spark.sql(s"INSERT INTO $table VALUES (8, 3), (1, 4)")
+ spark.sql(s"INSERT INTO $table VALUES (7, 5), (2, 6)")
+ spark.sql(s"INSERT INTO $table VALUES (4, 7), (6, 8)")
+ }
+
+ // Run compact on both
+ spark.sql("CALL sys.compact(table => 'T_local')")
+ spark.sql("CALL sys.compact(table => 'T_global')")
+
+ // Both should have same data
+ checkAnswer(
+ spark.sql("SELECT a, b FROM T_local ORDER BY a"),
+ spark.sql("SELECT a, b FROM T_global ORDER BY a")
+ )
+
+ val localFileRows =
+ spark.sql("SELECT file_path, record_count FROM
`T_local$files`").collect()
+ val globalFileRows =
+ spark.sql("SELECT file_path, record_count FROM
`T_global$files`").collect()
+
+ // Global-sort uses repartitionByRange which shuffles all data into 1
sorted file.
+ // Reading multiple files in parallel gives non-deterministic cross-file
ordering,
+ // so we must verify physical ordering by reading each parquet file
directly.
+ Assertions.assertThat(globalFileRows.length).isEqualTo(1)
+ val globalAValues =
+ spark.read
+ .format("parquet")
+ .load(globalFileRows(0).getString(0))
+ .select("a")
+ .collect()
+ .map(_.getInt(0))
+ Assertions.assertThat(globalAValues).isEqualTo(Array(1, 2, 3, 4, 5, 6,
7, 8))
+
+ // Local-sort uses sortWithinPartitions only (no range shuffle), so
multiple output
+ // files are produced. Each file is individually sorted by 'a', but
ranges may overlap.
+
Assertions.assertThat(localFileRows.length.toLong).isGreaterThan(globalFileRows.length)
+ var localFilesWithMultiRows = 0
+ localFileRows.foreach {
+ row =>
+ val filePath = row.getString(0)
+ val recordCount = row.getLong(1)
+ if (recordCount > 1) {
+ localFilesWithMultiRows += 1
+ val aValues =
+
spark.read.format("parquet").load(filePath).select("a").collect().map(_.getInt(0))
+ for (i <- 1 until aValues.length) {
+ Assertions
+ .assertThat(aValues(i))
+ .as(
+ s"local-sort file $filePath: a[$i]=${aValues(i)} should be
>= a[${i - 1}]=${aValues(i - 1)}")
+ .isGreaterThanOrEqualTo(aValues(i - 1))
+ }
+ }
+ }
+ Assertions.assertThat(localFilesWithMultiRows).isGreaterThan(0)
+
+ val localFiles = spark.sql("SELECT count(*) FROM
`T_local$files`").collect()(0).getLong(0)
+ val globalFiles = spark.sql("SELECT count(*) FROM
`T_global$files`").collect()(0).getLong(0)
+ Assertions.assertThat(localFiles).isGreaterThanOrEqualTo(globalFiles)
+ Assertions.assertThat(globalFiles).isEqualTo(1)
+ }
+ }
+
def checkSnapshot(table: FileStoreTable): Unit = {
Assertions
.assertThat(table.latestSnapshot().get().commitKind().toString)