wzhero1 commented on code in PR #7027: URL: https://github.com/apache/paimon/pull/7027#discussion_r2973193068
########## paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsExecutor.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.expire; + +import org.apache.paimon.Changelog; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.manifest.ExpireFileEntry; +import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.SnapshotManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** + * Executor for snapshot expire tasks. This class handles the actual deletion logic based on the + * task type defined by {@link SnapshotExpireTask.TaskType}. 
+ * + * <p>The executor uses switch-based dispatch on task type: + * + * <ul> + * <li>{@link SnapshotExpireTask.TaskType#DELETE_DATA_FILES} → delete data files + * <li>{@link SnapshotExpireTask.TaskType#DELETE_CHANGELOG_FILES} → delete changelog files + * <li>{@link SnapshotExpireTask.TaskType#DELETE_MANIFESTS} → delete manifest files + * <li>{@link SnapshotExpireTask.TaskType#DELETE_SNAPSHOT} → delete snapshot metadata file + * </ul> + */ +public class ExpireSnapshotsExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsExecutor.class); + + private final SnapshotManager snapshotManager; + private final SnapshotDeletion snapshotDeletion; + @Nullable private final ChangelogManager changelogManager; + + /** Pre-collected snapshot cache. Empty map means no cache (read on demand). */ + private Map<Long, Snapshot> snapshotCache = Collections.emptyMap(); + + public ExpireSnapshotsExecutor( + SnapshotManager snapshotManager, SnapshotDeletion snapshotDeletion) { + this(snapshotManager, snapshotDeletion, null); + } + + public ExpireSnapshotsExecutor( + SnapshotManager snapshotManager, + SnapshotDeletion snapshotDeletion, + @Nullable ChangelogManager changelogManager) { + this.snapshotManager = snapshotManager; + this.snapshotDeletion = snapshotDeletion; + this.changelogManager = changelogManager; + } + + /** Set pre-collected snapshot cache to avoid redundant reads. */ + public void setSnapshotCache(Map<Long, Snapshot> snapshotCache) { + this.snapshotCache = snapshotCache; + } + + /** + * Execute a snapshot expire task based on its task type. 
+ * + * @param task the task to execute + * @param taggedSnapshots taggedSnapshots set for data file deletion (required for + * DELETE_DATA_FILES) + * @param skippingSet manifest skipping set (required for DELETE_MANIFESTS) + * @return deletion report with execution results + */ + public DeletionReport execute( + SnapshotExpireTask task, + @Nullable List<Snapshot> taggedSnapshots, + @Nullable Set<String> skippingSet) { + + Snapshot snapshot = snapshotCache.get(task.snapshotId()); + if (snapshot == null) { + try { + snapshot = snapshotManager.tryGetSnapshot(task.snapshotId()); + } catch (FileNotFoundException e) { + LOG.warn("Snapshot {} not found, skipping task", task.snapshotId()); + return DeletionReport.skipped(task.snapshotId()); + } + } + + switch (task.taskType()) { + case DELETE_DATA_FILES: + return executeDeleteDataFiles(task, snapshot, taggedSnapshots); + case DELETE_CHANGELOG_FILES: + return executeDeleteChangelogFiles(task, snapshot); + case DELETE_MANIFESTS: + return executeDeleteManifests(task, snapshot, skippingSet); + case DELETE_SNAPSHOT: + return executeDeleteSnapshot(task, snapshot); Review Comment: Refactored as suggested. Replaced the ExpireSnapshotsExecutor switch-based dispatch with polymorphic SnapshotExpireTask subclasses (DeleteDataFilesTask, DeleteChangelogFilesTask, DeleteManifestsTask, DeleteSnapshotTask). Each subclass encapsulates its own deletion logic via execute(ExpireContext). Also simplified ExpireContext by removing delegate methods — callers now access underlying objects directly (e.g., context.snapshotDeletion().cleanUnusedDataFiles()). Additionally, taggedSnapshots and snapshotCache are now passed to ExpireContext as constructor parameters rather than being set through setters. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
