Guosmilesmile commented on code in PR #13302: URL: https://github.com/apache/iceberg/pull/13302#discussion_r2282024327
########## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.time.Duration; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode; +import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor; +import org.apache.iceberg.flink.maintenance.operator.FileNameReader; +import org.apache.iceberg.flink.maintenance.operator.FileUriKeySelector; +import org.apache.iceberg.flink.maintenance.operator.ListFileSystemFiles; +import org.apache.iceberg.flink.maintenance.operator.ListMetadataFiles; +import 
org.apache.iceberg.flink.maintenance.operator.MetadataTablePlanner; +import org.apache.iceberg.flink.maintenance.operator.OrphanFilesDetector; +import org.apache.iceberg.flink.maintenance.operator.SkipOnError; +import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator; +import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** Delete orphan files from the file system. */ +public class DeleteOrphanFiles { + + private static final Schema FILE_PATH_SCHEMA = new Schema(DataFile.FILE_PATH); + private static final ScanContext FILE_PATH_SCAN_CONTEXT = + ScanContext.builder().streaming(true).project(FILE_PATH_SCHEMA).build(); + private static final Splitter COMMA_SPLITTER = Splitter.on(","); + + @Internal + public static final OutputTag<Exception> ERROR_STREAM = + new OutputTag<>("error-stream", TypeInformation.of(Exception.class)); + + static final String PLANNER_TASK_NAME = "Table Planner"; + static final String READER_TASK_NAME = "Files Reader"; + static final String FILESYSTEM_FILES_TASK_NAME = "Filesystem Files"; + static final String METADATA_FILES_TASK_NAME = "List metadata Files"; + static final String DELETE_FILES_TASK_NAME = "Delete File"; + static final String AGGREGATOR_TASK_NAME = "Orphan Files Aggregator"; + static final String FILTER_FILES_TASK_NAME = "Filter File"; + static final String SKIP_ON_ERROR_TASK_NAME = "Skip On Error"; + + public static DeleteOrphanFiles.Builder builder() { + return new DeleteOrphanFiles.Builder(); + } + + private DeleteOrphanFiles() { + // Do not instantiate directly + } + + public static class Builder extends MaintenanceTaskBuilder<DeleteOrphanFiles.Builder> { + private String location = null; + private Duration minAge = Duration.ofDays(3); + private int planningWorkerPoolSize = 10; + private int 
deleteBatchSize = 1000; + private int maxListingDepth = 3; + private int maxListingDirectSubDirs = 10; Review Comment: I understand what you mean. Adding a maximum file limit here would indeed shorten a task's execution time, because each run would then check only a subset of the files for orphans. After thinking it through, I agree that this is necessary. However, if we add this configuration option, I think it belongs at the core level. Could we leave this feature out of this PR and instead open a separate PR to design and implement it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
