This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f3e6e85a77 [flink] Extract IncrementalClusterCompact from CompactAction (#6638)
f3e6e85a77 is described below

commit f3e6e85a7771fea0cc8a1627c16e54975a1e8681
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 20 14:59:48 2025 +0800

    [flink] Extract IncrementalClusterCompact from CompactAction (#6638)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  18 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   3 +
 .../append/cluster/IncrementalClusterManager.java  |   2 +-
 .../paimon/privilege/PrivilegedFileStore.java      |   6 +
 .../cluster/IncrementalClusterManagerTest.java     |   6 +-
 .../apache/paimon/flink/action/CompactAction.java  | 159 ++---------------
 .../paimon/flink/action/CompactDatabaseAction.java |   6 +-
 ...CompactBuilder.java => AppendTableCompact.java} |   5 +-
 .../flink/compact/IncrementalClusterCompact.java   | 194 +++++++++++++++++++++
 .../paimon/spark/procedure/CompactProcedure.java   |   2 +-
 10 files changed, 234 insertions(+), 167 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index e02098c0dd..4f48618ef8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -360,6 +360,15 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
 
     public abstract Comparator<InternalRow> newKeyComparator();
 
+    @Override
+    public InternalRowPartitionComputer partitionComputer() {
+        return new InternalRowPartitionComputer(
+                options.partitionDefaultName(),
+                schema.logicalPartitionType(),
+                schema.partitionKeys().toArray(new String[0]),
+                options.legacyPartitionName());
+    }
+
     private List<CommitCallback> createCommitCallbacks(String commitUser, FileStoreTable table) {
         List<CommitCallback> callbacks =
                 new ArrayList<>(CallbackUtils.loadCommitCallbacks(options, table));
@@ -367,13 +376,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
         if (options.partitionedTableInMetastore() && !schema.partitionKeys().isEmpty()) {
             PartitionHandler partitionHandler = catalogEnvironment.partitionHandler();
             if (partitionHandler != null) {
-                InternalRowPartitionComputer partitionComputer =
-                        new InternalRowPartitionComputer(
-                                options.partitionDefaultName(),
-                                schema.logicalPartitionType(),
-                                schema.partitionKeys().toArray(new String[0]),
-                                options.legacyPartitionName());
-                callbacks.add(new AddPartitionCommitCallback(partitionHandler, partitionComputer));
+                callbacks.add(
+                        new AddPartitionCommitCallback(partitionHandler, partitionComputer()));
             }
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 929302673d..98905b47e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -41,6 +41,7 @@ import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -67,6 +68,8 @@ public interface FileStore<T> {
 
     RowType partitionType();
 
+    InternalRowPartitionComputer partitionComputer();
+
     CoreOptions options();
 
     BucketMode bucketMode();
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 20dc2a49d2..1c8da3c033 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -123,7 +123,7 @@ public class IncrementalClusterManager {
                         specifiedPartitions);
     }
 
-    public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) {
+    public Map<BinaryRow, CompactUnit> createCompactUnits(boolean fullCompaction) {
         // 1. construct LSM structure for each partition
         Map<BinaryRow, List<LevelSortedRun>> partitionLevels = constructLevels();
         logForPartitionLevel(partitionLevels, partitionComputer);
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 99069ddc31..6ced52bbd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -45,6 +45,7 @@ import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -92,6 +93,11 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
         return wrapped.partitionType();
     }
 
+    @Override
+    public InternalRowPartitionComputer partitionComputer() {
+        return wrapped.partitionComputer();
+    }
+
     @Override
     public CoreOptions options() {
         return wrapped.options();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index a4d1071092..516fa63b55 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -192,14 +192,14 @@ public class IncrementalClusterManagerTest {
                                 RowType.of(DataTypes.INT()),
                                 Lists.newArrayList(BinaryRow.singleColumn("pt3"))));
         Map<BinaryRow, CompactUnit> partitionLevels =
-                incrementalClusterManager.prepareForCluster(true);
+                incrementalClusterManager.createCompactUnits(true);
         assertThat(partitionLevels.size()).isEqualTo(2);
         assertThat(partitionLevels.get(BinaryRow.singleColumn("pt1"))).isNotNull();
         assertThat(partitionLevels.get(BinaryRow.singleColumn("pt3"))).isNotNull();
 
         // test don't specify partition and enable history partition auto clustering
         incrementalClusterManager = new IncrementalClusterManager(table);
-        partitionLevels = incrementalClusterManager.prepareForCluster(true);
+        partitionLevels = incrementalClusterManager.createCompactUnits(true);
         assertThat(partitionLevels.size()).isEqualTo(4);
 
         // test specify partition and disable history partition auto clustering
@@ -213,7 +213,7 @@ public class IncrementalClusterManagerTest {
                         PartitionPredicate.fromMultiple(
                                 RowType.of(DataTypes.INT()),
                                 Lists.newArrayList(BinaryRow.singleColumn("pt3"))));
-        partitionLevels = incrementalClusterManager.prepareForCluster(true);
+        partitionLevels = incrementalClusterManager.createCompactUnits(true);
         assertThat(partitionLevels.size()).isEqualTo(1);
         assertThat(partitionLevels.get(BinaryRow.singleColumn("pt3"))).isNotNull();
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 694b67d705..2e4442d9d7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -19,14 +19,11 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.append.cluster.IncrementalClusterManager;
-import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
-import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
-import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
+import org.apache.paimon.flink.compact.AppendTableCompact;
+import org.apache.paimon.flink.compact.IncrementalClusterCompact;
 import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
 import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
 import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
@@ -36,10 +33,7 @@ import org.apache.paimon.flink.sink.CompactorSinkBuilder;
 import org.apache.paimon.flink.sink.FixedBucketSink;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
-import org.apache.paimon.flink.sink.RowAppendTableSink;
 import org.apache.paimon.flink.sink.RowDataChannelComputer;
-import org.apache.paimon.flink.sorter.TableSortInfo;
-import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
@@ -50,8 +44,6 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateProjectionConverter;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
@@ -77,7 +69,6 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -94,7 +85,7 @@ public class CompactAction extends TableActionBase {
 
     @Nullable protected Duration partitionIdleTime = null;
 
-    protected Boolean fullCompaction;
+    @Nullable protected Boolean fullCompaction;
 
     public CompactAction(
             String database,
@@ -151,20 +142,21 @@ public class CompactAction extends TableActionBase {
         boolean isStreaming =
                 conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-
+        PartitionPredicate partitionPredicate = getPartitionPredicate();
         if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
-            return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
+            buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
         } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
             if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
-                return buildForIncrementalClustering(env, fileStoreTable, isStreaming);
+                new IncrementalClusterCompact(
+                                env, fileStoreTable, partitionPredicate, fullCompaction)
+                        .build();
             } else {
                 buildForAppendTableCompact(env, fileStoreTable, isStreaming);
-                return true;
             }
         } else {
             buildForBucketedTableCompact(env, fileStoreTable, isStreaming);
-            return true;
         }
+        return true;
     }
 
     protected void buildForBucketedTableCompact(
@@ -206,144 +198,13 @@ public class CompactAction extends TableActionBase {
     protected void buildForAppendTableCompact(
             StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
             throws Exception {
-        AppendTableCompactBuilder builder =
-                new AppendTableCompactBuilder(env, identifier.getFullName(), table);
+        AppendTableCompact builder = new AppendTableCompact(env, identifier.getFullName(), table);
         builder.withPartitionPredicate(getPartitionPredicate());
         builder.withContinuousMode(isStreaming);
         builder.withPartitionIdleTime(partitionIdleTime);
         builder.build();
     }
 
-    protected boolean buildForIncrementalClustering(
-            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
-            throws Exception {
-        checkArgument(!isStreaming, "Incremental clustering currently only 
supports batch mode");
-
-        IncrementalClusterManager incrementalClusterManager =
-                new IncrementalClusterManager(table, getPartitionPredicate());
-
-        // non-full strategy as default for incremental clustering
-        if (fullCompaction == null) {
-            fullCompaction = false;
-        }
-        Options options = new Options(table.options());
-        int localSampleMagnification = table.coreOptions().getLocalSampleMagnification();
-        if (localSampleMagnification < 20) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.",
-                            CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
-                            localSampleMagnification));
-        }
-        String commitUser = CoreOptions.createCommitUser(options);
-        InternalRowPartitionComputer partitionComputer =
-                new InternalRowPartitionComputer(
-                        table.coreOptions().partitionDefaultName(),
-                        table.store().partitionType(),
-                        table.partitionKeys().toArray(new String[0]),
-                        table.coreOptions().legacyPartitionName());
-
-        // 1. pick cluster files for each partition
-        Map<BinaryRow, CompactUnit> compactUnits =
-                incrementalClusterManager.prepareForCluster(fullCompaction);
-        if (compactUnits.isEmpty()) {
-            LOGGER.warn(
-                    "No partition needs to be incrementally clustered. "
-                            + "Please set '--compact_strategy full' if you need forcibly trigger the cluster."
-                            + "Please set '--force_start_flink_job true' if you need forcibly start a flink job.");
-            if (this.forceStartFlinkJob) {
-                env.fromSequence(0, 0)
-                        .name("Nothing to Cluster Source")
-                        .sinkTo(new DiscardingSink<>());
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
-                incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
-
-        // 2. read,sort and write in partition
-        List<DataStream<Committable>> dataStreams = new ArrayList<>();
-
-        for (Map.Entry<BinaryRow, Pair<List<DataSplit>, CommitMessage>> entry :
-                partitionSplits.entrySet()) {
-            BinaryRow partition = entry.getKey();
-            List<DataSplit> splits = entry.getValue().getKey();
-            CommitMessage dvCommitMessage = entry.getValue().getRight();
-            LinkedHashMap<String, String> partitionSpec =
-                    partitionComputer.generatePartValues(partition);
-
-            // 2.1 generate source for current partition
-            Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
-                    IncrementalClusterSplitSource.buildSource(
-                            env,
-                            table,
-                            partitionSpec,
-                            splits,
-                            dvCommitMessage,
-                            options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
-
-            // 2.2 cluster in partition
-            Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
-            if (sinkParallelism == null) {
-                sinkParallelism = sourcePair.getLeft().getParallelism();
-            }
-            TableSortInfo sortInfo =
-                    new TableSortInfo.Builder()
-                            .setSortColumns(incrementalClusterManager.clusterKeys())
-                            .setSortStrategy(incrementalClusterManager.clusterCurve())
-                            .setSinkParallelism(sinkParallelism)
-                            .setLocalSampleSize(sinkParallelism * localSampleMagnification)
-                            .setGlobalSampleSize(sinkParallelism * 1000)
-                            .setRangeNumber(sinkParallelism * 10)
-                            .build();
-            DataStream<RowData> sorted =
-                    TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();
-
-            // 2.3 write and then reorganize the committable
-            // set parallelism to null, and it'll forward parallelism when doWrite()
-            RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
-            boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
-            DataStream<Committable> written =
-                    sink.doWrite(
-                            FlinkSinkBuilder.mapToInternalRow(
-                                    sorted,
-                                    table.rowType(),
-                                    blobAsDescriptor,
-                                    table.catalogEnvironment().catalogContext()),
-                            commitUser,
-                            null);
-            DataStream<Committable> clusterCommittable =
-                    written.forward()
-                            .transform(
-                                    "Rewrite cluster committable",
-                                    new CommittableTypeInfo(),
-                                    new RewriteIncrementalClusterCommittableOperator(
-                                            table,
-                                            compactUnits.entrySet().stream()
-                                                    .collect(
-                                                            Collectors.toMap(
-                                                                    Map.Entry::getKey,
-                                                                    unit ->
-                                                                            unit.getValue()
-                                                                                    .outputLevel()))))
-                            .setParallelism(written.getParallelism());
-            dataStreams.add(clusterCommittable);
-            dataStreams.add(sourcePair.getRight());
-        }
-
-        // 3. commit
-        RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
-        DataStream<Committable> dataStream = dataStreams.get(0);
-        for (int i = 1; i < dataStreams.size(); i++) {
-            dataStream = dataStream.union(dataStreams.get(i));
-        }
-        sink.doCommit(dataStream, commitUser);
-        return true;
-    }
-
     protected PartitionPredicate getPartitionPredicate() throws Exception {
         checkArgument(
                 partitions == null || whereSql == null,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index a438a5f895..1a65b8d1bd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -23,7 +23,7 @@ import org.apache.paimon.append.MultiTableAppendCompactTask;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
+import org.apache.paimon.flink.compact.AppendTableCompact;
 import org.apache.paimon.flink.sink.BucketsRowChannelComputer;
 import org.apache.paimon.flink.sink.CombinedTableCompactorSink;
 import org.apache.paimon.flink.sink.CompactorSinkBuilder;
@@ -278,8 +278,8 @@ public class CompactDatabaseAction extends ActionBase {
 
     private void buildForUnawareBucketCompaction(
             StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
-        AppendTableCompactBuilder unawareBucketCompactionTopoBuilder =
-                new AppendTableCompactBuilder(env, fullName, table);
+        AppendTableCompact unawareBucketCompactionTopoBuilder =
+                new AppendTableCompact(env, fullName, table);
 
         unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
         unawareBucketCompactionTopoBuilder.withPartitionIdleTime(partitionIdleTime);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompact.java
similarity index 98%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompact.java
index 8cd9bca856..a9037e8428 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompact.java
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
  * snapshot. We need to enable checkpoint for this compaction job, checkpoint will trigger committer
  * stage to commit all the compacted files.
  */
-public class AppendTableCompactBuilder {
+public class AppendTableCompact {
 
     private final transient StreamExecutionEnvironment env;
     private final String tableIdentifier;
@@ -65,7 +65,7 @@ public class AppendTableCompactBuilder {
     @Nullable private PartitionPredicate partitionPredicate;
     @Nullable private Duration partitionIdleTime = null;
 
-    public AppendTableCompactBuilder(
+    public AppendTableCompact(
             StreamExecutionEnvironment env, String tableIdentifier, FileStoreTable table) {
         this.env = env;
         this.tableIdentifier = tableIdentifier;
@@ -120,7 +120,6 @@ public class AppendTableCompactBuilder {
     }
 
     private DataStreamSource<AppendCompactTask> buildSource() {
-
         long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
         AppendTableCompactSource source =
                 new AppendTableCompactSource(table, isContinuous, scanInterval, partitionPredicate);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
new file mode 100644
index 0000000000..c7fef70266
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
+import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sink.RowAppendTableSink;
+import org.apache.paimon.flink.sorter.TableSortInfo;
+import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
+
+/** Compact for incremental clustering. */
+public class IncrementalClusterCompact {
+
+    protected final StreamExecutionEnvironment env;
+    protected final FileStoreTable table;
+    protected final IncrementalClusterManager clusterManager;
+    protected final String commitUser;
+    protected final InternalRowPartitionComputer partitionComputer;
+
+    protected final @Nullable Integer parallelism;
+    protected final int localSampleMagnification;
+    protected final Map<BinaryRow, CompactUnit> compactUnits;
+    protected final Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> compactSplits;
+
+    public IncrementalClusterCompact(
+            StreamExecutionEnvironment env,
+            FileStoreTable table,
+            PartitionPredicate partitionPredicate,
+            @Nullable Boolean fullCompaction) {
+        this.env = env;
+        this.table = table;
+        this.clusterManager = new IncrementalClusterManager(table, partitionPredicate);
+        Options options = new Options(table.options());
+        this.partitionComputer = table.store().partitionComputer();
+        this.commitUser = CoreOptions.createCommitUser(options);
+        this.parallelism = options.get(SCAN_PARALLELISM);
+        this.localSampleMagnification = table.coreOptions().getLocalSampleMagnification();
+        if (localSampleMagnification < 20) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.",
+                            CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+                            localSampleMagnification));
+        }
+        // non-full strategy as default for incremental clustering
+        this.compactUnits =
+                clusterManager.createCompactUnits(fullCompaction != null && 
fullCompaction);
+        this.compactSplits = 
clusterManager.toSplitsAndRewriteDvFiles(compactUnits);
+    }
+
+    public void build() throws Exception {
+        if (compactUnits.isEmpty()) {
+            env.fromSequence(0, 0).name("Nothing to Cluster 
Source").sinkTo(new DiscardingSink<>());
+            return;
+        }
+
+        List<DataStream<Committable>> dataStreams = new ArrayList<>();
+        for (Map.Entry<BinaryRow, Pair<List<DataSplit>, CommitMessage>> entry :
+                compactSplits.entrySet()) {
+            dataStreams.addAll(
+                    buildCompactOperator(
+                            entry.getKey(),
+                            entry.getValue().getKey(),
+                            entry.getValue().getRight(),
+                            parallelism));
+        }
+
+        buildCommitOperator(dataStreams);
+    }
+
+    /**
+     * Build for one partition.
+     *
+     * @param parallelism Give the caller the opportunity to set parallelism
+     */
+    protected List<DataStream<Committable>> buildCompactOperator(
+            BinaryRow partition,
+            List<DataSplit> splits,
+            CommitMessage dvCommitMessage,
+            @Nullable Integer parallelism) {
+        LinkedHashMap<String, String> partitionSpec =
+                partitionComputer.generatePartValues(partition);
+
+        // 2.1 generate source for current partition
+        Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
+                IncrementalClusterSplitSource.buildSource(
+                        env, table, partitionSpec, splits, dvCommitMessage, parallelism);
+
+        // 2.2 cluster in partition
+        Integer sinkParallelism = parallelism;
+        if (sinkParallelism == null) {
+            sinkParallelism = sourcePair.getLeft().getParallelism();
+        }
+        TableSortInfo sortInfo =
+                new TableSortInfo.Builder()
+                        .setSortColumns(clusterManager.clusterKeys())
+                        .setSortStrategy(clusterManager.clusterCurve())
+                        .setSinkParallelism(sinkParallelism)
+                        .setLocalSampleSize(sinkParallelism * localSampleMagnification)
+                        .setGlobalSampleSize(sinkParallelism * 1000)
+                        .setRangeNumber(sinkParallelism * 10)
+                        .build();
+        DataStream<RowData> sorted =
+                TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();
+
+        // 2.3 write and then reorganize the committable
+        // set parallelism to null, and it'll forward parallelism when doWrite()
+        RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
+        boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+        DataStream<Committable> written =
+                sink.doWrite(
+                        FlinkSinkBuilder.mapToInternalRow(
+                                sorted,
+                                table.rowType(),
+                                blobAsDescriptor,
+                                table.catalogEnvironment().catalogContext()),
+                        commitUser,
+                        null);
+        DataStream<Committable> clusterCommittable =
+                written.forward()
+                        .transform(
+                                "Rewrite cluster committable",
+                                new CommittableTypeInfo(),
+                                new RewriteIncrementalClusterCommittableOperator(
+                                        table,
+                                        compactUnits.entrySet().stream()
+                                                .collect(
+                                                        Collectors.toMap(
+                                                                Map.Entry::getKey,
+                                                                unit ->
+                                                                        unit.getValue()
+                                                                                .outputLevel()))))
+                        .setParallelism(written.getParallelism());
+
+        List<DataStream<Committable>> dataStreams = new ArrayList<>();
+        dataStreams.add(clusterCommittable);
+        dataStreams.add(sourcePair.getRight());
+        return dataStreams;
+    }
+
+    protected void buildCommitOperator(List<DataStream<Committable>> dataStreams) {
+        RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
+        DataStream<Committable> dataStream = dataStreams.get(0);
+        for (int i = 1; i < dataStreams.size(); i++) {
+            dataStream = dataStream.union(dataStreams.get(i));
+        }
+        sink.doCommit(dataStream, commitUser);
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 8e98727332..93305a2aec 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -533,7 +533,7 @@ public class CompactProcedure extends BaseProcedure {
             FileStoreTable table, boolean fullCompaction, DataSourceV2Relation relation) {
         IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);
         Map<BinaryRow, CompactUnit> compactUnits =
-                incrementalClusterManager.prepareForCluster(fullCompaction);
+                incrementalClusterManager.createCompactUnits(fullCompaction);
 
         Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
                 incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
