This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a76f46cab Spark 3.2: Bulk delete support for actions (#7048)
9a76f46cab is described below

commit 9a76f46caba9e6b9ddfcac385073e7f5a928ba04
Author: Russell Spitzer <[email protected]>
AuthorDate: Thu Mar 9 20:03:54 2023 -0600

    Spark 3.2: Bulk delete support for actions (#7048)
    
    This change backports PR #6682 to Spark 3.2.
---
 .../iceberg/spark/actions/BaseSparkAction.java     | 62 ++++++++++++++++++++++
 .../actions/DeleteOrphanFilesSparkAction.java      | 49 +++++++++++------
 .../actions/DeleteReachableFilesSparkAction.java   | 29 ++++++----
 .../spark/actions/ExpireSnapshotsSparkAction.java  | 27 ++++++----
 .../spark/procedures/ExpireSnapshotsProcedure.java | 17 +++++-
 .../procedures/RemoveOrphanFilesProcedure.java     | 16 +++++-
 6 files changed, 164 insertions(+), 36 deletions(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index cdd80040fa..1b285e8cac 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -22,6 +22,7 @@ import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
 import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.lit;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,14 +47,19 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.JobGroupUtils;
 import org.apache.iceberg.spark.SparkTableUtil;
@@ -85,6 +91,7 @@ abstract class BaseSparkAction<ThisT> {
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseSparkAction.class);
   private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
   private static final int DELETE_NUM_RETRIES = 3;
+  private static final int DELETE_GROUP_SIZE = 100000;
 
   private final SparkSession spark;
   private final JavaSparkContext sparkContext;
@@ -253,6 +260,37 @@ abstract class BaseSparkAction<ThisT> {
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, 
Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, 
DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary 
summary) {
+
+    ListMultimap<String, FileInfo> filesByType = Multimaps.index(fileGroup, 
FileInfo::getType);
+    ListMultimap<String, String> pathsByType =
+        Multimaps.transformValues(filesByType, FileInfo::getPath);
+
+    for (Map.Entry<String, Collection<String>> entry : 
pathsByType.asMap().entrySet()) {
+      String type = entry.getKey();
+      Collection<String> paths = entry.getValue();
+      int failures = 0;
+      try {
+        io.deleteFiles(paths);
+      } catch (BulkDeletionFailureException e) {
+        failures = e.numberFailedObjects();
+      }
+      summary.deletedFiles(type, paths.size() - failures);
+    }
+  }
+
   static class DeleteSummary {
     private final AtomicLong dataFilesCount = new AtomicLong(0L);
     private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L);
@@ -261,6 +299,30 @@ abstract class BaseSparkAction<ThisT> {
     private final AtomicLong manifestListsCount = new AtomicLong(0L);
     private final AtomicLong otherFilesCount = new AtomicLong(0L);
 
+    public void deletedFiles(String type, int numFiles) {
+      if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+        dataFilesCount.addAndGet(numFiles);
+
+      } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
+        positionDeleteFilesCount.addAndGet(numFiles);
+
+      } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
+        equalityDeleteFilesCount.addAndGet(numFiles);
+
+      } else if (MANIFEST.equalsIgnoreCase(type)) {
+        manifestsCount.addAndGet(numFiles);
+
+      } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
+        manifestListsCount.addAndGet(numFiles);
+
+      } else if (OTHERS.equalsIgnoreCase(type)) {
+        otherFilesCount.addAndGet(numFiles);
+
+      } else {
+        throw new ValidationException("Illegal file type: %s", type);
+      }
+    }
+
     public void deletedFile(String path, String type) {
       if (FileContent.DATA.name().equalsIgnoreCase(type)) {
         dataFilesCount.incrementAndGet();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 1abd2107ed..ea73403c2e 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -48,6 +48,8 @@ import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -111,21 +113,13 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
   private final SerializableConfiguration hadoopConf;
   private final int listingParallelism;
   private final Table table;
-  private final Consumer<String> defaultDelete =
-      new Consumer<String>() {
-        @Override
-        public void accept(String file) {
-          table.io().deleteFile(file);
-        }
-      };
-
   private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
   private Map<String, String> equalAuthorities = Collections.emptyMap();
   private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
   private String location = null;
   private long olderThanTimestamp = System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(3);
   private Dataset<Row> compareToFileList;
-  private Consumer<String> deleteFunc = defaultDelete;
+  private Consumer<String> deleteFunc = null;
   private ExecutorService deleteExecutorService = null;
 
   DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
@@ -239,6 +233,16 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     return String.format("Deleting orphan files (%s) from %s", 
optionsAsString, table.name());
   }
 
+  private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
+    try {
+      io.deleteFiles(paths);
+      LOG.info("Deleted {} files using bulk deletes", paths.size());
+    } catch (BulkDeletionFailureException e) {
+      int deletedFilesCount = paths.size() - e.numberFailedObjects();
+      LOG.warn("Deleted only {} of {} files using bulk deletes", 
deletedFilesCount, paths.size());
+    }
+  }
+
   private DeleteOrphanFiles.Result doExecute() {
     Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
     Dataset<FileURI> validFileIdentDS = validFileIdentDS();
@@ -246,12 +250,27 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, 
prefixMismatchMode);
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, 
exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      deleteFiles((SupportsBulkOperations) table.io(), orphanFiles);
+    } else {
+
+      Tasks.Builder<String> deleteTasks =
+          Tasks.foreach(orphanFiles)
+              .noRetry()
+              .executeWith(deleteExecutorService)
+              .suppressFailureWhenFinished()
+              .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", 
file, exc));
+
+      if (deleteFunc == null) {
+        LOG.info(
+            "Table IO {} does not support bulk operations. Using non-bulk 
deletes.",
+            table.io().getClass().getName());
+        deleteTasks.run(table.io()::deleteFile);
+      } else {
+        LOG.info("Custom delete function provided. Using non-bulk deletes");
+        deleteTasks.run(deleteFunc::accept);
+      }
+    }
 
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
index 0f01afa287..cdc60a659d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.actions.DeleteReachableFiles;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.util.PropertyUtil;
@@ -54,15 +55,8 @@ public class DeleteReachableFilesSparkAction
   private static final Logger LOG = 
LoggerFactory.getLogger(DeleteReachableFilesSparkAction.class);
 
   private final String metadataFileLocation;
-  private final Consumer<String> defaultDelete =
-      new Consumer<String>() {
-        @Override
-        public void accept(String file) {
-          io.deleteFile(file);
-        }
-      };
-
-  private Consumer<String> deleteFunc = defaultDelete;
+
+  private Consumer<String> deleteFunc = null;
   private ExecutorService deleteExecutorService = null;
   private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());
 
@@ -132,7 +126,22 @@ public class DeleteReachableFilesSparkAction
   }
 
   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, 
files);
+    DeleteSummary summary;
+    if (deleteFunc == null && io instanceof SupportsBulkOperations) {
+      summary = deleteFiles((SupportsBulkOperations) io, files);
+    } else {
+
+      if (deleteFunc == null) {
+        LOG.info(
+            "Table IO {} does not support bulk operations. Using non-bulk 
deletes.",
+            io.getClass().getName());
+        summary = deleteFiles(deleteExecutorService, io::deleteFile, files);
+      } else {
+        LOG.info("Custom delete function provided. Using non-bulk deletes");
+        summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+      }
+    }
+
     LOG.info("Deleted {} total files", summary.totalFilesCount());
 
     return new BaseDeleteReachableFilesActionResult(
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index d9af48c221..95e153a9a5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -74,18 +75,11 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
 
   private final Table table;
   private final TableOperations ops;
-  private final Consumer<String> defaultDelete =
-      new Consumer<String>() {
-        @Override
-        public void accept(String file) {
-          ops.io().deleteFile(file);
-        }
-      };
 
   private final Set<Long> expiredSnapshotIds = Sets.newHashSet();
   private Long expireOlderThanValue = null;
   private Integer retainLastValue = null;
-  private Consumer<String> deleteFunc = defaultDelete;
+  private Consumer<String> deleteFunc = null;
   private ExecutorService deleteExecutorService = null;
   private Dataset<FileInfo> expiredFileDS = null;
 
@@ -265,7 +259,22 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
   }
 
   private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, 
files);
+    DeleteSummary summary;
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      summary = deleteFiles((SupportsBulkOperations) table.io(), files);
+    } else {
+
+      if (deleteFunc == null) {
+        LOG.info(
+            "Table IO {} does not support bulk operations. Using non-bulk 
deletes.",
+            table.io().getClass().getName());
+        summary = deleteFiles(deleteExecutorService, table.io()::deleteFile, 
files);
+      } else {
+        LOG.info("Custom delete function provided. Using non-bulk deletes");
+        summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+      }
+    }
+
     LOG.info("Deleted {} total files", summary.totalFilesCount());
 
     return new BaseExpireSnapshotsActionResult(
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 36c5dee6bd..a66310f493 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.procedures;
 
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.ExpireSnapshots;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
@@ -33,6 +34,8 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A procedure that expires snapshots in a table.
@@ -41,6 +44,8 @@ import org.apache.spark.sql.types.StructType;
  */
 public class ExpireSnapshotsProcedure extends BaseProcedure {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsProcedure.class);
+
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
@@ -117,7 +122,17 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
           }
 
           if (maxConcurrentDeletes != null) {
-            action.executeDeleteWith(executorService(maxConcurrentDeletes, 
"expire-snapshots"));
+            if (table.io() instanceof SupportsBulkOperations) {
+              LOG.warn(
+                  "max_concurrent_deletes only works with FileIOs that do not 
support bulk deletes. This"
+                      + "table is currently using {} which supports bulk 
deletes so the parameter will be ignored. "
+                      + "See that IO's documentation to learn how to adjust 
parallelism for that particular "
+                      + "IO's bulk delete.",
+                  table.io().getClass().getName());
+            } else {
+
+              action.executeDeleteWith(executorService(maxConcurrentDeletes, 
"expire-snapshots"));
+            }
           }
 
           if (snapshotIds != null) {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index f49f37c02e..6e66ea2629 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -39,6 +40,8 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.runtime.BoxedUnit;
 
 /**
@@ -47,6 +50,7 @@ import scala.runtime.BoxedUnit;
  * @see SparkActions#deleteOrphanFiles(Table)
  */
 public class RemoveOrphanFilesProcedure extends BaseProcedure {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class);
 
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
@@ -154,7 +158,17 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
           }
 
           if (maxConcurrentDeletes != null) {
-            action.executeDeleteWith(executorService(maxConcurrentDeletes, 
"remove-orphans"));
+            if (table.io() instanceof SupportsBulkOperations) {
+              LOG.warn(
+                  "max_concurrent_deletes only works with FileIOs that do not 
support bulk deletes. This"
+                      + "table is currently using {} which supports bulk 
deletes so the parameter will be ignored. "
+                      + "See that IO's documentation to learn how to adjust 
parallelism for that particular "
+                      + "IO's bulk delete.",
+                  table.io().getClass().getName());
+            } else {
+
+              action.executeDeleteWith(executorService(maxConcurrentDeletes, 
"remove-orphans"));
+            }
           }
 
           if (fileListView != null) {

Reply via email to