flyrain commented on code in PR #3256: URL: https://github.com/apache/polaris/pull/3256#discussion_r2726371138
########## storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/StorageUri.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.storage.files.impl; + +import com.google.common.base.Preconditions; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.net.URI; +import java.util.Comparator; +import java.util.Locale; +import java.util.Objects; + +/** + * A utility class for working with file storage locations to be used instead of {@link URI} since + * many actual object store locations do not always match the URI syntax. + * + * <p>This type is <em>private</em> to the {@code polaris-storage-files-impl} module. + */ +final class StorageUri implements Comparable<StorageUri> { + public static final String SCHEME_FILE = "file"; Review Comment: Can we remove this, as it is not used? ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileType.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import org.apache.iceberg.ContentFile; + +public enum FileType { + UNKNOWN(false, false), + ICEBERG_METADATA(true, false), + ICEBERG_STATISTICS(false, false), + ICEBERG_MANIFEST_LIST(false, false), + ICEBERG_MANIFEST_FILE(false, false), + ICEBERG_DATA_FILE(false, true), + ICEBERG_DELETE_FILE(false, true), + ; + + private final boolean metadata; + private final boolean dataOrDelete; Review Comment: Looks like these two variables are never used. Can we remove them? ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileSpec.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.util.Optional; +import java.util.OptionalLong; +import org.apache.polaris.immutables.PolarisImmutable; + +/** + * Describes a single file/object in an object storage. + * + * <p>Not all attributes are populated by every {@link FileOperations} function. + */ +@PolarisImmutable +public interface FileSpec { + + /** The full object storage URI. */ + String location(); + + /** + * The type of the file, if known, as a convenience hint. + * + * <p>The type might have been guessed from the file name, in which case the returned value is not + * guaranteed to be accurate. + * + * @see #guessTypeFromName() + */ + Optional<FileType> fileType(); + + /** The size of the file in bytes, if available. */ + OptionalLong size(); + + /** The creation timestamp in milliseconds since the epoch, if available. */ + OptionalLong createdAtMillis(); + + static Builder builder() { + return ImmutableFileSpec.builder(); + } + + static Builder fromLocation(String location) { + return builder().location(location); + } + + static Builder fromLocationAndSize(String location, long size) { + var b = fromLocation(location); + if (size > 0L) { + b.size(size); + } + return b; + } + + /** Guesses the given file's type from its name, not guaranteed to be accurate. */ + default FileType guessTypeFromName() { Review Comment: this is only used by tests, can we move it to the tests? ########## storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java: ########## @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import com.google.common.collect.Streams; +import com.google.common.util.concurrent.RateLimiter; +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewMetadataParser; +import org.apache.polaris.storage.files.api.FileFilter; +import org.apache.polaris.storage.files.api.FileOperations; +import org.apache.polaris.storage.files.api.FileSpec; +import org.apache.polaris.storage.files.api.FileType; +import org.apache.polaris.storage.files.api.ImmutablePurgeStats; +import org.apache.polaris.storage.files.api.PurgeSpec; +import org.apache.polaris.storage.files.api.PurgeStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link + * org.apache.iceberg.io.SupportsBulkOperations} and {@link + * org.apache.iceberg.io.SupportsPrefixOperations}. + */ +record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(FileOperationsImpl.class); + + @Override + public Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter) { + var prefixUri = StorageUri.of(prefix).resolve("/"); + if (fileIO instanceof SupportsPrefixOperations prefixOps) { + return Streams.stream(prefixOps.listPrefix(prefix).iterator()) + .filter(Objects::nonNull) + .map( + fileInfo -> { + var location = StorageUri.of(fileInfo.location()); + if (!location.isAbsolute()) { + // ADLSFileIO does _not_ include the prefix, but GCSFileIO and S3FileIO do. 
+ location = prefixUri.resolve(location); + } + return FileSpec.builder() + .location(location.toString()) + .size(fileInfo.size()) + .createdAtMillis(fileInfo.createdAtMillis()) + .build(); + }) + .filter(filter); + } + + throw new IllegalStateException( + format( + "An Iceberg FileIO supporting prefix operations is required, but the given %s does not", + fileIO.getClass().getName())); + } + + @Override + public Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate) { + var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation); + if (metadataOpt.isEmpty()) { + return Stream.empty(); + } + var metadata = metadataOpt.get(); + + var metadataFileSpec = + FileSpec.fromLocation(tableMetadataLocation).fileType(FileType.ICEBERG_METADATA).build(); + + var fileSources = new ArrayList<Stream<FileSpec>>(); + + fileSources.add(Stream.of(metadataFileSpec)); + + var statisticsFiles = metadata.statisticsFiles(); + if (statisticsFiles != null) { + fileSources.addFirst( + statisticsFiles.stream() + .map( + statisticsFile -> + FileSpec.fromLocationAndSize( + statisticsFile.path(), statisticsFile.fileSizeInBytes()) + .fileType(FileType.ICEBERG_STATISTICS) + .build())); + } + + var previousFiles = metadata.previousFiles(); + if (previousFiles != null) { + fileSources.add( + previousFiles.stream() + .filter( + metadataLogEntry -> + metadataLogEntry.file() != null && !metadataLogEntry.file().isEmpty()) + .map( + metadataLogEntry -> + FileSpec.fromLocation(metadataLogEntry.file()) + .fileType(FileType.ICEBERG_METADATA) + .build())); + } + + var specsById = metadata.specsById(); + + var addPredicate = deduplicator(deduplicate); + + fileSources.addFirst( + metadata.snapshots().stream() + // Newest snapshots first + .sorted((s1, s2) -> Long.compare(s2.timestampMillis(), s1.timestampMillis())) + .flatMap( + snapshot -> identifyIcebergTableSnapshotFiles(snapshot, specsById, addPredicate))); + + // Return "dependencies" before the "metadata" itself, so the probability of being able to + // resume a failed/aborted purge is higher. 
+ + return fileSources.stream().flatMap(Function.identity()); + } + + static Predicate<String> deduplicator(boolean deduplicate) { + if (!deduplicate) { + return x -> true; + } + var set = new LinkedHashSet<String>(); + return location -> { + synchronized (set) { + if (set.size() > 100_000) { + // limit the heap pressure of the deduplication set to 100,000 elements + set.removeFirst(); + } + return set.add(location); + } + }; + } + + Stream<FileSpec> identifyIcebergTableSnapshotFiles( + @Nonnull Snapshot snapshot, + Map<Integer, PartitionSpec> specsById, + Predicate<String> addPredicate) { + var manifestListLocation = snapshot.manifestListLocation(); + if (manifestListLocation != null && !addPredicate.test(manifestListLocation)) { + return Stream.empty(); + } + + return identifyIcebergManifests(manifestListLocation, snapshot, specsById, addPredicate); + } + + Stream<FileSpec> identifyIcebergManifests( + String manifestListLocation, + Snapshot snapshot, + Map<Integer, PartitionSpec> specsById, + Predicate<String> addPredicate) { + + var manifestListFileSpecStream = Stream.<FileSpec>empty(); + + if (manifestListLocation != null && !manifestListLocation.isEmpty()) { + var manifestListFileSpec = + FileSpec.fromLocation(manifestListLocation) + .fileType(FileType.ICEBERG_MANIFEST_LIST) + .build(); + manifestListFileSpecStream = Stream.of(manifestListFileSpec); + } + + try { + var allManifestsFiles = + snapshot.allManifests(fileIO).stream() + .filter(manifestFile -> addPredicate.test(manifestFile.path())) + .flatMap( + manifestFile -> + identifyIcebergManifestDataFiles(manifestFile, specsById, addPredicate)); + + // Return "dependencies" before the "metadata" itself, so a failed/aborted purge can be + // resumed. + return Stream.of(allManifestsFiles, manifestListFileSpecStream).flatMap(Function.identity()); + } catch (Exception e) { + LOGGER.warn("Failure reading manifest list file {}: {}", manifestListLocation, e.toString()); + LOGGER.debug("Failure reading manifest list file {}", manifestListLocation); + return manifestListFileSpecStream; + } + } + + @SuppressWarnings("UnnecessaryDefault") + private Stream<FileSpec> identifyIcebergManifestDataFiles( + ManifestFile manifestFile, + Map<Integer, PartitionSpec> specsById, + Predicate<String> addPredicate) { + + var manifestFileSpec = + FileSpec.fromLocationAndSize(manifestFile.path(), manifestFile.length()) + .fileType(FileType.ICEBERG_MANIFEST_FILE) + .build(); + + try (var contentFilesIter = + switch (manifestFile.content()) { + case DATA -> ManifestFiles.read(manifestFile, fileIO).iterator(); + case DELETES -> + ManifestFiles.readDeleteManifest(manifestFile, fileIO, specsById).iterator(); + default -> { + LOGGER.warn( + "Unsupported content type {} in manifest {}", + manifestFile.content(), + manifestFile.path()); + yield CloseableIterator.<ContentFile<? extends ContentFile<?>>>empty(); + } + }) { + + // Cannot leverage streaming here and eagerly build a list, as the manifest-file reader needs + // to be closed. + var files = new ArrayList<FileSpec>(); + while (contentFilesIter.hasNext()) { + var contentFile = contentFilesIter.next(); + if (addPredicate.test(contentFile.location())) { + files.add( + FileSpec.fromLocationAndSize(contentFile.location(), contentFile.fileSizeInBytes()) + .fileType(FileType.fromContentFile(contentFile)) + .build()); + } + } + // Return "dependencies" before the "metadata" itself, so the probability of being able to + // resume a failed/aborted purge is higher. 
+ files.add(manifestFileSpec); + + return files.stream(); + } catch (IOException e) { + LOGGER.warn("Failure reading manifest file {}: {}", manifestFile.path(), e.toString()); + LOGGER.debug("Failure reading manifest file {}", manifestFile.path(), e); + return Stream.of(manifestFileSpec); + } + } + + @Override + public Stream<FileSpec> identifyIcebergViewFiles( + @Nonnull String viewMetadataLocation, boolean deduplicate) { + var metadataOpt = readViewMetadataFailsafe(viewMetadataLocation); + if (metadataOpt.isEmpty()) { + return Stream.empty(); + } + + var metadataFileSpec = + FileSpec.fromLocation(viewMetadataLocation).fileType(FileType.ICEBERG_METADATA).build(); + + return Stream.of(metadataFileSpec); + } + + @Override + public PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec) { + var files = + identifyIcebergTableFiles(tableMetadataLocation, true).filter(purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purgeIcebergTableBaseLocation( + @Nonnull String tableMetadataLocation, PurgeSpec purgeSpec) { + var metadata = readTableMetadataFailsafe(tableMetadataLocation); + if (metadata.isEmpty()) { + return ImmutablePurgeStats.builder() + .duration(Duration.ZERO) + .purgedFiles(0L) + .failedPurges(1) + .build(); + } + + var baseLocation = metadata.get().location(); + var files = findFiles(baseLocation, purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec) { + var files = + identifyIcebergViewFiles(viewMetadataLocation, false).filter(purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purgeIcebergViewBaseLocation( + @Nonnull String viewMetadataLocation, PurgeSpec purgeSpec) { + var metadata = readViewMetadataFailsafe(viewMetadataLocation); + if (metadata.isEmpty()) { + return ImmutablePurgeStats.builder() + .duration(Duration.ZERO) + .purgedFiles(0L) + .failedPurges(1) + .build(); + } + + var baseLocation = metadata.get().location(); + var files = findFiles(baseLocation, purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purge(@Nonnull Stream<FileSpec> locationStream, PurgeSpec purgeSpec) { + return purgeFiles(locationStream.map(FileSpec::location), purgeSpec); + } + + @Override + public PurgeStats purgeFiles(@Nonnull Stream<String> locationStream, PurgeSpec purgeSpec) { + if (fileIO instanceof SupportsBulkOperations bulkOps) { + var startedNanos = System.nanoTime(); + + var iter = locationStream.iterator(); + + var batcher = new PurgeBatcher(purgeSpec, bulkOps); + while (iter.hasNext()) { + batcher.add(iter.next()); + } + batcher.flush(); + + return ImmutablePurgeStats.builder() + .purgedFiles(batcher.purged) + .failedPurges(batcher.failed) + .duration(Duration.ofNanos(System.nanoTime() - startedNanos)) + .build(); + } + + throw new IllegalStateException( + format( + "An Iceberg FileIO supporting bulk operations is required, but the given %s does not", + fileIO.getClass().getName())); + } + + @SuppressWarnings("UnstableApiUsage") + static final class PurgeBatcher { + private final PurgeSpec purgeSpec; + private final SupportsBulkOperations bulkOps; + + private final int deleteBatchSize; + // Using a `Set` prevents duplicate paths in a single bulk-deletion. 
+ + private final Set<String> batch = new HashSet<>(); + + private final Runnable fileDeleteRateLimiter; + private final Runnable batchDeleteRateLimiter; + + long purged = 0L; + + long failed = 0L; + + PurgeBatcher(PurgeSpec purgeSpec, SupportsBulkOperations bulkOps) { + var implSpecificLimit = implSpecificDeleteBatchLimit(bulkOps); + + this.deleteBatchSize = Math.min(implSpecificLimit, Math.max(purgeSpec.deleteBatchSize(), 1)); + + this.purgeSpec = purgeSpec; + this.bulkOps = bulkOps; + + fileDeleteRateLimiter = createLimiter(purgeSpec.fileDeletesPerSecond()); + batchDeleteRateLimiter = createLimiter(purgeSpec.batchDeletesPerSecond()); + } + + private static Runnable createLimiter(OptionalDouble optionalDouble) { + if (optionalDouble.isEmpty()) { + // unlimited + return () -> {}; + } + var limiter = RateLimiter.create(optionalDouble.getAsDouble()); + return limiter::acquire; + } + + void add(String location) { + fileDeleteRateLimiter.run(); + batch.add(location); + + if (batch.size() >= deleteBatchSize) { + flush(); + } + } + + void flush() { + int size = batch.size(); + if (size > 0) { + batch.forEach(purgeSpec.purgeIssuedCallback()); + try { + batchDeleteRateLimiter.run(); + bulkOps.deleteFiles(batch); + purged += size; + } catch (BulkDeletionFailureException e) { + // Object stores do delete the files that exist, but a BulkDeletionFailureException is + // still being thrown. + // However, not all FileIO implementations behave the same way as some don't throw in the + // non-existent-case. + var batchFailed = e.numberFailedObjects(); + purged += size - batchFailed; + failed += batchFailed; + } finally { + batch.clear(); + } + } + } + } + + /** Figure out the hard-coded max batch size limit for a particular FileIO implementation. */ + static int implSpecificDeleteBatchLimit(SupportsBulkOperations bulkOps) { + var className = bulkOps.getClass().getSimpleName(); + return switch (className) { + // See https://aws.amazon.com/blogs/aws/amazon-s3-multi-object-deletion/ + case "S3FileIO" -> 1000; + // See https://cloud.google.com/storage/docs/batch + case "GCSFileIO" -> 100; + // ADLS limited to 50, because the implementation, as of Iceberg 1.10, uses one thread per + // file to be deleted (no specialized bulk deletion). + case "ADLSFileIO" -> 50; + // Use a reasonable(?) default for all other FileIO implementations. + default -> 50; + }; + } + + private Optional<TableMetadata> readTableMetadataFailsafe(String tableMetadataLocation) { + try { + var inputFile = fileIO.newInputFile(tableMetadataLocation); + return Optional.of(TableMetadataParser.read(inputFile)); + } catch (Exception e) { + LOGGER.warn( + "Failure reading table metadata file {}: {}", tableMetadataLocation, e.toString()); Review Comment: Given that the runtime exception thrown by `TableMetadataParser.read(inputFile)` is `Failed to read file: filepath`, maybe we should log `e` itself, instead of `e.toString()`. ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import jakarta.annotation.Nonnull; +import java.util.stream.Stream; + +/** + * Object storage file operations, used to find files below a given prefix, to purge files, to + * identify referenced files, etc. + * + * <p>All functions of this interface rather yield incomplete results and continue over throwing + * exceptions. + */ +public interface FileOperations { + /** + * Find files that match the given prefix and filter. + * + * <p>Whether existing but inaccessible files are included in the result depends on the object + * store. + * + * <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's + * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; }} step on + * the returned stream. + * + * @param prefix full object storage URI prefix, including scheme and bucket. + * @param filter file filter + * @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link + * FileSpec#size()} attributes populated with the information provided by the object store. + * The {@link FileSpec#fileType() file type} attribute is not populated, it may be {@link + * FileSpec#guessTypeFromName() guessed}. + */ + Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter); + + /** + * Identifies all files referenced by the given table-metadata. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, the returned stream will just not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param tableMetadataLocation Iceberg table-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have to be derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate); + + /** + * Identifies all files referenced by the given view-metadata. + * + * <p>In case "container" files like the metadata are not readable, the returned stream will just + * not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param viewMetadataLocation Iceberg view-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. 
Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have been derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream<FileSpec> identifyIcebergViewFiles( + @Nonnull String viewMetadataLocation, boolean deduplicate); + + /** + * Purges all files that are referenced by the given table-metadata, respecting the given filter. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. + * + * <p>This is effectively a convenience for {@code + * purge(identifyIcebergTableFiles(tableMetadataLocation).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #identifyIcebergTableFiles(String, boolean) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are within the base location of the given table-metadata, purge only + * files that match the given filter. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. + * + * <p>This is effectively a convenience for {@code + * purge(findFiles(tableMetadata.baseLocation()).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTableBaseLocation( Review Comment: What's the use case of this method? I'm not sure we need this method; it is also risky, esp. when locations overlap across tables. ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.OptionalDouble; +import java.util.function.Consumer; +import org.apache.polaris.immutables.PolarisImmutable; +import org.immutables.value.Value; + +@SuppressWarnings("unused") +@PolarisImmutable +public interface PurgeSpec { + PurgeSpec DEFAULT_INSTANCE = PurgeSpec.builder().build(); + + @Value.Default + default FileFilter fileFilter() { + return FileFilter.alwaysTrue(); Review Comment: Do we need a filter that always returns true? The caller (`purgeIcebergTable`) doesn't seem to need a filter if it is always true.
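For context on the `alwaysTrue()` default: it is the usual null-object pattern for predicate parameters, so `purgeIcebergTable` and the other call sites can apply `purgeSpec.fileFilter()` unconditionally instead of special-casing an absent filter. A minimal sketch of that trade-off, simplified to a plain `Predicate<String>` over locations rather than the PR's actual `FileFilter` type:

```java
import java.util.function.Predicate;
import java.util.stream.Stream;

class AlwaysTrueFilterSketch {
  // With a non-null default filter, every call site runs the same pipeline
  // unconditionally: no `if (filter != null)` branching anywhere.
  static Stream<String> purgeCandidates(Stream<String> files, Predicate<String> filter) {
    return files.filter(filter);
  }

  public static void main(String[] args) {
    Predicate<String> alwaysTrue = location -> true; // the "purge everything" default
    Predicate<String> dataOnly = location -> location.endsWith(".parquet");

    System.out.println(purgeCandidates(Stream.of("a.parquet", "m.avro"), alwaysTrue).toList());
    // prints [a.parquet, m.avro]
    System.out.println(purgeCandidates(Stream.of("a.parquet", "m.avro"), dataOnly).toList());
    // prints [a.parquet]
  }
}
```

Dropping the default would push the absent-filter handling into each caller, which is the cost to weigh against the reviewer's point.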
########## storage/files/impl/src/main/resources/META-INF/beans.xml: ########## @@ -0,0 +1,24 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<beans xmlns="https://jakarta.ee/xml/ns/jakartaee" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd"> + <!-- File required by Weld (used for testing), not by Quarkus --> Review Comment: Do we need this file? I didn't see Weld used in any test files. ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeStats.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import java.time.Duration; +import org.apache.polaris.immutables.PolarisImmutable; + +@PolarisImmutable +public interface PurgeStats { + Duration duration(); + + /** + * The number of purged files. + * + * <p>The returned value may be wrong and include non-existing files. + */ + long purgedFiles(); + + /** + * Number of files that were not purged. + * + * <p>The returned value may be wrong and not include non-existing files. Review Comment: Same here ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.OptionalDouble; +import java.util.function.Consumer; +import org.apache.polaris.immutables.PolarisImmutable; +import org.immutables.value.Value; + +@SuppressWarnings("unused") +@PolarisImmutable +public interface PurgeSpec { + PurgeSpec DEFAULT_INSTANCE = PurgeSpec.builder().build(); + + @Value.Default + default FileFilter fileFilter() { + return FileFilter.alwaysTrue(); + } + + PurgeSpec withFileFilter(FileFilter fileFilter); Review Comment: Do we need it? It doesn't seem to be used anywhere. ########## storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java: ########## @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import com.google.common.collect.Streams; +import com.google.common.util.concurrent.RateLimiter; +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewMetadataParser; +import org.apache.polaris.storage.files.api.FileFilter; +import org.apache.polaris.storage.files.api.FileOperations; +import org.apache.polaris.storage.files.api.FileSpec; +import org.apache.polaris.storage.files.api.FileType; +import org.apache.polaris.storage.files.api.ImmutablePurgeStats; +import org.apache.polaris.storage.files.api.PurgeSpec; +import org.apache.polaris.storage.files.api.PurgeStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link + * org.apache.iceberg.io.SupportsBulkOperations} and {@link + * org.apache.iceberg.io.SupportsPrefixOperations}. + */ +record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(FileOperationsImpl.class); + + @Override + public Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter) { + var prefixUri = StorageUri.of(prefix).resolve("/"); + if (fileIO instanceof SupportsPrefixOperations prefixOps) { + return Streams.stream(prefixOps.listPrefix(prefix).iterator()) + .filter(Objects::nonNull) + .map( + fileInfo -> { + var location = StorageUri.of(fileInfo.location()); + if (!location.isAbsolute()) { + // ADLSFileIO does _not_ include the prefix, but GCSFileIO and S3FileIO do. 
+ location = prefixUri.resolve(location); + } + return FileSpec.builder() + .location(location.toString()) + .size(fileInfo.size()) + .createdAtMillis(fileInfo.createdAtMillis()) + .build(); + }) + .filter(filter); + } + + throw new IllegalStateException( + format( + "An Iceberg FileIO supporting prefix operations is required, but the given %s does not", + fileIO.getClass().getName())); + } + + @Override + public Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate) { + var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation); + if (metadataOpt.isEmpty()) { + return Stream.empty(); + } + var metadata = metadataOpt.get(); + + var metadataFileSpec = + FileSpec.fromLocation(tableMetadataLocation).fileType(FileType.ICEBERG_METADATA).build(); + + var fileSources = new ArrayList<Stream<FileSpec>>(); + + fileSources.add(Stream.of(metadataFileSpec)); + + var statisticsFiles = metadata.statisticsFiles(); + if (statisticsFiles != null) { + fileSources.addFirst( + statisticsFiles.stream() + .map( + statisticsFile -> + FileSpec.fromLocationAndSize( + statisticsFile.path(), statisticsFile.fileSizeInBytes()) + .fileType(FileType.ICEBERG_STATISTICS) + .build())); + } + + var previousFiles = metadata.previousFiles(); + if (previousFiles != null) { + fileSources.add( + previousFiles.stream() + .filter( + metadataLogEntry -> + metadataLogEntry.file() != null && !metadataLogEntry.file().isEmpty()) + .map( + metadataLogEntry -> + FileSpec.fromLocation(metadataLogEntry.file()) + .fileType(FileType.ICEBERG_METADATA) + .build())); + } + + var specsById = metadata.specsById(); + + var addPredicate = deduplicator(deduplicate); + + fileSources.addFirst( + metadata.snapshots().stream() + // Newest snapshots first + .sorted((s1, s2) -> Long.compare(s2.timestampMillis(), s1.timestampMillis())) + .flatMap( + snapshot -> identifyIcebergTableSnapshotFiles(snapshot, specsById, addPredicate))); + + // Return "dependencies" before the "metadata" itself, so the probability of being able to + // resume a failed/aborted purge is higher. 
+ + return fileSources.stream().flatMap(Function.identity()); + } + + static Predicate<String> deduplicator(boolean deduplicate) { + if (!deduplicate) { + return x -> true; + } + var set = new LinkedHashSet<String>(); + return location -> { + synchronized (set) { + if (set.size() > 100_000) { + // limit the heap pressure of the deduplication set to 100,000 elements + set.removeFirst(); + } + return set.add(location); + } + }; + } + + Stream<FileSpec> identifyIcebergTableSnapshotFiles( + @Nonnull Snapshot snapshot, + Map<Integer, PartitionSpec> specsById, + Predicate<String> addPredicate) { + var manifestListLocation = snapshot.manifestListLocation(); + if (manifestListLocation != null && !addPredicate.test(manifestListLocation)) { + return Stream.empty(); + } + + return identifyIcebergManifests(manifestListLocation, snapshot, specsById, addPredicate); + } + + Stream<FileSpec> identifyIcebergManifests( + String manifestListLocation, + Snapshot snapshot, + Map<Integer, PartitionSpec> specsById, + Predicate<String> addPredicate) { + + var manifestListFileSpecStream = Stream.<FileSpec>empty(); + + if (manifestListLocation != null && !manifestListLocation.isEmpty()) { + var manifestListFileSpec = + FileSpec.fromLocation(manifestListLocation) + .fileType(FileType.ICEBERG_MANIFEST_LIST) + .build(); + manifestListFileSpecStream = Stream.of(manifestListFileSpec); + } + + try { + var allManifestsFiles = + snapshot.allManifests(fileIO).stream() + .filter(manifestFile -> addPredicate.test(manifestFile.path())) + .flatMap( + manifestFile -> + identifyIcebergManifestDataFiles(manifestFile, specsById, addPredicate)); + + // Return "dependencies" before the "metadata" itself, so a failed/aborted purge can be + // resumed. + return Stream.of(allManifestsFiles, manifestListFileSpecStream).flatMap(Function.identity()); + } catch (Exception e) { + LOGGER.warn("Failure reading manifest list file {}: {}", manifestListLocation, e.toString()); + LOGGER.debug("Failure reading manifest list file {}", manifestListLocation); + return manifestListFileSpecStream; + } + } + + @SuppressWarnings("UnnecessaryDefault") + private Stream<FileSpec> identifyIcebergManifestDataFiles( + ManifestFile manifestFile, + Map<Integer, PartitionSpec> specsById, + Predicate<String> addPredicate) { + + var manifestFileSpec = + FileSpec.fromLocationAndSize(manifestFile.path(), manifestFile.length()) + .fileType(FileType.ICEBERG_MANIFEST_FILE) + .build(); + + try (var contentFilesIter = + switch (manifestFile.content()) { + case DATA -> ManifestFiles.read(manifestFile, fileIO).iterator(); + case DELETES -> + ManifestFiles.readDeleteManifest(manifestFile, fileIO, specsById).iterator(); + default -> { + LOGGER.warn( + "Unsupported content type {} in manifest {}", + manifestFile.content(), + manifestFile.path()); + yield CloseableIterator.<ContentFile<? extends ContentFile<?>>>empty(); + } + }) { + + // Cannot leverage streaming here and eagerly build a list, as the manifest-file reader needs + // to be closed. + var files = new ArrayList<FileSpec>(); + while (contentFilesIter.hasNext()) { + var contentFile = contentFilesIter.next(); + if (addPredicate.test(contentFile.location())) { + files.add( + FileSpec.fromLocationAndSize(contentFile.location(), contentFile.fileSizeInBytes()) + .fileType(FileType.fromContentFile(contentFile)) + .build()); + } + } + // Return "dependencies" before the "metadata" itself, so the probability of being able to + // resume a failed/aborted purge is higher. 
+ files.add(manifestFileSpec); + + return files.stream(); + } catch (IOException e) { + LOGGER.warn("Failure reading manifest file {}: {}", manifestFile.path(), e.toString()); + LOGGER.debug("Failure reading manifest file {}", manifestFile.path(), e); + return Stream.of(manifestFileSpec); + } + } + + @Override + public Stream<FileSpec> identifyIcebergViewFiles( + @Nonnull String viewMetadataLocation, boolean deduplicate) { + var metadataOpt = readViewMetadataFailsafe(viewMetadataLocation); + if (metadataOpt.isEmpty()) { + return Stream.empty(); + } + + var metadataFileSpec = + FileSpec.fromLocation(viewMetadataLocation).fileType(FileType.ICEBERG_METADATA).build(); + + return Stream.of(metadataFileSpec); + } + + @Override + public PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec) { + var files = + identifyIcebergTableFiles(tableMetadataLocation, true).filter(purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purgeIcebergTableBaseLocation( + @Nonnull String tableMetadataLocation, PurgeSpec purgeSpec) { + var metadata = readTableMetadataFailsafe(tableMetadataLocation); + if (metadata.isEmpty()) { + return ImmutablePurgeStats.builder() + .duration(Duration.ZERO) + .purgedFiles(0L) + .failedPurges(1) + .build(); + } + + var baseLocation = metadata.get().location(); + var files = findFiles(baseLocation, purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec) { + var files = + identifyIcebergViewFiles(viewMetadataLocation, false).filter(purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purgeIcebergViewBaseLocation( + @Nonnull String viewMetadataLocation, PurgeSpec purgeSpec) { + var metadata = readViewMetadataFailsafe(viewMetadataLocation); + if (metadata.isEmpty()) { + return ImmutablePurgeStats.builder() + .duration(Duration.ZERO) + .purgedFiles(0L) + .failedPurges(1) + .build(); + } + + var baseLocation = metadata.get().location(); + var files = findFiles(baseLocation, purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purge(@Nonnull Stream<FileSpec> locationStream, PurgeSpec purgeSpec) { + return purgeFiles(locationStream.map(FileSpec::location), purgeSpec); + } + + @Override + public PurgeStats purgeFiles(@Nonnull Stream<String> locationStream, PurgeSpec purgeSpec) { + if (fileIO instanceof SupportsBulkOperations bulkOps) { + var startedNanos = System.nanoTime(); + + var iter = locationStream.iterator(); + + var batcher = new PurgeBatcher(purgeSpec, bulkOps); + while (iter.hasNext()) { + batcher.add(iter.next()); + } + batcher.flush(); + + return ImmutablePurgeStats.builder() + .purgedFiles(batcher.purged) + .failedPurges(batcher.failed) + .duration(Duration.ofNanos(System.nanoTime() - startedNanos)) + .build(); + } + + throw new IllegalStateException( + format( + "An Iceberg FileIO supporting bulk operations is required, but the given %s does not", + fileIO.getClass().getName())); + } + + @SuppressWarnings("UnstableApiUsage") + static final class PurgeBatcher { + private final PurgeSpec purgeSpec; + private final SupportsBulkOperations bulkOps; + + private final int deleteBatchSize; + // Using a `Set` prevents duplicate paths in a single bulk-deletion. 
+ + private final Set<String> batch = new HashSet<>(); + + private final Runnable fileDeleteRateLimiter; + private final Runnable batchDeleteRateLimiter; + + long purged = 0L; + + long failed = 0L; + + PurgeBatcher(PurgeSpec purgeSpec, SupportsBulkOperations bulkOps) { + var implSpecificLimit = implSpecificDeleteBatchLimit(bulkOps); + + this.deleteBatchSize = Math.min(implSpecificLimit, Math.max(purgeSpec.deleteBatchSize(), 1)); + + this.purgeSpec = purgeSpec; + this.bulkOps = bulkOps; + + fileDeleteRateLimiter = createLimiter(purgeSpec.fileDeletesPerSecond()); + batchDeleteRateLimiter = createLimiter(purgeSpec.batchDeletesPerSecond()); + } + + private static Runnable createLimiter(OptionalDouble optionalDouble) { + if (optionalDouble.isEmpty()) { + // unlimited + return () -> {}; + } + var limiter = RateLimiter.create(optionalDouble.getAsDouble()); + return limiter::acquire; + } + + void add(String location) { + fileDeleteRateLimiter.run(); + batch.add(location); + + if (batch.size() >= deleteBatchSize) { + flush(); + } + } + + void flush() { + int size = batch.size(); + if (size > 0) { + batch.forEach(purgeSpec.purgeIssuedCallback()); + try { + batchDeleteRateLimiter.run(); + bulkOps.deleteFiles(batch); + purged += size; + } catch (BulkDeletionFailureException e) { + // Object stores do delete the files that exist, but a BulkDeletionFailureException is + // still being thrown. + // However, not all FileIO implementations behave the same way as some don't throw in the + // non-existent-case. + var batchFailed = e.numberFailedObjects(); + purged += size - batchFailed; + failed += batchFailed; + } finally { + batch.clear(); + } + } + } + } + + /** Figure out the hard-coded max batch size limit for a particular FileIO implementation. */ + static int implSpecificDeleteBatchLimit(SupportsBulkOperations bulkOps) { + var className = bulkOps.getClass().getSimpleName(); + return switch (className) { + // See https://aws.amazon.com/blogs/aws/amazon-s3-multi-object-deletion/ + case "S3FileIO" -> 1000; + // See https://cloud.google.com/storage/docs/batch + case "GCSFileIO" -> 100; + // ADLS limited to 50, because the implementation, as of Iceberg 1.10, uses one thread per + // file to be deleted (no specialized bulk deletion). + case "ADLSFileIO" -> 50; + // Use a reasonable(?) default for all other FileIO implementations. + default -> 50; + }; + } + + private Optional<TableMetadata> readTableMetadataFailsafe(String tableMetadataLocation) { + try { + var inputFile = fileIO.newInputFile(tableMetadataLocation); + return Optional.of(TableMetadataParser.read(inputFile)); + } catch (Exception e) { + LOGGER.warn( + "Failure reading table metadata file {}: {}", tableMetadataLocation, e.toString()); + LOGGER.debug("Failure reading table metadata file {}", tableMetadataLocation, e); + return Optional.empty(); + } + } + + private Optional<ViewMetadata> readViewMetadataFailsafe(String viewMetadataLocation) { + try { + var inputFile = fileIO.newInputFile(viewMetadataLocation); + return Optional.of(ViewMetadataParser.read(inputFile)); + } catch (Exception e) { + LOGGER.warn("Failure reading view metadata file {}: {}", viewMetadataLocation, e.toString()); Review Comment: Same here ########## storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java: ########## @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import com.google.common.collect.Streams; +import com.google.common.util.concurrent.RateLimiter; +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewMetadataParser; +import org.apache.polaris.storage.files.api.FileFilter; +import org.apache.polaris.storage.files.api.FileOperations; +import org.apache.polaris.storage.files.api.FileSpec; +import org.apache.polaris.storage.files.api.FileType; +import org.apache.polaris.storage.files.api.ImmutablePurgeStats; +import org.apache.polaris.storage.files.api.PurgeSpec; +import org.apache.polaris.storage.files.api.PurgeStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link + * org.apache.iceberg.io.SupportsBulkOperations} and {@link + * org.apache.iceberg.io.SupportsPrefixOperations}. + */ +record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(FileOperationsImpl.class); + + @Override + public Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter) { + var prefixUri = StorageUri.of(prefix).resolve("/"); + if (fileIO instanceof SupportsPrefixOperations prefixOps) { + return Streams.stream(prefixOps.listPrefix(prefix).iterator()) + .filter(Objects::nonNull) + .map( + fileInfo -> { + var location = StorageUri.of(fileInfo.location()); + if (!location.isAbsolute()) { + // ADLSFileIO does _not_ include the prefix, but GCSFileIO and S3FileIO do. 
+ location = prefixUri.resolve(location); + } + return FileSpec.builder() + .location(location.toString()) + .size(fileInfo.size()) + .createdAtMillis(fileInfo.createdAtMillis()) + .build(); + }) + .filter(filter); + } + + throw new IllegalStateException( + format( + "An Iceberg FileIO supporting prefix operations is required, but the given %s does not", + fileIO.getClass().getName())); + } + + @Override + public Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate) { + var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation); + if (metadataOpt.isEmpty()) { + return Stream.empty(); + } + var metadata = metadataOpt.get(); + + var metadataFileSpec = + FileSpec.fromLocation(tableMetadataLocation).fileType(FileType.ICEBERG_METADATA).build(); + + var fileSources = new ArrayList<Stream<FileSpec>>(); + + fileSources.add(Stream.of(metadataFileSpec)); + + var statisticsFiles = metadata.statisticsFiles(); + if (statisticsFiles != null) { + fileSources.addFirst( + statisticsFiles.stream() + .map( + statisticsFile -> + FileSpec.fromLocationAndSize( + statisticsFile.path(), statisticsFile.fileSizeInBytes()) + .fileType(FileType.ICEBERG_STATISTICS) + .build())); + } + + var previousFiles = metadata.previousFiles(); + if (previousFiles != null) { + fileSources.add( + previousFiles.stream() + .filter( + metadataLogEntry -> + metadataLogEntry.file() != null && !metadataLogEntry.file().isEmpty()) + .map( + metadataLogEntry -> + FileSpec.fromLocation(metadataLogEntry.file()) + .fileType(FileType.ICEBERG_METADATA) + .build())); + } + + var specsById = metadata.specsById(); + + var addPredicate = deduplicator(deduplicate); + + fileSources.addFirst( + metadata.snapshots().stream() + // Newest snapshots first + .sorted((s1, s2) -> Long.compare(s2.timestampMillis(), s1.timestampMillis())) + .flatMap( + snapshot -> identifyIcebergTableSnapshotFiles(snapshot, specsById, addPredicate))); + + // Return "dependencies" before the "metadata" itself, so the probability of being able to + // resume a failed/aborted purge is higher. 
+ + return fileSources.stream().flatMap(Function.identity()); + } + + static Predicate<String> deduplicator(boolean deduplicate) { + if (!deduplicate) { + return x -> true; + } + var set = new LinkedHashSet<String>(); + return location -> { + synchronized (set) { + if (set.size() > 100_000) { + // limit the heap pressure of the deduplication set to 100,000 elements + set.removeFirst(); + } + return set.add(location); + } + }; + } + + Stream<FileSpec> identifyIcebergTableSnapshotFiles( + @Nonnull Snapshot snapshot, + Map<Integer, PartitionSpec> specsById, + Predicate<String> addPredicate) { + var manifestListLocation = snapshot.manifestListLocation(); + if (manifestListLocation != null && !addPredicate.test(manifestListLocation)) { + return Stream.empty(); + } + + return identifyIcebergManifests(manifestListLocation, snapshot, specsById, addPredicate); + } + + Stream<FileSpec> identifyIcebergManifests( + String manifestListLocation, + Snapshot snapshot, + Map<Integer, PartitionSpec> specsById, + Predicate<String> addPredicate) { + + var manifestListFileSpecStream = Stream.<FileSpec>empty(); + + if (manifestListLocation != null && !manifestListLocation.isEmpty()) { + var manifestListFileSpec = + FileSpec.fromLocation(manifestListLocation) + .fileType(FileType.ICEBERG_MANIFEST_LIST) + .build(); + manifestListFileSpecStream = Stream.of(manifestListFileSpec); + } + + try { + var allManifestsFiles = + snapshot.allManifests(fileIO).stream() Review Comment: `snapshot.allManifests(fileIO)` loads all ManifestFile entries into memory as a `List<ManifestFile>`. Specifically, it (1) reads the manifest list file from storage via `Avro.read(io.newInputFile(manifestListLocation))`, (2) deserializes every entry in that Avro file into ManifestFile objects, and (3) collects them all into a LinkedList and returns it. So it doesn't resolve the issue described here: https://github.com/apache/polaris/issues/2365#issuecomment-3723184230. ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.OptionalDouble; +import java.util.function.Consumer; +import org.apache.polaris.immutables.PolarisImmutable; +import org.immutables.value.Value; + +@SuppressWarnings("unused") +@PolarisImmutable +public interface PurgeSpec { Review Comment: What is this class for? Can we add Javadoc, given it's an interface?
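One possible Javadoc along the requested lines, drafted only from the accessors visible in this diff (`fileFilter()`, `deleteBatchSize()`, `fileDeletesPerSecond()`, `batchDeletesPerSecond()`, `purgeIssuedCallback()`); the wording is a suggestion, not the author's:

```java
/**
 * Immutable parameter object controlling how a purge is executed.
 *
 * <p>A {@code PurgeSpec} describes which files are eligible for deletion (via
 * {@link #fileFilter()}), how many locations are grouped into a single bulk-delete
 * request ({@code deleteBatchSize()}), optional per-file and per-batch rate limits
 * ({@code fileDeletesPerSecond()}, {@code batchDeletesPerSecond()}), and a callback
 * invoked for each location just before its deletion is issued
 * ({@code purgeIssuedCallback()}). {@code DEFAULT_INSTANCE} purges every matching
 * file, unthrottled, using the default batch size.
 */
@PolarisImmutable
public interface PurgeSpec {
  // ... members as in the PR ...
}
```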
########## storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java: ########## @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import com.google.common.collect.Streams; +import com.google.common.util.concurrent.RateLimiter; +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewMetadataParser; +import org.apache.polaris.storage.files.api.FileFilter; +import org.apache.polaris.storage.files.api.FileOperations; +import org.apache.polaris.storage.files.api.FileSpec; +import org.apache.polaris.storage.files.api.FileType; +import org.apache.polaris.storage.files.api.ImmutablePurgeStats; +import org.apache.polaris.storage.files.api.PurgeSpec; +import org.apache.polaris.storage.files.api.PurgeStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link + * org.apache.iceberg.io.SupportsBulkOperations} and {@link + * org.apache.iceberg.io.SupportsPrefixOperations}. 
+ */ +record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(FileOperationsImpl.class); + + @Override + public Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter) { + var prefixUri = StorageUri.of(prefix).resolve("/"); + if (fileIO instanceof SupportsPrefixOperations prefixOps) { + return Streams.stream(prefixOps.listPrefix(prefix).iterator()) + .filter(Objects::nonNull) + .map( + fileInfo -> { + var location = StorageUri.of(fileInfo.location()); + if (!location.isAbsolute()) { + // ADLSFileIO does _not_ include the prefix, but GCSFileIO and S3FileIO do. + location = prefixUri.resolve(location); + } + return FileSpec.builder() + .location(location.toString()) + .size(fileInfo.size()) + .createdAtMillis(fileInfo.createdAtMillis()) + .build(); + }) + .filter(filter); + } + + throw new IllegalStateException( + format( + "An Iceberg FileIO supporting prefix operations is required, but the given %s does not", + fileIO.getClass().getName())); + } + + @Override + public Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate) { + var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation); Review Comment: we might just call this `metadataOptional` ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileSpec.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.util.Optional; +import java.util.OptionalLong; +import org.apache.polaris.immutables.PolarisImmutable; + +/** + * Describes a single file/object in an object storage. + * + * <p>Not all attributes are populated by every {@link FileOperations} function. + */ +@PolarisImmutable +public interface FileSpec { + + /** The full object storage URI. */ + String location(); + + /** + * The type of the file, if known, as a convenience hint. + * + * <p>The type might have been guessed from the file name, in which case the returned value is not + * guaranteed to be accurate. + * + * @see #guessTypeFromName() + */ + Optional<FileType> fileType(); + + /** The size of the file in bytes, if available. */ + OptionalLong size(); + + /** The creation timestamp in milliseconds since the epoch, if available. */ + OptionalLong createdAtMillis(); + + static Builder builder() { + return ImmutableFileSpec.builder(); + } + + static Builder fromLocation(String location) { + return builder().location(location); + } + + static Builder fromLocationAndSize(String location, long size) { Review Comment: Do we need size information for purging a table?
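One hedged illustration of where `size` could still be useful, using only the `FileSpec` API shown above (the `fileOperations` handle and the prefix are made up for the example): a dry-run that reports how much data a purge would reclaim.

    // Hypothetical dry-run: sum the sizes of everything below a prefix.
    long bytesToReclaim =
        fileOperations
            .findFiles("s3://bucket/warehouse/db/tbl/", FileFilter.alwaysTrue())
            .mapToLong(spec -> spec.size().orElse(0L))
            .sum();

If no such reporting is planned, dropping the size parameter would simplify the builder.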
########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeStats.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import java.time.Duration; +import org.apache.polaris.immutables.PolarisImmutable; + +@PolarisImmutable +public interface PurgeStats { + Duration duration(); Review Comment: What is it used for? ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.OptionalDouble; +import java.util.function.Consumer; +import org.apache.polaris.immutables.PolarisImmutable; +import org.immutables.value.Value; + +@SuppressWarnings("unused") +@PolarisImmutable +public interface PurgeSpec { + PurgeSpec DEFAULT_INSTANCE = PurgeSpec.builder().build(); + + @Value.Default + default FileFilter fileFilter() { + return FileFilter.alwaysTrue(); + } + + PurgeSpec withFileFilter(FileFilter fileFilter); + + /** + * Delete batch size for purge/batch-deletion operations. Implementations may opt to ignore this + * parameter and enforce a reasonable or required different limit. + */ + @Value.Default + default int deleteBatchSize() { + return 250; Review Comment: Nit: can we make a constant for this? ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import jakarta.annotation.Nonnull; +import java.util.stream.Stream; + +/** + * Object storage file operations, used to find files below a given prefix, to purge files, to + * identify referenced files, etc. + * + * <p>All functions of this interface rather yield incomplete results and continue over throwing + * exceptions. + */ +public interface FileOperations { + /** + * Find files that match the given prefix and filter. + * + * <p>Whether existing but inaccessible files are included in the result depends on the object + * store. + * + * <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's + * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; }} step on + * the returned stream. + * + * @param prefix full object storage URI prefix, including scheme and bucket. + * @param filter file filter + * @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link + * FileSpec#size()} attributes populated with the information provided by the object store. + * The {@link FileSpec#fileType() file type} attribute is not populated, it may be {@link + * FileSpec#guessTypeFromName() guessed}. + */ + Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter); + + /** + * Identifies all files referenced by the given table-metadata. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, the returned stream will just not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param tableMetadataLocation Iceberg table-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have to be derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate); + + /** + * Identifies all files referenced by the given view-metadata. + * + * <p>In case "container" files like the metadata are not readable, the returned stream will just + * not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param viewMetadataLocation Iceberg view-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. 
Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have been derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream<FileSpec> identifyIcebergViewFiles( + @Nonnull String viewMetadataLocation, boolean deduplicate); + + /** + * Purges all files that are referenced by the given table-metadata, respecting the given filter. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. + * + * <p>This is effectively a convenience for {@code + * purge(identifyIcebergTableFiles(tableMetadataLocation).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #identifyIcebergTableFiles(String, boolean) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are within the base location of the given table-metadata, purge only + * files that match the given filter. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. + * + * <p>This is effectively a convenience for {@code + * purge(findFiles(tableMetadata.baseLocation()).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTableBaseLocation( + @Nonnull String tableMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are referenced by the given view-metadata, respecting the given filter. + * + * <p>In case "container" files like the metadata are not readable, those files are just ignored. + * + * <p>This is effectively a convenience for {@code + * purge(identifyIcebergViewFiles(tableMetadataLocation).filter(fileFilter))} + * + * @see #purge(Stream, PurgeSpec) + * @see #identifyIcebergViewFiles(String, boolean) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are within the base location of the given view-metadata, purge only files + * that match the given filter. + * + * <p>In case "container" files like the metadata are not readable, those files are just ignored. + * + * <p>This is effectively a convenience for {@code + * purge(findFiles(viewMetadata.baseLocation()).filter(fileFilter))} + * + * @see #purge(Stream, PurgeSpec) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergViewBaseLocation( + @Nonnull String viewMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that match the given stream of locations. The {@link Stream} will be fully + * consumed. + * + * <p>This is a convenience for {@link #purgeFiles(Stream, PurgeSpec) + * purgeFiles(locationStream.map(FileSpec::location))} + */ + PurgeStats purge(@Nonnull Stream<FileSpec> locationStream, PurgeSpec purgeSpec); Review Comment: Do we need it given this method is basically exactly the same as `purgeFiles`? 
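If both methods stay, the equivalence stated in the Javadoc suggests `purge` could be a `default` method, leaving `purgeFiles` as the single real implementation. A sketch, using the `purgeFiles(Stream, PurgeSpec)` signature referenced in the {@link} above:

    default PurgeStats purge(@Nonnull Stream<FileSpec> locationStream, PurgeSpec purgeSpec) {
      return purgeFiles(locationStream.map(FileSpec::location), purgeSpec);
    }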
########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeStats.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import java.time.Duration; +import org.apache.polaris.immutables.PolarisImmutable; + +@PolarisImmutable +public interface PurgeStats { + Duration duration(); + + /** + * The number of purged files. + * + * <p>The returned value may be wrong and include non-existing files. Review Comment: Do we need a number that could be wrong? What is the number for? ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import jakarta.annotation.Nonnull; +import java.util.stream.Stream; + +/** + * Object storage file operations, used to find files below a given prefix, to purge files, to + * identify referenced files, etc. + * + * <p>All functions of this interface rather yield incomplete results and continue over throwing + * exceptions. + */ +public interface FileOperations { + /** + * Find files that match the given prefix and filter. + * + * <p>Whether existing but inaccessible files are included in the result depends on the object + * store. + * + * <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's + * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; }} step on + * the returned stream. + * + * @param prefix full object storage URI prefix, including scheme and bucket. + * @param filter file filter + * @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link + * FileSpec#size()} attributes populated with the information provided by the object store. + * The {@link FileSpec#fileType() file type} attribute is not populated, it may be {@link + * FileSpec#guessTypeFromName() guessed}.
+ */ + Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter); + + /** + * Identifies all files referenced by the given table-metadata. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, the returned stream will just not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param tableMetadataLocation Iceberg table-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not Review Comment: Agreed with @RussellSpitzer that `dedup` is not needed. It adds overhead and unnecessary complexity without clear benefits, especially since deleting a non-existing file is effectively a no-op. ########## storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java: ########## @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import com.google.common.collect.Streams; +import com.google.common.util.concurrent.RateLimiter; +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewMetadataParser; +import org.apache.polaris.storage.files.api.FileFilter; +import org.apache.polaris.storage.files.api.FileOperations; +import org.apache.polaris.storage.files.api.FileSpec; +import org.apache.polaris.storage.files.api.FileType; +import org.apache.polaris.storage.files.api.ImmutablePurgeStats; +import org.apache.polaris.storage.files.api.PurgeSpec; +import org.apache.polaris.storage.files.api.PurgeStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link + * org.apache.iceberg.io.SupportsBulkOperations} and {@link + * org.apache.iceberg.io.SupportsPrefixOperations}. + */ +record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(FileOperationsImpl.class); + + @Override + public Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter) { + var prefixUri = StorageUri.of(prefix).resolve("/"); + if (fileIO instanceof SupportsPrefixOperations prefixOps) { + return Streams.stream(prefixOps.listPrefix(prefix).iterator()) + .filter(Objects::nonNull) + .map( + fileInfo -> { + var location = StorageUri.of(fileInfo.location()); + if (!location.isAbsolute()) { + // ADLSFileIO does _not_ include the prefix, but GCSFileIO and S3FileIO do. 
+ location = prefixUri.resolve(location); + } + return FileSpec.builder() + .location(location.toString()) + .size(fileInfo.size()) + .createdAtMillis(fileInfo.createdAtMillis()) + .build(); + }) + .filter(filter); + } + + throw new IllegalStateException( + format( + "An Iceberg FileIO supporting prefix operations is required, but the given %s does not", + fileIO.getClass().getName())); + } + + @Override + public Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate) { + var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation); + if (metadataOpt.isEmpty()) { + return Stream.empty(); + } + var metadata = metadataOpt.get(); + + var metadataFileSpec = + FileSpec.fromLocation(tableMetadataLocation).fileType(FileType.ICEBERG_METADATA).build(); + + var fileSources = new ArrayList<Stream<FileSpec>>(); + + fileSources.add(Stream.of(metadataFileSpec)); + + var statisticsFiles = metadata.statisticsFiles(); + if (statisticsFiles != null) { + fileSources.addFirst( + statisticsFiles.stream() + .map( + statisticsFile -> + FileSpec.fromLocationAndSize( + statisticsFile.path(), statisticsFile.fileSizeInBytes()) + .fileType(FileType.ICEBERG_STATISTICS) + .build())); + } + + var previousFiles = metadata.previousFiles(); + if (previousFiles != null) { + fileSources.add( + previousFiles.stream() + .filter( + metadataLogEntry -> + metadataLogEntry.file() != null && !metadataLogEntry.file().isEmpty()) + .map( + metadataLogEntry -> + FileSpec.fromLocation(metadataLogEntry.file()) + .fileType(FileType.ICEBERG_METADATA) + .build())); + } + + var specsById = metadata.specsById(); + + var addPredicate = deduplicator(deduplicate); Review Comment: Do we need a deduplicator here? Given that: 1. deleting a non-existing file is a no-op, and 2. this deduplicator is capped at 100,000 items, so there is still a chance that duplicates get through.
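A small self-contained demo of point 2: with a capped set, a location can be reported as "new" twice once enough other locations have evicted it. The predicate mirrors the `deduplicator` above (cap shrunk from 100,000 to 3 so the effect is visible; synchronization omitted for the single-threaded demo):

    import java.util.LinkedHashSet;
    import java.util.function.Predicate;

    public class DedupCapDemo {
      public static void main(String[] args) {
        var set = new LinkedHashSet<String>();
        Predicate<String> dedup = location -> {
          if (set.size() > 3) {
            set.removeFirst(); // Java 21 SequencedCollection, as in the PR
          }
          return set.add(location);
        };

        System.out.println(dedup.test("s3://b/a")); // true  - first sighting
        System.out.println(dedup.test("s3://b/a")); // false - deduplicated
        for (var l : new String[] {"b", "c", "d", "e"}) {
          dedup.test(l); // fill past the cap; "s3://b/a" gets evicted
        }
        System.out.println(dedup.test("s3://b/a")); // true  - duplicate slips through
      }
    }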
########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileType.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import org.apache.iceberg.ContentFile; + +public enum FileType { + UNKNOWN(false, false), + ICEBERG_METADATA(true, false), + ICEBERG_STATISTICS(false, false), + ICEBERG_MANIFEST_LIST(false, false), + ICEBERG_MANIFEST_FILE(false, false), + ICEBERG_DATA_FILE(false, true), + ICEBERG_DELETE_FILE(false, true), + ; + + private final boolean metadata; + private final boolean dataOrDelete; + + FileType(boolean metadata, boolean dataOrDelete) { + this.metadata = metadata; + this.dataOrDelete = dataOrDelete; + } + + @SuppressWarnings("UnnecessaryDefault") + public static FileType fromContentFile(ContentFile<? extends ContentFile<?>> contentFile) { + return switch (contentFile.content()) { + case DATA -> ICEBERG_DATA_FILE; + case EQUALITY_DELETES, POSITION_DELETES -> ICEBERG_DELETE_FILE; + default -> UNKNOWN; + }; + } + + public boolean metadata() { + return metadata; + } + + public boolean dataOrDelete() { Review Comment: Can we call it `contentFile` instead? "Content file" is a bit more general: if another type of content file is introduced later, we won't have to modify this part.
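A sketch of the suggested rename, semantics unchanged:

    private final boolean contentFile;

    FileType(boolean metadata, boolean contentFile) {
      this.metadata = metadata;
      this.contentFile = contentFile;
    }

    public boolean contentFile() {
      return contentFile;
    }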
