pingtimeout commented on code in PR #3256: URL: https://github.com/apache/polaris/pull/3256#discussion_r2665464698
########## storage/files/impl/src/testFixtures/resources/logback-test.xml: ########## @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + + Copyright (C) 2020 Dremio Review Comment: This should be the ASF header. ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import jakarta.annotation.Nonnull; +import java.util.stream.Stream; + +/** + * Object storage file operations, used to find files below a given prefix, to purge files, to + * identify referenced files, etc. + * + * <p>All functions of this interface rather yield incomplete results and continue over throwing + * exceptions. + */ +public interface FileOperations { + /** + * Find files that match the given prefix and filter. + * + * <p>Whether existing but inaccessible files are included in the result depends on the object + * store. + * + * <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's + * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; }} step on + * the returned stream. 
+ * + * @param prefix full object storage URI prefix, including scheme and bucket. + * @param filter file filter + * @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link + * FileSpec#size()} attributes populated with the information provided by the object store. + * The {@link FileSpec#fileType() file type} attribute is not populated, it may be {@link + * FileSpec#guessTypeFromName() guessed}. + */ + Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter); + + /** + * Identifies all files referenced by the given table-metadata. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, the returned stream will just not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param tableMetadataLocation Iceberg table-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have to be derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate); + + /** + * Identifies all files referenced by the given view-metadata. + * + * <p>In case "container" files like the metadata are not readable, the returned stream will just + * not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. 
Rate-limiting on a single invocation may not be effective as expected. + * + * @param viewMetadataLocation Iceberg view-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have been derived from user-provided Review Comment: "as it would have been" -> "as it would have to be" ? ########## storage/files/impl/build.gradle.kts: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +plugins { + id("polaris-server") + id("org.kordamp.gradle.jandex") +} + +dependencies { + implementation(project(":polaris-storage-files-api")) + + implementation(platform(libs.iceberg.bom)) + implementation("org.apache.iceberg:iceberg-api") + implementation("org.apache.iceberg:iceberg-core") + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-annotations") + implementation("com.fasterxml.jackson.core:jackson-core") + implementation("com.fasterxml.jackson.core:jackson-databind") + implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-smile") + runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-guava") + runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jdk8") + runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") + + implementation(libs.guava) + implementation(libs.slf4j.api) + + implementation(libs.jakarta.inject.api) + implementation(libs.jakarta.validation.api) + implementation(libs.jakarta.inject.api) Review Comment: This line appears twice; one of the `implementation(libs.jakarta.inject.api)` lines should be removed. ########## storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java: ########## @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import com.google.common.collect.Streams; +import com.google.common.util.concurrent.RateLimiter; +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewMetadataParser; +import org.apache.polaris.storage.files.api.FileFilter; +import org.apache.polaris.storage.files.api.FileOperations; +import org.apache.polaris.storage.files.api.FileSpec; +import org.apache.polaris.storage.files.api.FileType; +import org.apache.polaris.storage.files.api.ImmutablePurgeStats; +import org.apache.polaris.storage.files.api.PurgeSpec; +import org.apache.polaris.storage.files.api.PurgeStats; +import org.projectnessie.storage.uri.StorageUri; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param fileIO the {@link FileIO} instance to use. 
The given instance must implement both {@link + * org.apache.iceberg.io.SupportsBulkOperations} and {@link + * org.apache.iceberg.io.SupportsPrefixOperations}. + */ +record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations { Review Comment: _nit_ @snazy wouldn't it be better to have the validation performed at compile time instead of at runtime? Something like: ``` record FileOperationsImpl<T extends FileIO & SupportsBulkOperations & SupportsPrefixOperations>(@Nonnull T fileIO) implements FileOperations { ... } ``` The point here is that the Javadoc states that the given instance must implement both interfaces (in all cases). But the implementation is a bit more relaxed as the implementation of this or that interface is only enforced if the client calls the methods that need it. I wonder whether a compile-time check would be better (more consistent). It would also remove the need for an `instanceof` with a type cast. ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.storage.files.api; + +import jakarta.annotation.Nonnull; +import java.util.stream.Stream; + +/** + * Object storage file operations, used to find files below a given prefix, to purge files, to + * identify referenced files, etc. + * + * <p>All functions of this interface rather yield incomplete results and continue over throwing + * exceptions. + */ +public interface FileOperations { + /** + * Find files that match the given prefix and filter. + * + * <p>Whether existing but inaccessible files are included in the result depends on the object + * store. + * + * <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's + * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; }} step on + * the returned stream. + * + * @param prefix full object storage URI prefix, including scheme and bucket. + * @param filter file filter + * @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link + * FileSpec#size()} attributes populated with the information provided by the object store. + * The {@link FileSpec#fileType() file type} attribute is not populated, it may be {@link + * FileSpec#guessTypeFromName() guessed}. + */ + Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter); + + /** + * Identifies all files referenced by the given table-metadata. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, the returned stream will just not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param tableMetadataLocation Iceberg table-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. 
Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have to be derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate); + + /** + * Identifies all files referenced by the given view-metadata. + * + * <p>In case "container" files like the metadata are not readable, the returned stream will just + * not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param viewMetadataLocation Iceberg view-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have been derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream<FileSpec> identifyIcebergViewFiles( + @Nonnull String viewMetadataLocation, boolean deduplicate); + + /** + * Purges all files that are referenced by the given table-metadata, respecting the given filter. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. 
+ * + * <p>This is effectively a convenience for {@code + * purge(identifyIcebergTableFiles(tableMetadataLocation).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #identifyIcebergTableFiles(String, boolean) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are within the base location of the given table-metadata, purge only + * files that match the given filter. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. + * + * <p>This is effectively a convenience for {@code + * purge(findFiles(tableMetadata.baseLocation()).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTableBaseLocation( + @Nonnull String tableMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are referenced by the given view-metadata, respecting the given filter. * Review Comment: ```suggestion * Purges all files that are referenced by the given view-metadata, respecting the given filter. ``` ########## storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import jakarta.annotation.Nonnull; +import java.util.stream.Stream; + +/** + * Object storage file operations, used to find files below a given prefix, to purge files, to + * identify referenced files, etc. + * + * <p>All functions of this interface rather yield incomplete results and continue over throwing + * exceptions. + */ +public interface FileOperations { + /** + * Find files that match the given prefix and filter. + * + * <p>Whether existing but inaccessible files are included in the result depends on the object + * store. + * + * <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's + * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; }} step on + * the returned stream. + * + * @param prefix full object storage URI prefix, including scheme and bucket. + * @param filter file filter + * @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link + * FileSpec#size()} attributes populated with the information provided by the object store. + * The {@link FileSpec#fileType() file type} attribute is not populated, it may be {@link + * FileSpec#guessTypeFromName() guessed}. + */ + Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter); + + /** + * Identifies all files referenced by the given table-metadata. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, the returned stream will just not include those. 
+ * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param tableMetadataLocation Iceberg table-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have to be derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream<FileSpec> identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate); + + /** + * Identifies all files referenced by the given view-metadata. + * + * <p>In case "container" files like the metadata are not readable, the returned stream will just + * not include those. + * + * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param viewMetadataLocation Iceberg view-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have been derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. 
+ */ + Stream<FileSpec> identifyIcebergViewFiles( + @Nonnull String viewMetadataLocation, boolean deduplicate); + + /** + * Purges all files that are referenced by the given table-metadata, respecting the given filter. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. + * + * <p>This is effectively a convenience for {@code + * purge(identifyIcebergTableFiles(tableMetadataLocation).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #identifyIcebergTableFiles(String, boolean) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are within the base location of the given table-metadata, purge only + * files that match the given filter. + * + * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. + * + * <p>This is effectively a convenience for {@code + * purge(findFiles(tableMetadata.baseLocation()).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTableBaseLocation( + @Nonnull String tableMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are referenced by the given view-metadata, respecting the given filter. * + * + * <p>In case "container" files like the metadata are not readable, those files are just ignored. 
+ * + * <p>This is effectively a convenience for {@code + * purge(identifyIcebergViewFiles(tableMetadataLocation).filter(fileFilter))} + * + * @see #purge(Stream, PurgeSpec) + * @see #identifyIcebergViewFiles(String, boolean) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are within the base location of the given view-metadata, purge only files + * that match the given filter. * Review Comment: ```suggestion * that match the given filter. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
