This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 59c6ae019acee6bfcf8d176a953bc9db0065379c
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Oct 20 19:21:10 2025 +0800

    [flink] disable clustering during writing if incremental clustering enabled (#6432)
---
 .../org/apache/paimon/flink/sink/FlinkTableSinkBase.java  | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index bf2bee15dc..051bc91a67 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -44,6 +44,7 @@ import java.util.Map;
 
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
+import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
 import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
@@ -134,12 +135,14 @@ public abstract class FlinkTableSinkBase
                             .forRowData(
                                     new DataStream<>(
                                             dataStream.getExecutionEnvironment(),
-                                            dataStream.getTransformation()))
-                            .clusteringIfPossible(
-                                    conf.get(CLUSTERING_COLUMNS),
-                                    conf.get(CLUSTERING_STRATEGY),
-                                    conf.get(CLUSTERING_SORT_IN_CLUSTER),
-                                    conf.get(CLUSTERING_SAMPLE_FACTOR));
+                                            dataStream.getTransformation()));
+                    if (!conf.get(CLUSTERING_INCREMENTAL)) {
+                        builder.clusteringIfPossible(
+                                conf.get(CLUSTERING_COLUMNS),
+                                conf.get(CLUSTERING_STRATEGY),
+                                conf.get(CLUSTERING_SORT_IN_CLUSTER),
+                                conf.get(CLUSTERING_SAMPLE_FACTOR));
+                    }
                     if (overwrite) {
                         builder.overwrite(staticPartitions);
                     }

Reply via email to