This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 24ff42103a [core] Enable clustering before write phase for incremental 
clustering table (#7331)
24ff42103a is described below

commit 24ff42103afc11512d2d59c19e9853b95e18164f
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Mar 3 10:41:03 2026 +0800

    [core] Enable clustering before write phase for incremental clustering 
table (#7331)
---
 .../shortcodes/generated/core_configuration.html   | 18 +++++++++-----
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 +++++++++
 .../paimon/flink/sink/FlinkTableSinkBase.java      |  4 +++-
 .../RangePartitionAndSortForAppendTableITCase.java | 28 ++++++++++++++++++++++
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  5 +++-
 5 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 9d6c1948dd..4f012492d8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -56,6 +56,12 @@ under the License.
             <td>Boolean</td>
             <td>Write blob field using blob descriptor rather than blob 
bytes.</td>
         </tr>
+        <tr>
+            <td><h5>blob-descriptor-field</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Comma-separated BLOB field names to store as serialized 
BlobDescriptor bytes inline in data files.</td>
+        </tr>
         <tr>
             <td><h5>blob-field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -68,12 +74,6 @@ under the License.
             <td>Boolean</td>
             <td>Whether to consider blob file size as a factor when performing 
scan splitting.</td>
         </tr>
-        <tr>
-            <td><h5>blob-descriptor-field</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Comma-separated BLOB field names to store as serialized 
BlobDescriptor bytes inline in data files.</td>
-        </tr>
         <tr>
             <td><h5>blob.target-file-size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -200,6 +200,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether enable incremental clustering.</td>
         </tr>
+        <tr>
+            <td><h5>clustering.incremental.optimize-write</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to perform clustering before the write 
phase when incremental clustering is enabled.</td>
+        </tr>
         <tr>
             <td><h5>clustering.strategy</h5></td>
             <td style="word-wrap: break-word;">"auto"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 34e680433b..ca3c576512 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2071,6 +2071,13 @@ public class CoreOptions implements Serializable {
                             "The duration after which a partition without new 
updates is considered a historical partition. "
                                     + "Historical partitions will be 
automatically fully clustered during the cluster operation.");
 
+    public static final ConfigOption<Boolean> 
CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE =
+            key("clustering.incremental.optimize-write")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to perform clustering before the write 
phase when incremental clustering is enabled.");
+
     @Immutable
     public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
             key("row-tracking.enabled")
@@ -3402,6 +3409,10 @@ public class CoreOptions implements Serializable {
         return options.get(CLUSTERING_INCREMENTAL);
     }
 
+    public boolean clusteringIncrementalOptimizeWrite() {
+        return options.get(CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE);
+    }
+
     public boolean bucketClusterEnabled() {
         return !bucketAppendOrdered()
                 && !deletionVectorsEnabled()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index b7c37d525e..fb68793399 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
 import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
+import static 
org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE;
 import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
@@ -122,7 +123,8 @@ public abstract class FlinkTableSinkBase
                             new DataStream<>(
                                     dataStream.getExecutionEnvironment(),
                                     dataStream.getTransformation()));
-                    if (!conf.get(CLUSTERING_INCREMENTAL)) {
+                    if (!conf.get(CLUSTERING_INCREMENTAL)
+                            || 
conf.get(CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE)) {
                         builder.clusteringIfPossible(
                                 conf.get(CLUSTERING_COLUMNS),
                                 conf.get(CLUSTERING_STRATEGY),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForAppendTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForAppendTableITCase.java
index 2dce791b88..a842e7a298 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForAppendTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForAppendTableITCase.java
@@ -247,6 +247,34 @@ public class RangePartitionAndSortForAppendTableITCase 
extends CatalogITCaseBase
         Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
     }
 
+    @Test
+    public void testClusteringPreWriteEnabled() throws Exception {
+        List<Row> inputRows = generateSinkRows();
+        String id = TestValuesTableFactory.registerData(inputRows);
+        batchSql(
+                "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 
INT, col4 INT) WITH "
+                        + "('connector'='values', 'bounded'='true', 
'data-id'='%s')",
+                id);
+        batchSql(
+                "INSERT INTO test_table /*+ 
OPTIONS('sink.clustering.by-columns' = 'col1', "
+                        + "'sink.parallelism' = '10', 
'sink.clustering.strategy' = 'zorder', "
+                        + "'clustering.incremental' = 'true', 
'clustering.incremental.optimize-write' = 'true') */ "
+                        + "SELECT * FROM test_source");
+        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+        assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+        FileStoreTable testStoreTable = paimonTable("test_table");
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(testStoreTable.rowType());
+        Predicate predicate = predicateBuilder.between(0, 100, 200);
+        List<ManifestEntry> files = 
testStoreTable.store().newScan().plan().files();
+        assertThat(files.size()).isEqualTo(10);
+        List<ManifestEntry> filesFilter =
+                ((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+        Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+    }
+
     private List<Row> generateSinkRows() {
         List<Row> sinkRows = new ArrayList<>();
         Random random = new Random();
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ebdbf038ff..b4ba7efc09 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -303,7 +303,10 @@ case class PaimonSparkWriter(
           }
         }
         val clusteringColumns = coreOptions.clusteringColumns()
-        if ((!coreOptions.clusteringIncrementalEnabled()) && 
(!clusteringColumns.isEmpty)) {
+        if (
+          (!coreOptions.clusteringIncrementalEnabled() || coreOptions
+            .clusteringIncrementalOptimizeWrite()) && 
(!clusteringColumns.isEmpty)
+        ) {
           val strategy = 
coreOptions.clusteringStrategy(tableSchema.fields().size())
           val sorter = TableSorter.getSorter(table, strategy, 
clusteringColumns)
           input = sorter.sort(data)

Reply via email to