This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new db5236e7c1 [spark]  Local-sort mode is supported in spark for 
incremental clustering (#7587)
db5236e7c1 is described below

commit db5236e7c1ae190794e61ca92242a41f08d60df5
Author: sanshi <[email protected]>
AuthorDate: Sun May 24 10:31:44 2026 +0800

    [spark]  Local-sort mode is supported in spark for incremental clustering 
(#7587)
---
 .../cluster/IncrementalClusterManagerTest.java     |  32 ++++
 .../paimon/spark/procedure/CompactProcedure.java   |  10 +-
 .../apache/paimon/spark/sort/HilbertSorter.java    |   8 +
 .../org/apache/paimon/spark/sort/OrderSorter.java  |   6 +
 .../org/apache/paimon/spark/sort/TableSorter.java  |   7 +
 .../org/apache/paimon/spark/sort/ZorderSorter.java |   8 +
 .../spark/procedure/CompactProcedureTestBase.scala | 181 +++++++++++++++++++++
 7 files changed, 251 insertions(+), 1 deletion(-)

diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index cf81cdf85f..b38cf3af99 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -146,6 +146,38 @@ public class IncrementalClusterManagerTest {
         }
     }
 
+    @Test
+    public void testClusteringIncrementalModeDefault() throws Exception {
+        // Test default mode is GLOBAL_SORT
+        Map<String, String> options = new HashMap<>();
+        FileStoreTable table = createTable(options, Collections.emptyList());
+        IncrementalClusterManager manager = new 
IncrementalClusterManager(table);
+        assertThat(manager.clusteringIncrementalMode())
+                .isEqualTo(CoreOptions.ClusteringIncrementalMode.GLOBAL_SORT);
+    }
+
+    @Test
+    public void testClusteringIncrementalModeLocalSort() throws Exception {
+        // Test local-sort mode
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CLUSTERING_INCREMENTAL_MODE.key(), 
"local-sort");
+        FileStoreTable table = createTable(options, Collections.emptyList());
+        IncrementalClusterManager manager = new 
IncrementalClusterManager(table);
+        assertThat(manager.clusteringIncrementalMode())
+                .isEqualTo(CoreOptions.ClusteringIncrementalMode.LOCAL_SORT);
+    }
+
+    @Test
+    public void testClusteringIncrementalModeGlobalSort() throws Exception {
+        // Test explicit global-sort mode
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CLUSTERING_INCREMENTAL_MODE.key(), 
"global-sort");
+        FileStoreTable table = createTable(options, Collections.emptyList());
+        IncrementalClusterManager manager = new 
IncrementalClusterManager(table);
+        assertThat(manager.clusteringIncrementalMode())
+                .isEqualTo(CoreOptions.ClusteringIncrementalMode.GLOBAL_SORT);
+    }
+
     @Test
     public void testHistoryPartitionAutoClustering() throws Exception {
         Map<String, String> options = new HashMap<>();
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 274080cd7a..b07fbd26be 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -668,6 +668,9 @@ public class CompactProcedure extends BaseProcedure {
                 incrementalClusterManager.clusterCurve(),
                 incrementalClusterManager.clusterKeys());
 
+        CoreOptions.ClusteringIncrementalMode mode =
+                incrementalClusterManager.clusteringIncrementalMode();
+
         Dataset<Row> datasetForWrite =
                 partitionSplits.values().stream()
                         .map(Pair::getKey)
@@ -679,7 +682,12 @@ public class CompactProcedure extends BaseProcedure {
                                                     
ScanPlanHelper$.MODULE$.createNewScanPlan(
                                                             splits.toArray(new 
DataSplit[0]),
                                                             relation));
-                                    return sorter.sort(dataset);
+                                    // Use sortLocal() for LOCAL_SORT, sort() 
for GLOBAL_SORT
+                                    if (mode == 
CoreOptions.ClusteringIncrementalMode.LOCAL_SORT) {
+                                        return sorter.sortLocal(dataset);
+                                    } else {
+                                        return sorter.sort(dataset);
+                                    }
                                 })
                         .reduce(Dataset::union)
                         .orElse(null);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
index 1f30077131..681104fb19 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
@@ -49,6 +49,14 @@ public class HilbertSorter extends TableSorter {
         return sortedDF.drop(H_COLUMN);
     }
 
+    @Override
+    public Dataset<Row> sortLocal(Dataset<Row> df) {
+        Column hilbertColumn = hilbertValue(df);
+        Dataset<Row> hilbertValueDF = df.withColumn(H_COLUMN, hilbertColumn);
+        Dataset<Row> sortedDF = 
hilbertValueDF.sortWithinPartitions(hilbertValueDF.col(H_COLUMN));
+        return sortedDF.drop(H_COLUMN);
+    }
+
     private Column hilbertValue(Dataset<Row> df) {
         SparkHilbertUDF hilbertUDF = new SparkHilbertUDF();
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java
index e2fc18f696..fcee0cf97c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java
@@ -39,4 +39,10 @@ public class OrderSorter extends TableSorter {
         Column[] sortColumns = 
orderColNames.stream().map(input::col).toArray(Column[]::new);
         return 
input.repartitionByRange(sortColumns).sortWithinPartitions(sortColumns);
     }
+
+    @Override
+    public Dataset<Row> sortLocal(Dataset<Row> input) {
+        Column[] sortColumns = 
orderColNames.stream().map(input::col).toArray(Column[]::new);
+        return input.sortWithinPartitions(sortColumns);
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
index a96724fad6..de58381fad 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
@@ -62,6 +62,8 @@ public abstract class TableSorter {
 
     public abstract Dataset<Row> sort(Dataset<Row> input);
 
+    public abstract Dataset<Row> sortLocal(Dataset<Row> input);
+
     public static TableSorter getSorter(
             FileStoreTable table, OrderType orderType, List<String> 
orderColumns) {
         switch (orderType) {
@@ -77,6 +79,11 @@ public abstract class TableSorter {
                     public Dataset<Row> sort(Dataset<Row> input) {
                         return input;
                     }
+
+                    @Override
+                    public Dataset<Row> sortLocal(Dataset<Row> input) {
+                        return input;
+                    }
                 };
             default:
                 throw new IllegalArgumentException("cannot match order type: " 
+ orderType);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
index 2a6d1b2ceb..32d2e468ac 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
@@ -48,6 +48,14 @@ public class ZorderSorter extends TableSorter {
         return sortedDF.drop(Z_COLUMN);
     }
 
+    @Override
+    public Dataset<Row> sortLocal(Dataset<Row> df) {
+        Column zColumn = zValue(df);
+        Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zColumn);
+        Dataset<Row> sortedDF = 
zValueDF.sortWithinPartitions(zValueDF.col(Z_COLUMN));
+        return sortedDF.drop(Z_COLUMN);
+    }
+
     private Column zValue(Dataset<Row> df) {
         SparkZOrderUDF zOrderUDF =
                 new SparkZOrderUDF(
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 2bea2144a3..7cbe2b6659 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -1339,6 +1339,187 @@ abstract class CompactProcedureTestBase extends 
PaimonSparkTestBase with StreamT
     }
   }
 
+  test("Paimon Procedure: incremental clustering with local-sort mode") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (a INT, b INT, c STRING)
+                   |TBLPROPERTIES (
+                   |  'bucket'='-1',
+                   |  'num-levels'='4',
+                   |  'num-sorted-run.compaction-trigger'='2',
+                   |  'clustering.columns'='a',
+                   |  'clustering.incremental'='true',
+                   |  'clustering.incremental.mode'='local-sort'
+                   |)
+                   |""".stripMargin)
+
+      // Insert data in multiple batches to create multiple files
+      spark.sql("INSERT INTO T VALUES (5, 1, 'a')")
+      spark.sql("INSERT INTO T VALUES (3, 2, 'b')")
+      spark.sql("INSERT INTO T VALUES (1, 3, 'c')")
+      spark.sql("INSERT INTO T VALUES (2, 3, 'd')")
+      spark.sql(
+        "INSERT INTO T SELECT /*+ COALESCE(1) */ col1, col2, col3 FROM VALUES 
(8, 3, 'e'), (1, 4, 'f'), (2, 5, 'g') AS t(col1, col2, col3);")
+
+      checkAnswer(
+        spark.sql("SELECT a, b, c FROM T"),
+        Seq(
+          Row(5, 1, "a"),
+          Row(3, 2, "b"),
+          Row(1, 3, "c"),
+          Row(2, 3, "d"),
+          Row(8, 3, "e"),
+          Row(1, 4, "f"),
+          Row(2, 5, "g"))
+      )
+
+      val filesBeforeCompact = spark.sql("SELECT count(*) FROM 
`T$files`").collect()(0).getLong(0)
+      Assertions.assertThat(filesBeforeCompact).isEqualTo(5)
+
+      // Run compaction to trigger incremental clustering
+      checkAnswer(spark.sql("CALL sys.compact(table => 'T')"), Row(true) :: 
Nil)
+
+      // Verify data integrity
+      checkAnswer(
+        spark.sql("SELECT a, b, c FROM T"),
+        Seq(
+          Row(5, 1, "a"),
+          Row(3, 2, "b"),
+          Row(1, 3, "c"),
+          Row(2, 3, "d"),
+          Row(1, 4, "f"),
+          Row(2, 5, "g"),
+          Row(8, 3, "e"))
+      )
+
+      // Verify files are clustered (level >= 1)
+      val files = spark.sql("SELECT level FROM `T$files`").collect()
+      files.foreach(row => 
Assertions.assertThat(row.getInt(0)).isGreaterThanOrEqualTo(1))
+      val filesAfterCompact = spark.sql("SELECT count(*) FROM 
`T$files`").collect()(0).getLong(0)
+      
Assertions.assertThat(filesAfterCompact).isLessThanOrEqualTo(filesBeforeCompact)
+
+      // Verify local-sort effect: within each file, rows are physically 
sorted by column 'a'.
+      // file_path in T$files is an absolute path (bucketPath + "/" + 
fileName), so we can
+      // read each file directly as parquet (bypassing Paimon's scan) to check 
physical row order.
+      val fileRows =
+        spark.sql("SELECT file_path, record_count FROM `T$files`").collect()
+      fileRows.foreach {
+        row =>
+          val filePath = row.getString(0)
+          val recordCount = row.getLong(1)
+          if (recordCount > 1) {
+            // For multi-row files, verify rows are physically sorted by 'a'
+            val aValues =
+              
spark.read.format("parquet").load(filePath).select("a").collect().map(_.getInt(0))
+            for (i <- 1 until aValues.length) {
+              Assertions
+                .assertThat(aValues(i))
+                .as(
+                  s"File $filePath: row $i (a=${aValues(i)}) should be >= row 
${i - 1} (a=${aValues(i - 1)})")
+                .isGreaterThanOrEqualTo(aValues(i - 1))
+            }
+          }
+      }
+
+      val table = loadTable("T").asInstanceOf[FileStoreTable]
+      checkSnapshot(table)
+    }
+  }
+
+  test("Paimon Procedure: incremental clustering local-sort vs global-sort") {
+    withTable("T_local", "T_global") {
+      // Create table with local-sort
+      spark.sql(s"""
+                   |CREATE TABLE T_local (a INT, b INT)
+                   |TBLPROPERTIES (
+                   |  'bucket'='-1',
+                   |  'num-levels'='4',
+                   |  'num-sorted-run.compaction-trigger'='2',
+                   |  'clustering.columns'='a',
+                   |  'clustering.incremental'='true',
+                   |  'clustering.incremental.mode'='local-sort'
+                   |)
+                   |""".stripMargin)
+
+      // Create table with global-sort (default)
+      spark.sql(s"""
+                   |CREATE TABLE T_global (a INT, b INT)
+                   |TBLPROPERTIES (
+                   |  'bucket'='-1',
+                   |  'num-levels'='4',
+                   |  'num-sorted-run.compaction-trigger'='2',
+                   |  'clustering.columns'='a',
+                   |  'clustering.incremental'='true',
+                   |  'clustering.incremental.mode'='global-sort'
+                   |)
+                   |""".stripMargin)
+
+      // Insert same data into both tables
+      for (table <- Seq("T_local", "T_global")) {
+        spark.sql(s"INSERT INTO $table VALUES (5, 1), (3, 2)")
+        spark.sql(s"INSERT INTO $table VALUES (8, 3), (1, 4)")
+        spark.sql(s"INSERT INTO $table VALUES (7, 5), (2, 6)")
+        spark.sql(s"INSERT INTO $table VALUES (4, 7), (6, 8)")
+      }
+
+      // Run compact on both
+      spark.sql("CALL sys.compact(table => 'T_local')")
+      spark.sql("CALL sys.compact(table => 'T_global')")
+
+      // Both should have same data
+      checkAnswer(
+        spark.sql("SELECT a, b FROM T_local ORDER BY a"),
+        spark.sql("SELECT a, b FROM T_global ORDER BY a")
+      )
+
+      val localFileRows =
+        spark.sql("SELECT file_path, record_count FROM 
`T_local$files`").collect()
+      val globalFileRows =
+        spark.sql("SELECT file_path, record_count FROM 
`T_global$files`").collect()
+
+      // Global-sort uses repartitionByRange which shuffles all data into 1 
sorted file.
+      // Reading multiple files in parallel gives non-deterministic cross-file 
ordering,
+      // so we must verify physical ordering by reading each parquet file 
directly.
+      Assertions.assertThat(globalFileRows.length).isEqualTo(1)
+      val globalAValues =
+        spark.read
+          .format("parquet")
+          .load(globalFileRows(0).getString(0))
+          .select("a")
+          .collect()
+          .map(_.getInt(0))
+      Assertions.assertThat(globalAValues).isEqualTo(Array(1, 2, 3, 4, 5, 6, 
7, 8))
+
+      // Local-sort uses sortWithinPartitions only (no range shuffle), so 
multiple output
+      // files are produced. Each file is individually sorted by 'a', but 
ranges may overlap.
+      
Assertions.assertThat(localFileRows.length.toLong).isGreaterThan(globalFileRows.length)
+      var localFilesWithMultiRows = 0
+      localFileRows.foreach {
+        row =>
+          val filePath = row.getString(0)
+          val recordCount = row.getLong(1)
+          if (recordCount > 1) {
+            localFilesWithMultiRows += 1
+            val aValues =
+              
spark.read.format("parquet").load(filePath).select("a").collect().map(_.getInt(0))
+            for (i <- 1 until aValues.length) {
+              Assertions
+                .assertThat(aValues(i))
+                .as(
+                  s"local-sort file $filePath: a[$i]=${aValues(i)} should be 
>= a[${i - 1}]=${aValues(i - 1)}")
+                .isGreaterThanOrEqualTo(aValues(i - 1))
+            }
+          }
+      }
+      Assertions.assertThat(localFilesWithMultiRows).isGreaterThan(0)
+
+      val localFiles = spark.sql("SELECT count(*) FROM 
`T_local$files`").collect()(0).getLong(0)
+      val globalFiles = spark.sql("SELECT count(*) FROM 
`T_global$files`").collect()(0).getLong(0)
+      Assertions.assertThat(localFiles).isGreaterThanOrEqualTo(globalFiles)
+      Assertions.assertThat(globalFiles).isEqualTo(1)
+    }
+  }
+
   def checkSnapshot(table: FileStoreTable): Unit = {
     Assertions
       .assertThat(table.latestSnapshot().get().commitKind().toString)

Reply via email to