This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f3e6e85a77 [flink] Extract IncrementalClusterCompact from CompactAction (#6638)
f3e6e85a77 is described below
commit f3e6e85a7771fea0cc8a1627c16e54975a1e8681
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 20 14:59:48 2025 +0800
[flink] Extract IncrementalClusterCompact from CompactAction (#6638)
---
.../java/org/apache/paimon/AbstractFileStore.java | 18 +-
.../src/main/java/org/apache/paimon/FileStore.java | 3 +
.../append/cluster/IncrementalClusterManager.java | 2 +-
.../paimon/privilege/PrivilegedFileStore.java | 6 +
.../cluster/IncrementalClusterManagerTest.java | 6 +-
.../apache/paimon/flink/action/CompactAction.java | 159 ++---------------
.../paimon/flink/action/CompactDatabaseAction.java | 6 +-
...CompactBuilder.java => AppendTableCompact.java} | 5 +-
.../flink/compact/IncrementalClusterCompact.java | 194 +++++++++++++++++++++
.../paimon/spark/procedure/CompactProcedure.java | 2 +-
10 files changed, 234 insertions(+), 167 deletions(-)
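
For orientation, a minimal sketch of the two compaction entry points after this refactor, assuming the env/table/predicate wiring already exists. Class and method names are taken from the diff below; the wrapper class itself is hypothetical and not part of the commit:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.paimon.flink.compact.AppendTableCompact;
    import org.apache.paimon.flink.compact.IncrementalClusterCompact;
    import org.apache.paimon.partition.PartitionPredicate;
    import org.apache.paimon.table.FileStoreTable;

    class CompactTopologySketch {
        // Bucket-unaware table with incremental clustering enabled: the topology that used to
        // live in CompactAction#buildForIncrementalClustering is now built by the extracted class.
        static void incrementalCluster(
                StreamExecutionEnvironment env,
                FileStoreTable table,
                PartitionPredicate partitionPredicate,
                Boolean fullCompaction)
                throws Exception {
            new IncrementalClusterCompact(env, table, partitionPredicate, fullCompaction).build();
        }

        // Plain append-table compaction: AppendTableCompactBuilder is renamed to AppendTableCompact.
        static void appendCompact(
                StreamExecutionEnvironment env, String fullName, FileStoreTable table, boolean isStreaming)
                throws Exception {
            AppendTableCompact builder = new AppendTableCompact(env, fullName, table);
            builder.withContinuousMode(isStreaming);
            builder.build();
        }
    }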
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index e02098c0dd..4f48618ef8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -360,6 +360,15 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
public abstract Comparator<InternalRow> newKeyComparator();
+ @Override
+ public InternalRowPartitionComputer partitionComputer() {
+ return new InternalRowPartitionComputer(
+ options.partitionDefaultName(),
+ schema.logicalPartitionType(),
+ schema.partitionKeys().toArray(new String[0]),
+ options.legacyPartitionName());
+ }
+
private List<CommitCallback> createCommitCallbacks(String commitUser, FileStoreTable table) {
List<CommitCallback> callbacks = new ArrayList<>(CallbackUtils.loadCommitCallbacks(options, table));
@@ -367,13 +376,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
if (options.partitionedTableInMetastore() && !schema.partitionKeys().isEmpty()) {
PartitionHandler partitionHandler = catalogEnvironment.partitionHandler();
if (partitionHandler != null) {
- InternalRowPartitionComputer partitionComputer =
- new InternalRowPartitionComputer(
- options.partitionDefaultName(),
- schema.logicalPartitionType(),
- schema.partitionKeys().toArray(new String[0]),
- options.legacyPartitionName());
- callbacks.add(new AddPartitionCommitCallback(partitionHandler, partitionComputer));
+ callbacks.add(
+ new AddPartitionCommitCallback(partitionHandler, partitionComputer()));
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 929302673d..98905b47e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -41,6 +41,7 @@ import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -67,6 +68,8 @@ public interface FileStore<T> {
RowType partitionType();
+ InternalRowPartitionComputer partitionComputer();
+
CoreOptions options();
BucketMode bucketMode();
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 20dc2a49d2..1c8da3c033 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -123,7 +123,7 @@ public class IncrementalClusterManager {
specifiedPartitions);
}
- public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) {
+ public Map<BinaryRow, CompactUnit> createCompactUnits(boolean fullCompaction) {
// 1. construct LSM structure for each partition
Map<BinaryRow, List<LevelSortedRun>> partitionLevels = constructLevels();
logForPartitionLevel(partitionLevels, partitionComputer);
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 99069ddc31..6ced52bbd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -45,6 +45,7 @@ import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -92,6 +93,11 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
return wrapped.partitionType();
}
+ @Override
+ public InternalRowPartitionComputer partitionComputer() {
+ return wrapped.partitionComputer();
+ }
+
@Override
public CoreOptions options() {
return wrapped.options();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index a4d1071092..516fa63b55 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -192,14 +192,14 @@ public class IncrementalClusterManagerTest {
RowType.of(DataTypes.INT()),
Lists.newArrayList(BinaryRow.singleColumn("pt3"))));
Map<BinaryRow, CompactUnit> partitionLevels =
- incrementalClusterManager.prepareForCluster(true);
+ incrementalClusterManager.createCompactUnits(true);
assertThat(partitionLevels.size()).isEqualTo(2);
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt1"))).isNotNull();
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt3"))).isNotNull();
// test don't specify partition and enable history partition auto clustering
incrementalClusterManager = new IncrementalClusterManager(table);
- partitionLevels = incrementalClusterManager.prepareForCluster(true);
+ partitionLevels = incrementalClusterManager.createCompactUnits(true);
assertThat(partitionLevels.size()).isEqualTo(4);
// test specify partition and disable history partition auto clustering
@@ -213,7 +213,7 @@ public class IncrementalClusterManagerTest {
PartitionPredicate.fromMultiple(
RowType.of(DataTypes.INT()),
Lists.newArrayList(BinaryRow.singleColumn("pt3"))));
- partitionLevels = incrementalClusterManager.prepareForCluster(true);
+ partitionLevels = incrementalClusterManager.createCompactUnits(true);
assertThat(partitionLevels.size()).isEqualTo(1);
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt3"))).isNotNull();
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 694b67d705..2e4442d9d7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -19,14 +19,11 @@
package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.append.cluster.IncrementalClusterManager;
-import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
-import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
-import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
+import org.apache.paimon.flink.compact.AppendTableCompact;
+import org.apache.paimon.flink.compact.IncrementalClusterCompact;
import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
@@ -36,10 +33,7 @@ import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
-import org.apache.paimon.flink.sink.RowAppendTableSink;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
-import org.apache.paimon.flink.sorter.TableSortInfo;
-import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
@@ -50,8 +44,6 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
@@ -77,7 +69,6 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -94,7 +85,7 @@ public class CompactAction extends TableActionBase {
@Nullable protected Duration partitionIdleTime = null;
- protected Boolean fullCompaction;
+ @Nullable protected Boolean fullCompaction;
public CompactAction(
String database,
@@ -151,20 +142,21 @@ public class CompactAction extends TableActionBase {
boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
FileStoreTable fileStoreTable = (FileStoreTable) table;
-
+ PartitionPredicate partitionPredicate = getPartitionPredicate();
if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
- return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
+ buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
} else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
- return buildForIncrementalClustering(env, fileStoreTable, isStreaming);
+ new IncrementalClusterCompact(
+ env, fileStoreTable, partitionPredicate, fullCompaction)
+ .build();
} else {
buildForAppendTableCompact(env, fileStoreTable, isStreaming);
- return true;
}
} else {
buildForBucketedTableCompact(env, fileStoreTable, isStreaming);
- return true;
}
+ return true;
}
protected void buildForBucketedTableCompact(
@@ -206,144 +198,13 @@ public class CompactAction extends TableActionBase {
protected void buildForAppendTableCompact(
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
throws Exception {
- AppendTableCompactBuilder builder =
- new AppendTableCompactBuilder(env, identifier.getFullName(), table);
+ AppendTableCompact builder = new AppendTableCompact(env, identifier.getFullName(), table);
builder.withPartitionPredicate(getPartitionPredicate());
builder.withContinuousMode(isStreaming);
builder.withPartitionIdleTime(partitionIdleTime);
builder.build();
}
- protected boolean buildForIncrementalClustering(
- StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
- throws Exception {
- checkArgument(!isStreaming, "Incremental clustering currently only supports batch mode");
-
- IncrementalClusterManager incrementalClusterManager =
- new IncrementalClusterManager(table, getPartitionPredicate());
-
- // non-full strategy as default for incremental clustering
- if (fullCompaction == null) {
- fullCompaction = false;
- }
- Options options = new Options(table.options());
- int localSampleMagnification = table.coreOptions().getLocalSampleMagnification();
- if (localSampleMagnification < 20) {
- throw new IllegalArgumentException(
- String.format(
- "the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.",
- CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
- localSampleMagnification));
- }
- String commitUser = CoreOptions.createCommitUser(options);
- InternalRowPartitionComputer partitionComputer =
- new InternalRowPartitionComputer(
- table.coreOptions().partitionDefaultName(),
- table.store().partitionType(),
- table.partitionKeys().toArray(new String[0]),
- table.coreOptions().legacyPartitionName());
-
- // 1. pick cluster files for each partition
- Map<BinaryRow, CompactUnit> compactUnits =
- incrementalClusterManager.prepareForCluster(fullCompaction);
- if (compactUnits.isEmpty()) {
- LOGGER.warn(
- "No partition needs to be incrementally clustered. "
- + "Please set '--compact_strategy full' if you need forcibly trigger the cluster."
- + "Please set '--force_start_flink_job true' if you need forcibly start a flink job.");
- if (this.forceStartFlinkJob) {
- env.fromSequence(0, 0)
- .name("Nothing to Cluster Source")
- .sinkTo(new DiscardingSink<>());
- return true;
- } else {
- return false;
- }
- }
-
- Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
- incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
-
- // 2. read,sort and write in partition
- List<DataStream<Committable>> dataStreams = new ArrayList<>();
-
- for (Map.Entry<BinaryRow, Pair<List<DataSplit>, CommitMessage>> entry :
- partitionSplits.entrySet()) {
- BinaryRow partition = entry.getKey();
- List<DataSplit> splits = entry.getValue().getKey();
- CommitMessage dvCommitMessage = entry.getValue().getRight();
- LinkedHashMap<String, String> partitionSpec =
- partitionComputer.generatePartValues(partition);
-
- // 2.1 generate source for current partition
- Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
- IncrementalClusterSplitSource.buildSource(
- env,
- table,
- partitionSpec,
- splits,
- dvCommitMessage,
- options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
-
- // 2.2 cluster in partition
- Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
- if (sinkParallelism == null) {
- sinkParallelism = sourcePair.getLeft().getParallelism();
- }
- TableSortInfo sortInfo =
- new TableSortInfo.Builder()
- .setSortColumns(incrementalClusterManager.clusterKeys())
- .setSortStrategy(incrementalClusterManager.clusterCurve())
- .setSinkParallelism(sinkParallelism)
- .setLocalSampleSize(sinkParallelism * localSampleMagnification)
- .setGlobalSampleSize(sinkParallelism * 1000)
- .setRangeNumber(sinkParallelism * 10)
- .build();
- DataStream<RowData> sorted =
- TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();
-
- // 2.3 write and then reorganize the committable
- // set parallelism to null, and it'll forward parallelism when doWrite()
- RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
- boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
- DataStream<Committable> written =
- sink.doWrite(
- FlinkSinkBuilder.mapToInternalRow(
- sorted,
- table.rowType(),
- blobAsDescriptor,
- table.catalogEnvironment().catalogContext()),
- commitUser,
- null);
- DataStream<Committable> clusterCommittable =
- written.forward()
- .transform(
- "Rewrite cluster committable",
- new CommittableTypeInfo(),
- new RewriteIncrementalClusterCommittableOperator(
- table,
- compactUnits.entrySet().stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey,
- unit ->
- unit.getValue()
- .outputLevel()))))
- .setParallelism(written.getParallelism());
- dataStreams.add(clusterCommittable);
- dataStreams.add(sourcePair.getRight());
- }
-
- // 3. commit
- RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
- DataStream<Committable> dataStream = dataStreams.get(0);
- for (int i = 1; i < dataStreams.size(); i++) {
- dataStream = dataStream.union(dataStreams.get(i));
- }
- sink.doCommit(dataStream, commitUser);
- return true;
- }
-
protected PartitionPredicate getPartitionPredicate() throws Exception {
checkArgument(
partitions == null || whereSql == null,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index a438a5f895..1a65b8d1bd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -23,7 +23,7 @@ import org.apache.paimon.append.MultiTableAppendCompactTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
+import org.apache.paimon.flink.compact.AppendTableCompact;
import org.apache.paimon.flink.sink.BucketsRowChannelComputer;
import org.apache.paimon.flink.sink.CombinedTableCompactorSink;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
@@ -278,8 +278,8 @@ public class CompactDatabaseAction extends ActionBase {
private void buildForUnawareBucketCompaction(
StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
- AppendTableCompactBuilder unawareBucketCompactionTopoBuilder =
- new AppendTableCompactBuilder(env, fullName, table);
+ AppendTableCompact unawareBucketCompactionTopoBuilder =
+ new AppendTableCompact(env, fullName, table);
unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
unawareBucketCompactionTopoBuilder.withPartitionIdleTime(partitionIdleTime);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompact.java
similarity index 98%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompact.java
index 8cd9bca856..a9037e8428 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompact.java
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
* snapshot. We need to enable checkpoint for this compaction job, checkpoint will trigger committer
* stage to commit all the compacted files.
*/
-public class AppendTableCompactBuilder {
+public class AppendTableCompact {
private final transient StreamExecutionEnvironment env;
private final String tableIdentifier;
@@ -65,7 +65,7 @@ public class AppendTableCompactBuilder {
@Nullable private PartitionPredicate partitionPredicate;
@Nullable private Duration partitionIdleTime = null;
- public AppendTableCompactBuilder(
+ public AppendTableCompact(
StreamExecutionEnvironment env, String tableIdentifier, FileStoreTable table) {
this.env = env;
this.tableIdentifier = tableIdentifier;
@@ -120,7 +120,6 @@ public class AppendTableCompactBuilder {
}
private DataStreamSource<AppendCompactTask> buildSource() {
-
long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
AppendTableCompactSource source =
new AppendTableCompactSource(table, isContinuous, scanInterval, partitionPredicate);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
new file mode 100644
index 0000000000..c7fef70266
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
+import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sink.RowAppendTableSink;
+import org.apache.paimon.flink.sorter.TableSortInfo;
+import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
+
+/** Compact for incremental clustering. */
+public class IncrementalClusterCompact {
+
+ protected final StreamExecutionEnvironment env;
+ protected final FileStoreTable table;
+ protected final IncrementalClusterManager clusterManager;
+ protected final String commitUser;
+ protected final InternalRowPartitionComputer partitionComputer;
+
+ protected final @Nullable Integer parallelism;
+ protected final int localSampleMagnification;
+ protected final Map<BinaryRow, CompactUnit> compactUnits;
+ protected final Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> compactSplits;
+
+ public IncrementalClusterCompact(
+ StreamExecutionEnvironment env,
+ FileStoreTable table,
+ PartitionPredicate partitionPredicate,
+ @Nullable Boolean fullCompaction) {
+ this.env = env;
+ this.table = table;
+ this.clusterManager = new IncrementalClusterManager(table, partitionPredicate);
+ Options options = new Options(table.options());
+ this.partitionComputer = table.store().partitionComputer();
+ this.commitUser = CoreOptions.createCommitUser(options);
+ this.parallelism = options.get(SCAN_PARALLELISM);
+ this.localSampleMagnification = table.coreOptions().getLocalSampleMagnification();
+ if (localSampleMagnification < 20) {
+ throw new IllegalArgumentException(
+ String.format(
+ "the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.",
+ CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+ localSampleMagnification));
+ }
+ // non-full strategy as default for incremental clustering
+ this.compactUnits =
+ clusterManager.createCompactUnits(fullCompaction != null && fullCompaction);
+ this.compactSplits = clusterManager.toSplitsAndRewriteDvFiles(compactUnits);
+ }
+
+ public void build() throws Exception {
+ if (compactUnits.isEmpty()) {
+ env.fromSequence(0, 0).name("Nothing to Cluster Source").sinkTo(new DiscardingSink<>());
+ return;
+ }
+
+ List<DataStream<Committable>> dataStreams = new ArrayList<>();
+ for (Map.Entry<BinaryRow, Pair<List<DataSplit>, CommitMessage>> entry :
+ compactSplits.entrySet()) {
+ dataStreams.addAll(
+ buildCompactOperator(
+ entry.getKey(),
+ entry.getValue().getKey(),
+ entry.getValue().getRight(),
+ parallelism));
+ }
+
+ buildCommitOperator(dataStreams);
+ }
+
+ /**
+ * Build for one partition.
+ *
+ * @param parallelism Give the caller the opportunity to set parallelism
+ */
+ protected List<DataStream<Committable>> buildCompactOperator(
+ BinaryRow partition,
+ List<DataSplit> splits,
+ CommitMessage dvCommitMessage,
+ @Nullable Integer parallelism) {
+ LinkedHashMap<String, String> partitionSpec =
+ partitionComputer.generatePartValues(partition);
+
+ // 2.1 generate source for current partition
+ Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
+ IncrementalClusterSplitSource.buildSource(
+ env, table, partitionSpec, splits, dvCommitMessage, parallelism);
+
+ // 2.2 cluster in partition
+ Integer sinkParallelism = parallelism;
+ if (sinkParallelism == null) {
+ sinkParallelism = sourcePair.getLeft().getParallelism();
+ }
+ TableSortInfo sortInfo =
+ new TableSortInfo.Builder()
+ .setSortColumns(clusterManager.clusterKeys())
+ .setSortStrategy(clusterManager.clusterCurve())
+ .setSinkParallelism(sinkParallelism)
+ .setLocalSampleSize(sinkParallelism * localSampleMagnification)
+ .setGlobalSampleSize(sinkParallelism * 1000)
+ .setRangeNumber(sinkParallelism * 10)
+ .build();
+ DataStream<RowData> sorted =
+ TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();
+
+ // 2.3 write and then reorganize the committable
+ // set parallelism to null, and it'll forward parallelism when doWrite()
+ RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
+ boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+ DataStream<Committable> written =
+ sink.doWrite(
+ FlinkSinkBuilder.mapToInternalRow(
+ sorted,
+ table.rowType(),
+ blobAsDescriptor,
+ table.catalogEnvironment().catalogContext()),
+ commitUser,
+ null);
+ DataStream<Committable> clusterCommittable =
+ written.forward()
+ .transform(
+ "Rewrite cluster committable",
+ new CommittableTypeInfo(),
+ new RewriteIncrementalClusterCommittableOperator(
+ table,
+ compactUnits.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ unit ->
+ unit.getValue()
+ .outputLevel()))))
+ .setParallelism(written.getParallelism());
+
+ List<DataStream<Committable>> dataStreams = new ArrayList<>();
+ dataStreams.add(clusterCommittable);
+ dataStreams.add(sourcePair.getRight());
+ return dataStreams;
+ }
+
+ protected void buildCommitOperator(List<DataStream<Committable>> dataStreams) {
+ RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
+ DataStream<Committable> dataStream = dataStreams.get(0);
+ for (int i = 1; i < dataStreams.size(); i++) {
+ dataStream = dataStream.union(dataStreams.get(i));
+ }
+ sink.doCommit(dataStream, commitUser);
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 8e98727332..93305a2aec 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -533,7 +533,7 @@ public class CompactProcedure extends BaseProcedure {
FileStoreTable table, boolean fullCompaction, DataSourceV2Relation relation) {
IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);
Map<BinaryRow, CompactUnit> compactUnits =
- incrementalClusterManager.prepareForCluster(fullCompaction);
+ incrementalClusterManager.createCompactUnits(fullCompaction);
Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
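
As a supplement, a minimal sketch of the new FileStore#partitionComputer() accessor in use, mirroring how IncrementalClusterCompact derives a partition spec above. The helper class and its wiring are assumptions, not part of the commit:

    import java.util.LinkedHashMap;
    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.utils.InternalRowPartitionComputer;

    class PartitionComputerSketch {
        // Resolve the partition spec (partition key -> value) for a binary partition row,
        // using the accessor added to FileStore in this commit.
        static LinkedHashMap<String, String> partitionSpec(FileStoreTable table, BinaryRow partition) {
            InternalRowPartitionComputer computer = table.store().partitionComputer();
            return computer.generatePartValues(partition);
        }
    }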