This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 24ff42103a [core] Enable clustering before write phase for incremental
clustering table (#7331)
24ff42103a is described below
commit 24ff42103afc11512d2d59c19e9853b95e18164f
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Mar 3 10:41:03 2026 +0800
[core] Enable clustering before write phase for incremental clustering
table (#7331)
---
.../shortcodes/generated/core_configuration.html | 18 +++++++++-----
.../main/java/org/apache/paimon/CoreOptions.java | 11 +++++++++
.../paimon/flink/sink/FlinkTableSinkBase.java | 4 +++-
.../RangePartitionAndSortForAppendTableITCase.java | 28 ++++++++++++++++++++++
.../paimon/spark/commands/PaimonSparkWriter.scala | 5 +++-
5 files changed, 58 insertions(+), 8 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 9d6c1948dd..4f012492d8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -56,6 +56,12 @@ under the License.
<td>Boolean</td>
<td>Write blob field using blob descriptor rather than blob
bytes.</td>
</tr>
+ <tr>
+ <td><h5>blob-descriptor-field</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Comma-separated BLOB field names to store as serialized
BlobDescriptor bytes inline in data files.</td>
+ </tr>
<tr>
<td><h5>blob-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -68,12 +74,6 @@ under the License.
<td>Boolean</td>
<td>Whether to consider blob file size as a factor when performing
scan splitting.</td>
</tr>
- <tr>
- <td><h5>blob-descriptor-field</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>Comma-separated BLOB field names to store as serialized
BlobDescriptor bytes inline in data files.</td>
- </tr>
<tr>
<td><h5>blob.target-file-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -200,6 +200,12 @@ under the License.
<td>Boolean</td>
<td>Whether enable incremental clustering.</td>
</tr>
+ <tr>
+ <td><h5>clustering.incremental.optimize-write</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to perform clustering before the write phase when
incremental clustering is enabled.</td>
+ </tr>
<tr>
<td><h5>clustering.strategy</h5></td>
<td style="word-wrap: break-word;">"auto"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 34e680433b..ca3c576512 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2071,6 +2071,13 @@ public class CoreOptions implements Serializable {
"The duration after which a partition without new
updates is considered a historical partition. "
+ "Historical partitions will be
automatically fully clustered during the cluster operation.");
+ public static final ConfigOption<Boolean>
CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE =
+ key("clustering.incremental.optimize-write")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to perform clustering before the write
phase when incremental clustering is enabled.");
+
@Immutable
public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
key("row-tracking.enabled")
@@ -3402,6 +3409,10 @@ public class CoreOptions implements Serializable {
return options.get(CLUSTERING_INCREMENTAL);
}
+ public boolean clusteringIncrementalOptimizeWrite() {
+ return options.get(CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE);
+ }
+
public boolean bucketClusterEnabled() {
return !bucketAppendOrdered()
&& !deletionVectorsEnabled()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index b7c37d525e..fb68793399 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -41,6 +41,7 @@ import java.util.Map;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
+import static
org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE;
import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
@@ -122,7 +123,8 @@ public abstract class FlinkTableSinkBase
new DataStream<>(
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()));
- if (!conf.get(CLUSTERING_INCREMENTAL)) {
+ if (!conf.get(CLUSTERING_INCREMENTAL)
+ ||
conf.get(CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE)) {
builder.clusteringIfPossible(
conf.get(CLUSTERING_COLUMNS),
conf.get(CLUSTERING_STRATEGY),
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForAppendTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForAppendTableITCase.java
index 2dce791b88..a842e7a298 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForAppendTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForAppendTableITCase.java
@@ -247,6 +247,34 @@ public class RangePartitionAndSortForAppendTableITCase
extends CatalogITCaseBase
Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
}
+ @Test
+ public void testClusteringPreWriteEnabled() throws Exception {
+ List<Row> inputRows = generateSinkRows();
+ String id = TestValuesTableFactory.registerData(inputRows);
+ batchSql(
+ "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3
INT, col4 INT) WITH "
+ + "('connector'='values', 'bounded'='true',
'data-id'='%s')",
+ id);
+ batchSql(
+ "INSERT INTO test_table /*+
OPTIONS('sink.clustering.by-columns' = 'col1', "
+ + "'sink.parallelism' = '10',
'sink.clustering.strategy' = 'zorder', "
+ + "'clustering.incremental' = 'true',
'clustering.incremental.optimize-write' = 'true') */ "
+ + "SELECT * FROM test_source");
+ List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+ assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+ FileStoreTable testStoreTable = paimonTable("test_table");
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(testStoreTable.rowType());
+ Predicate predicate = predicateBuilder.between(0, 100, 200);
+ List<ManifestEntry> files =
testStoreTable.store().newScan().plan().files();
+ assertThat(files.size()).isEqualTo(10);
+ List<ManifestEntry> filesFilter =
+ ((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
+ .withFilter(predicate)
+ .plan()
+ .files();
+ Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+ }
+
private List<Row> generateSinkRows() {
List<Row> sinkRows = new ArrayList<>();
Random random = new Random();
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ebdbf038ff..b4ba7efc09 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -303,7 +303,10 @@ case class PaimonSparkWriter(
}
}
val clusteringColumns = coreOptions.clusteringColumns()
- if ((!coreOptions.clusteringIncrementalEnabled()) &&
(!clusteringColumns.isEmpty)) {
+ if (
+ (!coreOptions.clusteringIncrementalEnabled() || coreOptions
+ .clusteringIncrementalOptimizeWrite()) &&
(!clusteringColumns.isEmpty)
+ ) {
val strategy =
coreOptions.clusteringStrategy(tableSchema.fields().size())
val sorter = TableSorter.getSorter(table, strategy,
clusteringColumns)
input = sorter.sort(data)