This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 725366cab4 [flink] Incremental Clustering support specify partitions (#6449)
725366cab4 is described below
commit 725366cab4b62543b3a8547d48f9f644db1fc87f
Author: LsomeYeah <[email protected]>
AuthorDate: Wed Oct 22 13:22:02 2025 +0800
[flink] Incremental Clustering support specify partitions (#6449)
---
.../append/cluster/IncrementalClusterManager.java | 11 +++++-
.../apache/paimon/flink/action/CompactAction.java | 11 +++---
.../action/IncrementalClusterActionITCase.java | 42 ++++++++++++++++++++++
3 files changed, 56 insertions(+), 8 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 407f6b17ef..1aa4d2b13e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
@@ -33,6 +34,8 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -58,11 +61,17 @@ public class IncrementalClusterManager {
private int maxLevel;
public IncrementalClusterManager(FileStoreTable table) {
+ this(table, null);
+ }
+
+ public IncrementalClusterManager(
+ FileStoreTable table, @Nullable PartitionPredicate
specifiedPartitions) {
checkArgument(
table.bucketMode() == BucketMode.BUCKET_UNAWARE,
"only append unaware-bucket table support incremental
clustering.");
// drop stats to reduce memory usage
- this.snapshotReader = table.newSnapshotReader().dropStats();
+ this.snapshotReader =
+
table.newSnapshotReader().withPartitionFilter(specifiedPartitions).dropStats();
CoreOptions options = table.coreOptions();
checkArgument(
options.clusteringIncrementalEnabled(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 524c862517..03fe46a850 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -213,15 +213,12 @@ public class CompactAction extends TableActionBase {
}
private boolean buildForIncrementalClustering(
- StreamExecutionEnvironment env, FileStoreTable table, boolean
isStreaming) {
+ StreamExecutionEnvironment env, FileStoreTable table, boolean
isStreaming)
+ throws Exception {
checkArgument(!isStreaming, "Incremental clustering currently only
supports batch mode");
- checkArgument(
- partitions == null,
- "Incremental clustering currently does not support specifying
partitions");
- checkArgument(
- whereSql == null, "Incremental clustering currently does not
support predicates");
- IncrementalClusterManager incrementalClusterManager = new
IncrementalClusterManager(table);
+ IncrementalClusterManager incrementalClusterManager =
+ new IncrementalClusterManager(table, getPartitionPredicate());
// non-full strategy as default for incremental clustering
if (fullCompaction == null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index ff59915ccf..7629ebf81d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -319,6 +319,48 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
assertThat(result4).containsExactlyElementsOf(expected4);
}
+ @Test
+ public void testClusterSpecifyPartition() throws Exception {
+ FileStoreTable table = createTable("pt", 1);
+
+ BinaryString randomStr = BinaryString.fromString(randomString(150));
+ List<CommitMessage> messages = new ArrayList<>();
+
+ // first write
+ List<String> expected1 = new ArrayList<>();
+ for (int pt = 0; pt < 2; pt++) {
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ messages.addAll(write(GenericRow.of(i, j, (pt == 0) ?
randomStr : null, pt)));
+ expected1.add(String.format("+I[%s, %s, %s]", i, j, pt));
+ }
+ }
+ }
+ commit(messages);
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new
int[] {0, 1, 3});
+ List<String> result1 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ assertThat(result1).containsExactlyElementsOf(expected1);
+
+ runAction(Lists.newArrayList("--partition", "pt=0",
"--compact_strategy", "full"));
+ checkSnapshot(table);
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(2);
+ for (Split split : splits) {
+ DataSplit dataSplit = (DataSplit) split;
+ if (dataSplit.partition().getInt(0) == 0) {
+ assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+ assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5);
+ } else {
+ assertThat(dataSplit.dataFiles().size()).isGreaterThan(1);
+ assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(0);
+ }
+ }
+ }
+
@Test
public void testClusterOnEmptyData() throws Exception {
createTable("pt", 1);