This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch bulk-ops
in repository https://gitbox.apache.org/repos/asf/incubator-amoro.git

commit c5e60a8e02c54dbfd15a0bae063b43b6f099c342
Author: Xavier Bai <[email protected]>
AuthorDate: Tue Apr 2 10:18:07 2024 +0800

    [WAP] batch delete files when expiring snapshots and cleaning orphan files
---
 .../maintainer/IcebergTableMaintainer.java         | 68 +++++++++++-----------
 .../java/com/netease/arctic/io/ArcticFileIO.java   | 15 +++++
 .../com/netease/arctic/io/ArcticFileIOAdapter.java | 12 ++++
 .../com/netease/arctic/io/ArcticHadoopFileIO.java  | 15 +++++
 .../com/netease/arctic/utils/TableFileUtil.java    | 54 +++++++++++++++++
 5 files changed, 131 insertions(+), 33 deletions(-)

diff --git 
a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java
 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java
index ae9f181e0..c072f4253 100644
--- 
a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java
+++ 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -175,36 +175,35 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
   private void expireSnapshots(long olderThan, Set<String> exclude) {
     LOG.debug("start expire snapshots older than {}, the exclude is {}", 
olderThan, exclude);
     final AtomicInteger toDeleteFiles = new AtomicInteger(0);
-    final AtomicInteger deleteFiles = new AtomicInteger(0);
-    Set<String> parentDirectory = new HashSet<>();
+    Set<String> parentDirectories = new HashSet<>();
+    Set<String> expiredFiles = new HashSet<>();
     table
         .expireSnapshots()
         .retainLast(1)
         .expireOlderThan(olderThan)
         .deleteWith(
             file -> {
-              try {
-                if (exclude.isEmpty()) {
-                  arcticFileIO().deleteFile(file);
-                } else {
-                  String fileUriPath = TableFileUtil.getUriPath(file);
-                  if (!exclude.contains(fileUriPath)
-                      && !exclude.contains(new 
Path(fileUriPath).getParent().toString())) {
-                    arcticFileIO().deleteFile(file);
-                  }
+              if (exclude.isEmpty()) {
+                expiredFiles.add(file);
+              } else {
+                String fileUriPath = TableFileUtil.getUriPath(file);
+                if (!exclude.contains(fileUriPath)
+                    && !exclude.contains(new 
Path(fileUriPath).getParent().toString())) {
+                  expiredFiles.add(file);
                 }
-                parentDirectory.add(new Path(file).getParent().toString());
-                deleteFiles.incrementAndGet();
-              } catch (Throwable t) {
-                LOG.warn("failed to delete file " + file, t);
-              } finally {
-                toDeleteFiles.incrementAndGet();
               }
+
+              parentDirectories.add(new Path(file).getParent().toString());
+              toDeleteFiles.incrementAndGet();
             })
-        .cleanExpiredFiles(true)
+        .cleanExpiredFiles(
+            true) /* enable clean only for collecting the expired files, will 
delete them later */
         .commit();
 
-    parentDirectory.forEach(
+    // try to batch delete files
+    int deletedFiles = TableFileUtil.deleteFiles(table.name(), arcticFileIO(), 
expiredFiles, true);
+
+    parentDirectories.forEach(
         parent -> {
           try {
             TableFileUtil.deleteEmptyDirectory(arcticFileIO(), parent, 
exclude);
@@ -218,7 +217,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
         "to delete {} files in {}, success delete {} files",
         toDeleteFiles.get(),
         getTable().name(),
-        deleteFiles.get());
+        deletedFiles);
   }
 
   @Override
@@ -473,7 +472,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     return snapshot.map(Snapshot::timestampMillis).orElse(Long.MAX_VALUE);
   }
 
-  private static int deleteInvalidFilesInFs(
+  private int deleteInvalidFilesInFs(
       SupportsFileSystemOperations fio, String location, long lastTime, 
Set<String> excludes) {
     if (!fio.exists(location)) {
       return 0;
@@ -495,28 +494,31 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
       } else {
         String parentLocation = TableFileUtil.getParent(p.location());
         String parentUriPath = TableFileUtil.getUriPath(parentLocation);
+        Set<String> filesToDelete = new HashSet<>();
         if (!excludes.contains(uriPath)
             && !excludes.contains(parentUriPath)
             && p.createdAtMillis() < lastTime) {
-          fio.deleteFile(p.location());
-          deleteCount += 1;
+          filesToDelete.add(p.location());
         }
+        int deletedCnt =
+            TableFileUtil.deleteFiles(table.name(), arcticFileIO(), 
filesToDelete, false);
+        deleteCount += deletedCnt;
       }
     }
     return deleteCount;
   }
 
-  private static int deleteInvalidFilesByPrefix(
+  private int deleteInvalidFilesByPrefix(
       SupportsPrefixOperations pio, String prefix, long lastTime, Set<String> 
excludes) {
-    int deleteCount = 0;
+    Set<String> filesToDelete = new HashSet<>();
     for (FileInfo fileInfo : pio.listPrefix(prefix)) {
       String uriPath = TableFileUtil.getUriPath(fileInfo.location());
       if (!excludes.contains(uriPath) && fileInfo.createdAtMillis() < 
lastTime) {
-        pio.deleteFile(fileInfo.location());
-        deleteCount += 1;
+        filesToDelete.add(fileInfo.location());
       }
     }
-    return deleteCount;
+
+    return TableFileUtil.deleteFiles(table.name(), arcticFileIO(), 
filesToDelete, false);
   }
 
   private static Set<String> getValidMetadataFiles(Table internalTable) {
@@ -575,24 +577,24 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     return null;
   }
 
-  private static int deleteInvalidMetadataFile(
+  private int deleteInvalidMetadataFile(
       SupportsPrefixOperations pio,
       String location,
       long lastTime,
       Set<String> exclude,
       Pattern excludeRegex) {
-    int count = 0;
+    Set<String> filesToDelete = new HashSet<>();
     for (FileInfo fileInfo : pio.listPrefix(location)) {
       String uriPath = TableFileUtil.getUriPath(fileInfo.location());
       if (!exclude.contains(uriPath)
           && fileInfo.createdAtMillis() < lastTime
           && (excludeRegex == null
               || 
!excludeRegex.matcher(TableFileUtil.getFileName(fileInfo.location())).matches()))
 {
-        pio.deleteFile(fileInfo.location());
-        count += 1;
+        filesToDelete.add(fileInfo.location());
       }
     }
-    return count;
+
+    return TableFileUtil.deleteFiles(table.name(), arcticFileIO(), 
filesToDelete, false);
   }
 
   private static String formatTime(long timestamp) {
diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java 
b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
index 3f3677855..c3b97deec 100644
--- a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
+++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
@@ -20,6 +20,7 @@ package com.netease.arctic.io;
 
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.io.SupportsPrefixOperations;
 
 import java.util.concurrent.Callable;
@@ -61,6 +62,20 @@ public interface ArcticFileIO extends FileIO {
     }
   }
 
+  /** Determine if the fileIO supports bulk operations. */
+  default boolean supportBulkOperations() {
+    return false;
+  }
+
+  /** Return this fileIO as a {@link SupportsBulkOperations} if it is an 
instance of that type. */
+  default SupportsBulkOperations asBulkFileIO() {
+    if (supportBulkOperations()) {
+      return (SupportsBulkOperations) this;
+    } else {
+      throw new IllegalStateException("Doesn't support bulk operations");
+    }
+  }
+
   /** Returns true if this tableIo is an {@link SupportsFileSystemOperations} 
*/
   default boolean supportFileSystemOperations() {
     return false;
diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java 
b/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java
index 201845ee4..78a1507b8 100644
--- a/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java
+++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java
@@ -21,6 +21,7 @@ package com.netease.arctic.io;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
@@ -67,6 +68,17 @@ public class ArcticFileIOAdapter implements ArcticFileIO {
     return (SupportsPrefixOperations) io;
   }
 
+  @Override
+  public boolean supportBulkOperations() {
+    return io instanceof SupportsBulkOperations;
+  }
+
+  @Override
+  public SupportsBulkOperations asBulkFileIO() {
+    Preconditions.checkArgument(supportBulkOperations());
+    return (SupportsBulkOperations) io;
+  }
+
   @Override
   public boolean supportFileSystemOperations() {
     return io instanceof ArcticFileIO && ((ArcticFileIO) 
io).supportFileSystemOperations();
diff --git a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java 
b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
index 6f11a23fb..26297b8da 100644
--- a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
+++ b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
@@ -234,11 +235,25 @@ public class ArcticHadoopFileIO extends HadoopFileIO
         });
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws 
BulkDeletionFailureException {
+    tableMetaStore.doAs(
+        () -> {
+          super.deleteFiles(pathsToDelete);
+          return null;
+        });
+  }
+
   @Override
   public boolean supportPrefixOperations() {
     return true;
   }
 
+  @Override
+  public boolean supportBulkOperations() {
+    return true;
+  }
+
   @Override
   public boolean supportFileSystemOperations() {
     return true;
diff --git a/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java 
b/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java
index 84053d28a..3845d5b33 100644
--- a/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java
+++ b/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java
@@ -20,12 +20,16 @@ package com.netease.arctic.utils;
 
 import com.netease.arctic.io.ArcticFileIO;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.URI;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TableFileUtil {
   private static final Logger LOG = 
LoggerFactory.getLogger(TableFileUtil.class);
@@ -80,6 +84,56 @@ public class TableFileUtil {
     }
   }
 
+  /**
+   * Helper to delete files. Bulk deletion is used if possible.
+   *
+   * @param tableName table name
+   * @param io arctic file io
+   * @param files files to delete
+   * @param concurrent controls concurrent deletion. Only applicable for 
non-bulk FileIO
+   * @return deleted file count
+   */
+  public static int deleteFiles(
+      String tableName, ArcticFileIO io, Set<String> files, boolean 
concurrent) {
+    AtomicInteger failedFileCnt = new AtomicInteger(0);
+    if (io.supportBulkOperations()) {
+      try {
+        io.asBulkFileIO().deleteFiles(files);
+      } catch (BulkDeletionFailureException e) {
+        failedFileCnt.set(e.numberFailedObjects());
+        LOG.warn("Failed to bulk delete {} files in {}", 
e.numberFailedObjects(), tableName);
+      } catch (RuntimeException e) {
+        failedFileCnt.set(files.size());
+        LOG.warn("Failed to bulk delete files in {}", tableName, e);
+      }
+    } else {
+      if (concurrent) {
+        Tasks.foreach(files)
+            .executeWith(ThreadPools.getWorkerPool())
+            .noRetry()
+            .suppressFailureWhenFinished()
+            .onFailure(
+                (file, exc) -> {
+                  failedFileCnt.addAndGet(1);
+                  LOG.warn("Failed to delete file {}", file, exc);
+                })
+            .run(io::deleteFile);
+      } else {
+        files.forEach(
+            f -> {
+              try {
+                io.deleteFile(f);
+              } catch (RuntimeException e) {
+                failedFileCnt.addAndGet(1);
+                LOG.warn("Failed to delete file {}", f, e);
+              }
+            });
+      }
+    }
+
+    return files.size() - failedFileCnt.get();
+  }
+
   /**
    * Get the file path after move file to target directory
    *

Reply via email to