This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 725366cab4 [flink] Incremental Clustering support specify partitions (#6449)
725366cab4 is described below

commit 725366cab4b62543b3a8547d48f9f644db1fc87f
Author: LsomeYeah <[email protected]>
AuthorDate: Wed Oct 22 13:22:02 2025 +0800

    [flink] Incremental Clustering support specify partitions (#6449)
---
 .../append/cluster/IncrementalClusterManager.java  | 11 +++++-
 .../apache/paimon/flink/action/CompactAction.java  | 11 +++---
 .../action/IncrementalClusterActionITCase.java     | 42 ++++++++++++++++++++++
 3 files changed, 56 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 407f6b17ef..1aa4d2b13e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.LevelSortedRun;
 import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
@@ -33,6 +34,8 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -58,11 +61,17 @@ public class IncrementalClusterManager {
     private int maxLevel;
 
     public IncrementalClusterManager(FileStoreTable table) {
+        this(table, null);
+    }
+
+    public IncrementalClusterManager(
+            FileStoreTable table, @Nullable PartitionPredicate 
specifiedPartitions) {
         checkArgument(
                 table.bucketMode() == BucketMode.BUCKET_UNAWARE,
                 "only append unaware-bucket table support incremental 
clustering.");
         // drop stats to reduce memory usage
-        this.snapshotReader = table.newSnapshotReader().dropStats();
+        this.snapshotReader =
+                
table.newSnapshotReader().withPartitionFilter(specifiedPartitions).dropStats();
         CoreOptions options = table.coreOptions();
         checkArgument(
                 options.clusteringIncrementalEnabled(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 524c862517..03fe46a850 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -213,15 +213,12 @@ public class CompactAction extends TableActionBase {
     }
 
     private boolean buildForIncrementalClustering(
-            StreamExecutionEnvironment env, FileStoreTable table, boolean 
isStreaming) {
+            StreamExecutionEnvironment env, FileStoreTable table, boolean 
isStreaming)
+            throws Exception {
         checkArgument(!isStreaming, "Incremental clustering currently only 
supports batch mode");
-        checkArgument(
-                partitions == null,
-                "Incremental clustering currently does not support specifying 
partitions");
-        checkArgument(
-                whereSql == null, "Incremental clustering currently does not 
support predicates");
 
-        IncrementalClusterManager incrementalClusterManager = new 
IncrementalClusterManager(table);
+        IncrementalClusterManager incrementalClusterManager =
+                new IncrementalClusterManager(table, getPartitionPredicate());
 
         // non-full strategy as default for incremental clustering
         if (fullCompaction == null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index ff59915ccf..7629ebf81d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -319,6 +319,48 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
         assertThat(result4).containsExactlyElementsOf(expected4);
     }
 
+    @Test
+    public void testClusterSpecifyPartition() throws Exception {
+        FileStoreTable table = createTable("pt", 1);
+
+        BinaryString randomStr = BinaryString.fromString(randomString(150));
+        List<CommitMessage> messages = new ArrayList<>();
+
+        // first write
+        List<String> expected1 = new ArrayList<>();
+        for (int pt = 0; pt < 2; pt++) {
+            for (int i = 0; i < 3; i++) {
+                for (int j = 0; j < 3; j++) {
+                    messages.addAll(write(GenericRow.of(i, j, (pt == 0) ? 
randomStr : null, pt)));
+                    expected1.add(String.format("+I[%s, %s, %s]", i, j, pt));
+                }
+            }
+        }
+        commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new 
int[] {0, 1, 3});
+        List<String> result1 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        assertThat(result1).containsExactlyElementsOf(expected1);
+
+        runAction(Lists.newArrayList("--partition", "pt=0", 
"--compact_strategy", "full"));
+        checkSnapshot(table);
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(2);
+        for (Split split : splits) {
+            DataSplit dataSplit = (DataSplit) split;
+            if (dataSplit.partition().getInt(0) == 0) {
+                assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+                assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5);
+            } else {
+                assertThat(dataSplit.dataFiles().size()).isGreaterThan(1);
+                assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(0);
+            }
+        }
+    }
+
     @Test
     public void testClusterOnEmptyData() throws Exception {
         createTable("pt", 1);

Reply via email to