This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b318e3c41b [flink] disable clustering during writing if incremental
clustering enabled (#6432)
b318e3c41b is described below
commit b318e3c41b5ad87ee3b3c001f6948ba82ac9b461
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Oct 20 19:21:10 2025 +0800
[flink] disable clustering during writing if incremental clustering enabled
(#6432)
---
.../org/apache/paimon/flink/sink/FlinkTableSinkBase.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index bf2bee15dc..051bc91a67 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -44,6 +44,7 @@ import java.util.Map;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
+import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
@@ -134,12 +135,14 @@ public abstract class FlinkTableSinkBase
.forRowData(
new DataStream<>(
dataStream.getExecutionEnvironment(),
- dataStream.getTransformation()))
- .clusteringIfPossible(
- conf.get(CLUSTERING_COLUMNS),
- conf.get(CLUSTERING_STRATEGY),
- conf.get(CLUSTERING_SORT_IN_CLUSTER),
- conf.get(CLUSTERING_SAMPLE_FACTOR));
+ dataStream.getTransformation()));
+ if (!conf.get(CLUSTERING_INCREMENTAL)) {
+ builder.clusteringIfPossible(
+ conf.get(CLUSTERING_COLUMNS),
+ conf.get(CLUSTERING_STRATEGY),
+ conf.get(CLUSTERING_SORT_IN_CLUSTER),
+ conf.get(CLUSTERING_SAMPLE_FACTOR));
+ }
if (overwrite) {
builder.overwrite(staticPartitions);
}