yunfengzhou-hub commented on code in PR #7027: URL: https://github.com/apache/paimon/pull/7027#discussion_r3004547134
########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/ExpireSnapshotsPlanner.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.expire; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; + +import static 
org.apache.paimon.utils.SnapshotManager.findPreviousOrEqualSnapshot; +import static org.apache.paimon.utils.SnapshotManager.findPreviousSnapshot; + +/** + * Planner for snapshot expiration. This class computes the expiration plan including: + * + * <ul> + * <li>The range of snapshots to expire [beginInclusiveId, endExclusiveId) + * <li>Protection set containing manifests that should not be deleted + * <li>Four groups of tasks organized by deletion phase + * </ul> + * + * <p>Tag data files are loaded on-demand by workers using {@link + * SnapshotDeletion#createDataFileSkipperForTags}, which has internal caching to avoid repeated + * reads. + */ +public class ExpireSnapshotsPlanner { + + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsPlanner.class); + + private final SnapshotManager snapshotManager; + private final ConsumerManager consumerManager; + private final SnapshotDeletion snapshotDeletion; + private final TagManager tagManager; + + public ExpireSnapshotsPlanner( + SnapshotManager snapshotManager, + ConsumerManager consumerManager, + SnapshotDeletion snapshotDeletion, + TagManager tagManager) { + this.snapshotManager = snapshotManager; + this.consumerManager = consumerManager; + this.snapshotDeletion = snapshotDeletion; + this.tagManager = tagManager; + } + + /** Creates an ExpireSnapshotsPlanner from a FileStoreTable. */ + public static ExpireSnapshotsPlanner create(FileStoreTable table) { + SnapshotManager snapshotManager = table.snapshotManager(); + ConsumerManager consumerManager = + new ConsumerManager(table.fileIO(), table.location(), snapshotManager.branch()); + return new ExpireSnapshotsPlanner( + table.snapshotManager(), + consumerManager, + table.store().newSnapshotDeletion(), + table.tagManager()); + } + + /** + * Plan the snapshot expiration. 
+ * + * @param config expiration configuration + * @return the expiration plan with three groups of tasks, or empty plan if nothing to expire + */ + public ExpireSnapshotsPlan plan(ExpireConfig config) { + snapshotDeletion.setChangelogDecoupled(config.isChangelogDecoupled()); + int retainMax = config.getSnapshotRetainMax(); + int retainMin = config.getSnapshotRetainMin(); + int maxDeletes = config.getSnapshotMaxDeletes(); + long olderThanMills = + System.currentTimeMillis() - config.getSnapshotTimeRetain().toMillis(); + + // 1. Get snapshot range + Long latestSnapshotId = snapshotManager.latestSnapshotId(); + if (latestSnapshotId == null) { + // no snapshot, nothing to expire + return ExpireSnapshotsPlan.empty(); + } + + Long earliestId = snapshotManager.earliestSnapshotId(); + if (earliestId == null) { + return ExpireSnapshotsPlan.empty(); + } + + Preconditions.checkArgument( + retainMax >= retainMin, + String.format( + "retainMax (%s) must not be less than retainMin (%s).", + retainMax, retainMin)); + + // the min snapshot to retain from 'snapshot.num-retained.max' + // (the maximum number of snapshots to retain) + long min = Math.max(latestSnapshotId - retainMax + 1, earliestId); + + // the max exclusive snapshot to expire until + // protected by 'snapshot.num-retained.min' + // (the minimum number of completed snapshots to retain) + long endExclusiveId = latestSnapshotId - retainMin + 1; + + // the snapshot being read by the consumer cannot be deleted + long consumerProtection = consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE); + endExclusiveId = Math.min(endExclusiveId, consumerProtection); Review Comment: This logic to compute begin and end snapshot id is also repeated in `ExpireSnapshotsImpl` and `ExpireChangelogImpl`. It might be better to extract a common method for this code part, like what this PR has done for `FileDeletionBase#cleanEmptyDirectories`. 
########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/ExpireSnapshotsPlan.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.expire; + +import org.apache.paimon.Snapshot; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * The plan for snapshot expiration, containing four groups of tasks organized by deletion phase. 
+ * + * <p>The plan organizes tasks into four groups for correct deletion order: + * + * <ul> + * <li>{@link #dataFileTasks()}: Phase 1a - Delete data files (can be parallelized) + * <li>{@link #changelogFileTasks()}: Phase 1b - Delete changelog files (can be parallelized) + * <li>{@link #manifestTasks()}: Phase 2a - Delete manifest files (serial) + * <li>{@link #snapshotFileTasks()}: Phase 2b - Delete snapshot metadata files (serial) + * </ul> + */ +public class ExpireSnapshotsPlan implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final ExpireSnapshotsPlan EMPTY = + new ExpireSnapshotsPlan( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + null, + 0, + 0, + Collections.emptyMap()); + + private final List<SnapshotExpireTask> dataFileTasks; + private final List<SnapshotExpireTask> changelogFileTasks; + private final List<SnapshotExpireTask> manifestTasks; + private final List<SnapshotExpireTask> snapshotFileTasks; + private final ProtectionSet protectionSet; + private final long beginInclusiveId; + private final long endExclusiveId; + + /** Pre-collected snapshot cache from planner to avoid redundant snapshot reads. 
*/ + private final Map<Long, Snapshot> snapshotCache; + + public ExpireSnapshotsPlan( + List<SnapshotExpireTask> dataFileTasks, + List<SnapshotExpireTask> changelogFileTasks, + List<SnapshotExpireTask> manifestTasks, + List<SnapshotExpireTask> snapshotFileTasks, + ProtectionSet protectionSet, + long beginInclusiveId, + long endExclusiveId, + Map<Long, Snapshot> snapshotCache) { + this.dataFileTasks = dataFileTasks; + this.changelogFileTasks = changelogFileTasks; + this.manifestTasks = manifestTasks; + this.snapshotFileTasks = snapshotFileTasks; + this.protectionSet = protectionSet; + this.beginInclusiveId = beginInclusiveId; + this.endExclusiveId = endExclusiveId; + this.snapshotCache = snapshotCache; + } + + public static ExpireSnapshotsPlan empty() { + return EMPTY; + } + + public boolean isEmpty() { + return dataFileTasks.isEmpty() + && changelogFileTasks.isEmpty() + && manifestTasks.isEmpty() + && snapshotFileTasks.isEmpty(); + } + + /** Get data file deletion tasks (Phase 1a). These can be executed in parallel. */ + public List<SnapshotExpireTask> dataFileTasks() { Review Comment: unused method. ########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/ExpireSnapshotsPlan.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.expire; + +import org.apache.paimon.Snapshot; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * The plan for snapshot expiration, containing four groups of tasks organized by deletion phase. + * + * <p>The plan organizes tasks into four groups for correct deletion order: + * + * <ul> + * <li>{@link #dataFileTasks()}: Phase 1a - Delete data files (can be parallelized) + * <li>{@link #changelogFileTasks()}: Phase 1b - Delete changelog files (can be parallelized) + * <li>{@link #manifestTasks()}: Phase 2a - Delete manifest files (serial) + * <li>{@link #snapshotFileTasks()}: Phase 2b - Delete snapshot metadata files (serial) + * </ul> + */ +public class ExpireSnapshotsPlan implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final ExpireSnapshotsPlan EMPTY = + new ExpireSnapshotsPlan( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + null, + 0, + 0, + Collections.emptyMap()); + + private final List<SnapshotExpireTask> dataFileTasks; + private final List<SnapshotExpireTask> changelogFileTasks; + private final List<SnapshotExpireTask> manifestTasks; + private final List<SnapshotExpireTask> snapshotFileTasks; + private final ProtectionSet protectionSet; + private final long beginInclusiveId; + private final long endExclusiveId; + + /** Pre-collected snapshot cache from planner to avoid redundant snapshot reads. 
*/ + private final Map<Long, Snapshot> snapshotCache; + + public ExpireSnapshotsPlan( + List<SnapshotExpireTask> dataFileTasks, + List<SnapshotExpireTask> changelogFileTasks, + List<SnapshotExpireTask> manifestTasks, + List<SnapshotExpireTask> snapshotFileTasks, + ProtectionSet protectionSet, + long beginInclusiveId, + long endExclusiveId, + Map<Long, Snapshot> snapshotCache) { + this.dataFileTasks = dataFileTasks; + this.changelogFileTasks = changelogFileTasks; + this.manifestTasks = manifestTasks; + this.snapshotFileTasks = snapshotFileTasks; + this.protectionSet = protectionSet; + this.beginInclusiveId = beginInclusiveId; + this.endExclusiveId = endExclusiveId; + this.snapshotCache = snapshotCache; + } + + public static ExpireSnapshotsPlan empty() { + return EMPTY; + } + + public boolean isEmpty() { + return dataFileTasks.isEmpty() + && changelogFileTasks.isEmpty() + && manifestTasks.isEmpty() + && snapshotFileTasks.isEmpty(); + } + + /** Get data file deletion tasks (Phase 1a). These can be executed in parallel. */ + public List<SnapshotExpireTask> dataFileTasks() { + return dataFileTasks; + } + + /** Get changelog file deletion tasks (Phase 1b). These can be executed in parallel. */ + public List<SnapshotExpireTask> changelogFileTasks() { Review Comment: unused method. ########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java: ########## @@ -50,12 +95,139 @@ public ExpireSnapshotsAction( this.olderThan = olderThan; this.maxDeletes = maxDeletes; this.options = options; + this.parallelism = resolveParallelism(parallelism); + } + + private int resolveParallelism(Integer parallelism) { + if (parallelism != null) { + return parallelism; + } + int envParallelism = env.getParallelism(); + return envParallelism > 0 ? 
envParallelism : 1; } + @Override + public void run() throws Exception { + if (forceStartFlinkJob || parallelism > 1) { + // Parallel mode: build custom multi-parallelism Flink pipeline + build(); + if (!env.getTransformations().isEmpty()) { + execute(this.getClass().getSimpleName()); + } + } else { + // Default: ActionBase handles LocalAction → executeLocally() + super.run(); + } + } + + @Override public void executeLocally() throws Exception { ExpireSnapshotsProcedure expireSnapshotsProcedure = new ExpireSnapshotsProcedure(); expireSnapshotsProcedure.withCatalog(catalog); expireSnapshotsProcedure.call( null, database + "." + table, retainMax, retainMin, olderThan, maxDeletes, options); } + + @Override + public void build() throws Exception { + Identifier identifier = new Identifier(database, table); + + // Prepare table with dynamic options + HashMap<String, String> dynamicOptions = new HashMap<>(); + ProcedureUtils.putAllOptions(dynamicOptions, options); + FileStoreTable fileStoreTable = + (FileStoreTable) catalog.getTable(identifier).copy(dynamicOptions); + + // Build expire config + CoreOptions tableOptions = fileStoreTable.store().options(); + ExpireConfig expireConfig = + ProcedureUtils.fillInSnapshotOptions( + tableOptions, retainMax, retainMin, olderThan, maxDeletes) + .build(); + + // Create planner using factory method + ExpireSnapshotsPlanner planner = ExpireSnapshotsPlanner.create(fileStoreTable); + + // Plan the expiration + ExpireSnapshotsPlan plan = planner.plan(expireConfig); + if (plan.isEmpty()) { + LOG.info("No snapshots to expire, skipping Flink job submission."); + return; + } + + LOG.info( + "Planning to expire {} snapshots, range=[{}, {})", + plan.endExclusiveId() - plan.beginInclusiveId(), + plan.beginInclusiveId(), + plan.endExclusiveId()); + + // Build worker phase + DataStream<DeletionReport> reports = buildWorkerPhase(plan, identifier, expireConfig); + + // Build sink phase + buildSinkPhase(reports, plan, identifier, expireConfig); + } 
+ + /** + * Build the worker phase of the Flink job. + * + * <p>Workers process data file and changelog file deletion tasks in parallel. Tasks are + * partitioned by snapshot range to ensure: + * + * <ul> + * <li>Same snapshot range tasks are processed by the same worker + * <li>Within each worker, data files are deleted before changelog files + * <li>Cache locality is maximized (adjacent snapshots often share manifest files) + * </ul> + */ + private DataStream<DeletionReport> buildWorkerPhase( + ExpireSnapshotsPlan plan, Identifier identifier, ExpireConfig expireConfig) { + // Partition by snapshot range: each worker gets a contiguous range of snapshots + // with dataFileTasks first, then changelogFileTasks + List<List<SnapshotExpireTask>> partitionedGroups = + plan.partitionTasksBySnapshotRange(parallelism); + + DataStreamSource<List<SnapshotExpireTask>> source = + env.fromCollection(partitionedGroups).setParallelism(1); + + return source.rebalance() + .flatMap( + new RangePartitionedExpireFunction( + catalogOptions.toMap(), + identifier, + plan.protectionSet().taggedSnapshots(), + plan.snapshotCache(), + expireConfig.isChangelogDecoupled())) + // Use JavaTypeInfo to ensure proper Java serialization of DeletionReport, + // avoiding Kryo's FieldSerializer which cannot handle BinaryRow correctly. + // This approach is compatible with both Flink 1.x and 2.x. Review Comment: This comment is a little redundant. Most Flink APIs used by Paimon are compatible with both Flink 1.x and 2.x, and there is no need to amplify this point. ########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java: ########## @@ -50,12 +95,168 @@ public ExpireSnapshotsAction( this.olderThan = olderThan; this.maxDeletes = maxDeletes; this.options = options; + this.parallelism = parallelism; } + /** Returns true if forceStartFlinkJob is enabled and parallelism is greater than 1. 
*/ + private boolean isParallelMode() { + return forceStartFlinkJob && parallelism != null && parallelism > 1; + } + + @Override + public void run() throws Exception { Review Comment: In the current implementation, the DAG of the Flink job would change, if the user configures its parallelism from 1 to 2. This would bring additional complexity to the Job's operation process. Given the problem above, I prefer to use the same DAG for the job no matter what parallelism the job has. ########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/DeletionReport.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.expire; + +import org.apache.paimon.data.BinaryRow; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** Report of a single snapshot expiration task. */ +public class DeletionReport implements Serializable { + + private static final long serialVersionUID = 1L; + + private final long snapshotId; + + /** Whether this task was skipped (e.g., snapshot already deleted). 
*/ + private boolean skipped; + + /** Buckets that had files deleted (for empty directory cleanup in parallel phase). */ + private Map<BinaryRow, Set<Integer>> deletionBuckets; + + public DeletionReport(long snapshotId) { + this.snapshotId = snapshotId; + this.skipped = false; + this.deletionBuckets = new HashMap<>(); + } + + /** + * Create a skipped report for a snapshot that was already deleted. + * + * @param snapshotId the snapshot ID + * @return a skipped deletion report + */ + public static DeletionReport skipped(long snapshotId) { + DeletionReport report = new DeletionReport(snapshotId); + report.skipped = true; + return report; + } + + public long snapshotId() { Review Comment: This field is only used in tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
