This is an automated email from the ASF dual-hosted git repository. xuba pushed a commit to branch bulk-ops in repository https://gitbox.apache.org/repos/asf/incubator-amoro.git
commit c5e60a8e02c54dbfd15a0bae063b43b6f099c342 Author: Xavier Bai <[email protected]> AuthorDate: Tue Apr 2 10:18:07 2024 +0800 [WAP] batch delete files when expire snapshots and clean orphan files --- .../maintainer/IcebergTableMaintainer.java | 68 +++++++++++----------- .../java/com/netease/arctic/io/ArcticFileIO.java | 15 +++++ .../com/netease/arctic/io/ArcticFileIOAdapter.java | 12 ++++ .../com/netease/arctic/io/ArcticHadoopFileIO.java | 15 +++++ .../com/netease/arctic/utils/TableFileUtil.java | 54 +++++++++++++++++ 5 files changed, 131 insertions(+), 33 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java index ae9f181e0..c072f4253 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -175,36 +175,35 @@ public class IcebergTableMaintainer implements TableMaintainer { private void expireSnapshots(long olderThan, Set<String> exclude) { LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude); final AtomicInteger toDeleteFiles = new AtomicInteger(0); - final AtomicInteger deleteFiles = new AtomicInteger(0); - Set<String> parentDirectory = new HashSet<>(); + Set<String> parentDirectories = new HashSet<>(); + Set<String> expiredFiles = new HashSet<>(); table .expireSnapshots() .retainLast(1) .expireOlderThan(olderThan) .deleteWith( file -> { - try { - if (exclude.isEmpty()) { - arcticFileIO().deleteFile(file); - } else { - String fileUriPath = TableFileUtil.getUriPath(file); - if (!exclude.contains(fileUriPath) - && !exclude.contains(new Path(fileUriPath).getParent().toString())) { - arcticFileIO().deleteFile(file); - } + if (exclude.isEmpty()) { + expiredFiles.add(file); + } else { + String fileUriPath = TableFileUtil.getUriPath(file); + if (!exclude.contains(fileUriPath) + && !exclude.contains(new Path(fileUriPath).getParent().toString())) { + expiredFiles.add(file); } - parentDirectory.add(new Path(file).getParent().toString()); - deleteFiles.incrementAndGet(); - } catch (Throwable t) { - LOG.warn("failed to delete file " + file, t); - } finally { - toDeleteFiles.incrementAndGet(); } + + parentDirectories.add(new Path(file).getParent().toString()); + toDeleteFiles.incrementAndGet(); }) - .cleanExpiredFiles(true) + .cleanExpiredFiles( + true) /* enable clean only for collecting the expired files, will delete them later */ .commit(); - parentDirectory.forEach( + // try to batch delete files + int deletedFiles = TableFileUtil.deleteFiles(table.name(), arcticFileIO(), expiredFiles, true); + + parentDirectories.forEach( parent -> { try { TableFileUtil.deleteEmptyDirectory(arcticFileIO(), parent, exclude); @@ -218,7 +217,7 @@ public class IcebergTableMaintainer implements TableMaintainer { "to delete {} files in {}, success delete {} files", toDeleteFiles.get(), getTable().name(), - deleteFiles.get()); + deletedFiles); } @Override @@ -473,7 +472,7 @@ public class IcebergTableMaintainer implements TableMaintainer { return snapshot.map(Snapshot::timestampMillis).orElse(Long.MAX_VALUE); } - private static int deleteInvalidFilesInFs( + private int deleteInvalidFilesInFs( SupportsFileSystemOperations fio, String location, long lastTime, Set<String> excludes) { if (!fio.exists(location)) { return 0; @@ -495,28 +494,31 @@ public class IcebergTableMaintainer implements TableMaintainer { } else { String parentLocation = TableFileUtil.getParent(p.location()); String parentUriPath = TableFileUtil.getUriPath(parentLocation); + Set<String> filesToDelete = new HashSet<>(); if (!excludes.contains(uriPath) && !excludes.contains(parentUriPath) && p.createdAtMillis() < lastTime) { - fio.deleteFile(p.location()); - deleteCount += 1; + filesToDelete.add(p.location()); } + int deletedCnt = + TableFileUtil.deleteFiles(table.name(), arcticFileIO(), filesToDelete, false); + deleteCount += deletedCnt; } } return deleteCount; } - private static int deleteInvalidFilesByPrefix( + private int deleteInvalidFilesByPrefix( SupportsPrefixOperations pio, String prefix, long lastTime, Set<String> excludes) { - int deleteCount = 0; + Set<String> filesToDelete = new HashSet<>(); for (FileInfo fileInfo : pio.listPrefix(prefix)) { String uriPath = TableFileUtil.getUriPath(fileInfo.location()); if (!excludes.contains(uriPath) && fileInfo.createdAtMillis() < lastTime) { - pio.deleteFile(fileInfo.location()); - deleteCount += 1; + filesToDelete.add(fileInfo.location()); } } - return deleteCount; + + return TableFileUtil.deleteFiles(table.name(), arcticFileIO(), filesToDelete, false); } private static Set<String> getValidMetadataFiles(Table internalTable) { @@ -575,24 +577,24 @@ public class IcebergTableMaintainer implements TableMaintainer { return null; } - private static int deleteInvalidMetadataFile( + private int deleteInvalidMetadataFile( SupportsPrefixOperations pio, String location, long lastTime, Set<String> exclude, Pattern excludeRegex) { - int count = 0; + Set<String> filesToDelete = new HashSet<>(); for (FileInfo fileInfo : pio.listPrefix(location)) { String uriPath = TableFileUtil.getUriPath(fileInfo.location()); if (!exclude.contains(uriPath) && fileInfo.createdAtMillis() < lastTime && (excludeRegex == null || !excludeRegex.matcher(TableFileUtil.getFileName(fileInfo.location())).matches())) { - pio.deleteFile(fileInfo.location()); - count += 1; + filesToDelete.add(fileInfo.location()); } } - return count; + + return TableFileUtil.deleteFiles(table.name(), arcticFileIO(), filesToDelete, false); } private static String formatTime(long timestamp) { diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java index 3f3677855..c3b97deec 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java @@ -20,6 +20,7 @@ package com.netease.arctic.io; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.io.SupportsPrefixOperations; import java.util.concurrent.Callable; @@ -61,6 +62,20 @@ public interface ArcticFileIO extends FileIO { } } + /** Determine if the fileIO supports bulk operations. */ + default boolean supportBulkOperations() { + return false; + } + + /** Return this fileIO as a {@link SupportsBulkOperations} if it is an instance of that type. */ + default SupportsBulkOperations asBulkFileIO() { + if (supportBulkOperations()) { + return (SupportsBulkOperations) this; + } else { + throw new IllegalStateException("Doesn't support bulk operations"); + } + } + /** Returns true if this tableIo is an {@link SupportsFileSystemOperations} */ default boolean supportFileSystemOperations() { return false; diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java b/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java index 201845ee4..78a1507b8 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java @@ -21,6 +21,7 @@ package com.netease.arctic.io; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -67,6 +68,17 @@ public class ArcticFileIOAdapter implements ArcticFileIO { return (SupportsPrefixOperations) io; } + @Override + public boolean supportBulkOperations() { + return io instanceof SupportsBulkOperations; + } + + @Override + public SupportsBulkOperations asBulkFileIO() { + Preconditions.checkArgument(supportBulkOperations()); + return (SupportsBulkOperations) io; + } + @Override public boolean supportFileSystemOperations() { return io instanceof ArcticFileIO && ((ArcticFileIO) io).supportFileSystemOperations(); diff --git a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java index 6f11a23fb..26297b8da 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -234,11 +235,25 @@ public class ArcticHadoopFileIO extends HadoopFileIO }); } + @Override + public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException { + tableMetaStore.doAs( + () -> { + super.deleteFiles(pathsToDelete); + return null; + }); + } + @Override public boolean supportPrefixOperations() { return true; } + @Override + public boolean supportBulkOperations() { + return true; + } + @Override public boolean supportFileSystemOperations() { return true; diff --git a/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java b/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java index 84053d28a..3845d5b33 100644 --- a/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java +++ b/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java @@ -20,12 +20,16 @@ package com.netease.arctic.utils; import com.netease.arctic.io.ArcticFileIO; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.net.URI; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; public class TableFileUtil { private static final Logger LOG = LoggerFactory.getLogger(TableFileUtil.class); @@ -80,6 +84,56 @@ public class TableFileUtil { } } + /** + * Helper to delete files. Bulk deletion is used if possible. + * + * @param tableName table name + * @param io arctic file io + * @param files files to delete + * @param concurrent controls concurrent deletion. Only applicable for non-bulk FileIO + * @return deleted file count + */ + public static int deleteFiles( + String tableName, ArcticFileIO io, Set<String> files, boolean concurrent) { + AtomicInteger failedFileCnt = new AtomicInteger(0); + if (io.supportBulkOperations()) { + try { + io.asBulkFileIO().deleteFiles(files); + } catch (BulkDeletionFailureException e) { + failedFileCnt.set(e.numberFailedObjects()); + LOG.warn("Failed to bulk delete {} files in {}", e.numberFailedObjects(), tableName); + } catch (RuntimeException e) { + failedFileCnt.set(files.size()); + LOG.warn("Failed to bulk delete files in {}", tableName, e); + } + } else { + if (concurrent) { + Tasks.foreach(files) + .executeWith(ThreadPools.getWorkerPool()) + .noRetry() + .suppressFailureWhenFinished() + .onFailure( + (file, exc) -> { + failedFileCnt.addAndGet(1); + LOG.warn("Failed to delete file {}", file, exc); + }) + .run(io::deleteFile); + } else { + files.forEach( + f -> { + try { + io.deleteFile(f); + } catch (RuntimeException e) { + failedFileCnt.addAndGet(1); + LOG.warn("Failed to delete file {}", f, e); + } + }); + } + } + + return files.size() - failedFileCnt.get(); + } + /** * Get the file path after move file to target directory *
