mxm commented on code in PR #13302: URL: https://github.com/apache/iceberg/pull/13302#discussion_r2177823542
########## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/OrphanFilesDetector.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.actions.DeleteOrphanFiles; +import org.apache.iceberg.actions.FileURI; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A specialized co-process function that performs an anti-join between two streams of file URIs. + * + * <p>Emits every file that exists in the file system but is not referenced in the table metadata, + * which are considered orphan files. It also handles URI normalization using provided scheme and + * authority equivalence mappings. + */ +@Internal +public class OrphanFilesDetector extends KeyedCoProcessFunction<String, String, String, String> { + private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesDetector.class); + + // Use MapState to dedupe the strings found in the table + private transient MapState<String, Boolean> foundInTable; + private transient ValueState<String> foundInFileSystem; + private transient ValueState<Boolean> hasUriError; + private final DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode; + private final Map<String, String> equalSchemes; + private final Map<String, String> equalAuthorities; + + public OrphanFilesDetector( + DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode, + Map<String, String> equalSchemes, + Map<String, String> equalAuthorities) { + this.prefixMismatchMode = prefixMismatchMode; + this.equalSchemes = equalSchemes; + this.equalAuthorities = equalAuthorities; + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + foundInTable = + getRuntimeContext() + .getMapState( + new MapStateDescriptor<>("antiJoinFoundInTable", Types.STRING, Types.BOOLEAN)); + hasUriError = + getRuntimeContext().getState(new ValueStateDescriptor<>("antiJoinUriError", Types.BOOLEAN)); + foundInFileSystem = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("antiJoinFoundInFileSystem", Types.STRING)); + } + + @Override + public void processElement1(String value, Context context, Collector<String> collector) + throws Exception { + if (shouldSkipElement(value, context)) { + return; + } + + if (!foundInTable.contains(value)) { + foundInTable.put(value, true); Review Comment: Do we actually need to save the `value` here? Wouldn't a `ValueState` suffice? The state is always scoped by key. The current key is always available in `processElement(..)` or in `onTimer(..)` ########## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java: ########## @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.time.Duration; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.FileURI; +import org.apache.iceberg.flink.maintenance.operator.AntiJoin; +import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor; +import org.apache.iceberg.flink.maintenance.operator.FileUriConverter; +import org.apache.iceberg.flink.maintenance.operator.ListFileSystemFiles; +import org.apache.iceberg.flink.maintenance.operator.ListMetadataFilesProcess; +import org.apache.iceberg.flink.maintenance.operator.SkipOnError; +import org.apache.iceberg.flink.maintenance.operator.TablePlanner; +import org.apache.iceberg.flink.maintenance.operator.TableReader; +import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator; +import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** Delete orphan files from the file system. */ +public class DeleteOrphanFiles { + + private static final Schema FILE_PATH_SCHEMA = new Schema(DataFile.FILE_PATH); + private static final ScanContext FILE_PATH_SCAN_CONTEXT = + ScanContext.builder().streaming(true).project(FILE_PATH_SCHEMA).build(); + private static final Splitter COMMA_SPLITTER = Splitter.on(","); + + @Internal + public static final OutputTag<Exception> ERROR_STREAM = + new OutputTag<>("error-stream", TypeInformation.of(Exception.class)); + + static final String PLANNER_TASK_NAME = "Table Planner"; + static final String READER_TASK_NAME = "Files Reader"; + static final String FILESYSTEM_FILES_TASK_NAME = "Filesystem Files"; + static final String METADATA_FILES_TASK_NAME = "List metadata Files"; + static final String DELETE_FILES_TASK_NAME = "Delete File"; + static final String AGGREGATOR_TASK_NAME = "Orphan Files Aggregator"; + static final String FILTER_FILES_TASK_NAME = "Filter File"; + static final String SKIP_ON_ERROR_TASK_NAME = "Skip On Error"; + + public static DeleteOrphanFiles.Builder builder() { + return new DeleteOrphanFiles.Builder(); + } + + private DeleteOrphanFiles() { + // Do not instantiate directly + } + + public static class Builder extends MaintenanceTaskBuilder<DeleteOrphanFiles.Builder> { + private String location = null; + private Duration minAge = Duration.ofDays(3); + private int planningWorkerPoolSize = 10; + private int deleteBatchSize = 1000; + private boolean caseSensitive = false; + private int maxListingDepth = 3; + private int maxListingDirectSubDirs = 10; + private boolean usePrefixListing = false; + private Map<String, String> equalSchemes = + Maps.newHashMap( + ImmutableMap.of( + "s3n", "s3", + "s3a", "s3a")); + private final Map<String, String> equalAuthorities = Maps.newHashMap(); + private org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode = + org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode.ERROR; + + /** + * The location to start the recursive listing the candidate files for removal. By default, the + * {@link Table#location()} is used. + * + * @param newLocation the task will scan + * @return for chained calls + */ + public Builder location(String newLocation) { + this.location = newLocation; + return this; + } + + /** + * Whether to use prefix listing when listing files from the file system. + * + * @param newUsePrefixListing true to enable prefix listing, false otherwise + * @return for chained calls + */ + public Builder usePrefixListing(boolean newUsePrefixListing) { + this.usePrefixListing = newUsePrefixListing; + return this; + } + + /** + * The maximum number of direct subdirectories to list in a single directory. + * + * @param newMaxListingDirectSubDirs the maximum number of direct sub-directories to list + * @return for chained calls + */ + public Builder maxListingDirectSubDirs(int newMaxListingDirectSubDirs) { + this.maxListingDirectSubDirs = newMaxListingDirectSubDirs; + return this; + } + + /** + * The maximum depth to recurse when listing files from the file system. + * + * @param newMaxListingDepth the maximum depth to recurse + * @return for chained calls + */ + public Builder maxListingDepth(int newMaxListingDepth) { + this.maxListingDepth = newMaxListingDepth; + return this; + } + + /** + * Action behavior when location prefixes (schemes/authorities) mismatch. + * + * @param newPrefixMismatchMode to action when mismatch + * @return for chained calls + */ + public Builder prefixMismatchMode( + org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode newPrefixMismatchMode) { Review Comment: Why do we need to use fully qualified name here? Which other class clashes with PrefixMismatchMode? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
