This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9a76f46cab Spark 3.2: Bulk delete support for actions (#7048)
9a76f46cab is described below
commit 9a76f46caba9e6b9ddfcac385073e7f5a928ba04
Author: Russell Spitzer <[email protected]>
AuthorDate: Thu Mar 9 20:03:54 2023 -0600
Spark 3.2: Bulk delete support for actions (#7048)
This change backports PR #6682 to Spark 3.2.
---
.../iceberg/spark/actions/BaseSparkAction.java | 62 ++++++++++++++++++++++
.../actions/DeleteOrphanFilesSparkAction.java | 49 +++++++++++------
.../actions/DeleteReachableFilesSparkAction.java | 29 ++++++----
.../spark/actions/ExpireSnapshotsSparkAction.java | 27 ++++++----
.../spark/procedures/ExpireSnapshotsProcedure.java | 17 +++++-
.../procedures/RemoveOrphanFilesProcedure.java | 16 +++++-
6 files changed, 164 insertions(+), 36 deletions(-)
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index cdd80040fa..1b285e8cac 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -22,6 +22,7 @@ import static
org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -46,14 +47,19 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.iceberg.spark.SparkTableUtil;
@@ -85,6 +91,7 @@ abstract class BaseSparkAction<ThisT> {
private static final Logger LOG =
LoggerFactory.getLogger(BaseSparkAction.class);
private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
private static final int DELETE_NUM_RETRIES = 3;
+ private static final int DELETE_GROUP_SIZE = 100000;
private final SparkSession spark;
private final JavaSparkContext sparkContext;
@@ -253,6 +260,37 @@ abstract class BaseSparkAction<ThisT> {
return summary;
}
+ protected DeleteSummary deleteFiles(SupportsBulkOperations io,
Iterator<FileInfo> files) {
+ DeleteSummary summary = new DeleteSummary();
+ Iterator<List<FileInfo>> fileGroups = Iterators.partition(files,
DELETE_GROUP_SIZE);
+
+ Tasks.foreach(fileGroups)
+ .suppressFailureWhenFinished()
+ .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+ return summary;
+ }
+
+ private static void deleteFileGroup(
+ List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary
summary) {
+
+ ListMultimap<String, FileInfo> filesByType = Multimaps.index(fileGroup,
FileInfo::getType);
+ ListMultimap<String, String> pathsByType =
+ Multimaps.transformValues(filesByType, FileInfo::getPath);
+
+ for (Map.Entry<String, Collection<String>> entry :
pathsByType.asMap().entrySet()) {
+ String type = entry.getKey();
+ Collection<String> paths = entry.getValue();
+ int failures = 0;
+ try {
+ io.deleteFiles(paths);
+ } catch (BulkDeletionFailureException e) {
+ failures = e.numberFailedObjects();
+ }
+ summary.deletedFiles(type, paths.size() - failures);
+ }
+ }
+
static class DeleteSummary {
private final AtomicLong dataFilesCount = new AtomicLong(0L);
private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L);
@@ -261,6 +299,30 @@ abstract class BaseSparkAction<ThisT> {
private final AtomicLong manifestListsCount = new AtomicLong(0L);
private final AtomicLong otherFilesCount = new AtomicLong(0L);
+ public void deletedFiles(String type, int numFiles) {
+ if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+ dataFilesCount.addAndGet(numFiles);
+
+ } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
+ positionDeleteFilesCount.addAndGet(numFiles);
+
+ } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
+ equalityDeleteFilesCount.addAndGet(numFiles);
+
+ } else if (MANIFEST.equalsIgnoreCase(type)) {
+ manifestsCount.addAndGet(numFiles);
+
+ } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
+ manifestListsCount.addAndGet(numFiles);
+
+ } else if (OTHERS.equalsIgnoreCase(type)) {
+ otherFilesCount.addAndGet(numFiles);
+
+ } else {
+ throw new ValidationException("Illegal file type: %s", type);
+ }
+ }
+
public void deletedFile(String path, String type) {
if (FileContent.DATA.name().equalsIgnoreCase(type)) {
dataFilesCount.incrementAndGet();
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 1abd2107ed..ea73403c2e 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -48,6 +48,8 @@ import
org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.SupportsBulkOperations;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -111,21 +113,13 @@ public class DeleteOrphanFilesSparkAction extends
BaseSparkAction<DeleteOrphanFi
private final SerializableConfiguration hadoopConf;
private final int listingParallelism;
private final Table table;
- private final Consumer<String> defaultDelete =
- new Consumer<String>() {
- @Override
- public void accept(String file) {
- table.io().deleteFile(file);
- }
- };
-
private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
private Map<String, String> equalAuthorities = Collections.emptyMap();
private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(3);
private Dataset<Row> compareToFileList;
- private Consumer<String> deleteFunc = defaultDelete;
+ private Consumer<String> deleteFunc = null;
private ExecutorService deleteExecutorService = null;
DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
@@ -239,6 +233,16 @@ public class DeleteOrphanFilesSparkAction extends
BaseSparkAction<DeleteOrphanFi
return String.format("Deleting orphan files (%s) from %s",
optionsAsString, table.name());
}
+ private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
+ try {
+ io.deleteFiles(paths);
+ LOG.info("Deleted {} files using bulk deletes", paths.size());
+ } catch (BulkDeletionFailureException e) {
+ int deletedFilesCount = paths.size() - e.numberFailedObjects();
+ LOG.warn("Deleted only {} of {} files using bulk deletes",
deletedFilesCount, paths.size());
+ }
+ }
+
private DeleteOrphanFiles.Result doExecute() {
Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
Dataset<FileURI> validFileIdentDS = validFileIdentDS();
@@ -246,12 +250,27 @@ public class DeleteOrphanFilesSparkAction extends
BaseSparkAction<DeleteOrphanFi
List<String> orphanFiles =
findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS,
prefixMismatchMode);
- Tasks.foreach(orphanFiles)
- .noRetry()
- .executeWith(deleteExecutorService)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file,
exc))
- .run(deleteFunc::accept);
+ if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+ deleteFiles((SupportsBulkOperations) table.io(), orphanFiles);
+ } else {
+
+ Tasks.Builder<String> deleteTasks =
+ Tasks.foreach(orphanFiles)
+ .noRetry()
+ .executeWith(deleteExecutorService)
+ .suppressFailureWhenFinished()
+ .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}",
file, exc));
+
+ if (deleteFunc == null) {
+ LOG.info(
+ "Table IO {} does not support bulk operations. Using non-bulk
deletes.",
+ table.io().getClass().getName());
+ deleteTasks.run(table.io()::deleteFile);
+ } else {
+ LOG.info("Custom delete function provided. Using non-bulk deletes.");
+ deleteTasks.run(deleteFunc::accept);
+ }
+ }
return new BaseDeleteOrphanFilesActionResult(orphanFiles);
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
index 0f01afa287..cdc60a659d 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.actions.DeleteReachableFiles;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
@@ -54,15 +55,8 @@ public class DeleteReachableFilesSparkAction
private static final Logger LOG =
LoggerFactory.getLogger(DeleteReachableFilesSparkAction.class);
private final String metadataFileLocation;
- private final Consumer<String> defaultDelete =
- new Consumer<String>() {
- @Override
- public void accept(String file) {
- io.deleteFile(file);
- }
- };
-
- private Consumer<String> deleteFunc = defaultDelete;
+
+ private Consumer<String> deleteFunc = null;
private ExecutorService deleteExecutorService = null;
private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());
@@ -132,7 +126,22 @@ public class DeleteReachableFilesSparkAction
}
private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
- DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc,
files);
+ DeleteSummary summary;
+ if (deleteFunc == null && io instanceof SupportsBulkOperations) {
+ summary = deleteFiles((SupportsBulkOperations) io, files);
+ } else {
+
+ if (deleteFunc == null) {
+ LOG.info(
+ "Table IO {} does not support bulk operations. Using non-bulk
deletes.",
+ io.getClass().getName());
+ summary = deleteFiles(deleteExecutorService, io::deleteFile, files);
+ } else {
+ LOG.info("Custom delete function provided. Using non-bulk deletes.");
+ summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+ }
+ }
+
LOG.info("Deleted {} total files", summary.totalFilesCount());
return new BaseDeleteReachableFilesActionResult(
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index d9af48c221..95e153a9a5 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -74,18 +75,11 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
private final Table table;
private final TableOperations ops;
- private final Consumer<String> defaultDelete =
- new Consumer<String>() {
- @Override
- public void accept(String file) {
- ops.io().deleteFile(file);
- }
- };
private final Set<Long> expiredSnapshotIds = Sets.newHashSet();
private Long expireOlderThanValue = null;
private Integer retainLastValue = null;
- private Consumer<String> deleteFunc = defaultDelete;
+ private Consumer<String> deleteFunc = null;
private ExecutorService deleteExecutorService = null;
private Dataset<FileInfo> expiredFileDS = null;
@@ -265,7 +259,22 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
}
private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
- DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc,
files);
+ DeleteSummary summary;
+ if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+ summary = deleteFiles((SupportsBulkOperations) table.io(), files);
+ } else {
+
+ if (deleteFunc == null) {
+ LOG.info(
+ "Table IO {} does not support bulk operations. Using non-bulk
deletes.",
+ table.io().getClass().getName());
+ summary = deleteFiles(deleteExecutorService, table.io()::deleteFile,
files);
+ } else {
+ LOG.info("Custom delete function provided. Using non-bulk deletes.");
+ summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+ }
+ }
+
LOG.info("Deleted {} total files", summary.totalFilesCount());
return new BaseExpireSnapshotsActionResult(
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 36c5dee6bd..a66310f493 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.procedures;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ExpireSnapshots;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
@@ -33,6 +34,8 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A procedure that expires snapshots in a table.
@@ -41,6 +44,8 @@ import org.apache.spark.sql.types.StructType;
*/
public class ExpireSnapshotsProcedure extends BaseProcedure {
+ private static final Logger LOG =
LoggerFactory.getLogger(ExpireSnapshotsProcedure.class);
+
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
@@ -117,7 +122,17 @@ public class ExpireSnapshotsProcedure extends
BaseProcedure {
}
if (maxConcurrentDeletes != null) {
- action.executeDeleteWith(executorService(maxConcurrentDeletes,
"expire-snapshots"));
+ if (table.io() instanceof SupportsBulkOperations) {
+ LOG.warn(
+ "max_concurrent_deletes only works with FileIOs that do not
support bulk deletes. This "
+ + "table is currently using {} which supports bulk
deletes so the parameter will be ignored. "
+ + "See that IO's documentation to learn how to adjust
parallelism for that particular "
+ + "IO's bulk delete.",
+ table.io().getClass().getName());
+ } else {
+
+ action.executeDeleteWith(executorService(maxConcurrentDeletes,
"expire-snapshots"));
+ }
}
if (snapshotIds != null) {
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index f49f37c02e..6e66ea2629 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -39,6 +40,8 @@ import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;
/**
@@ -47,6 +50,7 @@ import scala.runtime.BoxedUnit;
* @see SparkActions#deleteOrphanFiles(Table)
*/
public class RemoveOrphanFilesProcedure extends BaseProcedure {
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class);
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
@@ -154,7 +158,17 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
}
if (maxConcurrentDeletes != null) {
- action.executeDeleteWith(executorService(maxConcurrentDeletes,
"remove-orphans"));
+ if (table.io() instanceof SupportsBulkOperations) {
+ LOG.warn(
+ "max_concurrent_deletes only works with FileIOs that do not
support bulk deletes. This "
+ + "table is currently using {} which supports bulk
deletes so the parameter will be ignored. "
+ + "See that IO's documentation to learn how to adjust
parallelism for that particular "
+ + "IO's bulk delete.",
+ table.io().getClass().getName());
+ } else {
+
+ action.executeDeleteWith(executorService(maxConcurrentDeletes,
"remove-orphans"));
+ }
}
if (fileListView != null) {