amogh-jahagirdar commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r934791146


##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -182,12 +192,26 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         actualFileDF.join(validFileDF, joinCond, "leftanti").as(Encoders.STRING()).collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (batchDeletionSize > 1) {
+      Preconditions.checkArgument(
+          table.io() instanceof SupportsBulkOperations,
+          "FileIO %s does not support bulk deletion",
+          table.io().getClass().getName());
+      SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
+      List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
+      Tasks.foreach(fileBatches)
+          .noRetry()
+          .executeWith(deleteExecutorService)
+          .suppressFailureWhenFinished()
+          .run(bulkFileIO::deleteFiles);

Review Comment:
   Hey Szeon, I'm going to update this PR based on my change in 
https://github.com/apache/iceberg/pull/5379/files.
   
   My thinking is that we should always use deleteFunc if it's passed in (it 
should be the source of truth for deletion when it's set). If it isn't set and 
the FileIO supports bulk operations, we can just call fileIO.deleteFiles(); 
otherwise we fall back to the existing mechanism.
   
   That way we preserve the existing behavior of the procedure and add an 
optimization for FileIO types that support bulk delete. The underlying FileIO 
can also decide how the batches are created, so that batching is optimal for 
the underlying storage (maximizing deletion throughput, minimizing throttling, 
etc.), and it can handle parallelism as it sees fit. So at the procedure level 
we don't need to pass in a batch size.
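
   Roughly, the dispatch order I have in mind looks like the sketch below. It 
is only an illustration: `SimpleFileIO` and `BulkFileIO` are hypothetical 
stand-ins for FileIO and SupportsBulkOperations so the snippet compiles on its 
own, `deleteOrphans` is a made-up helper name, and the existing per-file path 
is shown as a plain loop rather than going through Tasks and the executor 
service.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class OrphanFileDeletionSketch {

  // Hypothetical stand-in for a FileIO that can only delete one file at a time.
  interface SimpleFileIO {
    void deleteFile(String path);
  }

  // Hypothetical stand-in for a FileIO that also supports bulk deletion.
  interface BulkFileIO extends SimpleFileIO {
    void deleteFiles(Iterable<String> paths);
  }

  // Returns the name of the path that was taken, purely for illustration.
  static String deleteOrphans(
      List<String> orphanFiles, Consumer<String> deleteFunc, SimpleFileIO io) {
    if (deleteFunc != null) {
      // A user-supplied delete function is the source of truth when set.
      orphanFiles.forEach(deleteFunc);
      return "deleteFunc";
    }
    if (io instanceof BulkFileIO) {
      // Batching and parallelism are left entirely to the FileIO.
      ((BulkFileIO) io).deleteFiles(orphanFiles);
      return "bulk";
    }
    // Existing mechanism: delete files one by one.
    orphanFiles.forEach(io::deleteFile);
    return "per-file";
  }

  public static void main(String[] args) {
    List<String> deleted = new ArrayList<>();
    SimpleFileIO plainIO = deleted::add;
    String path = deleteOrphans(Arrays.asList("a.parquet", "b.parquet"), null, plainIO);
    System.out.println(path + " deleted " + deleted);
  }
}
```

   Note there is no batch size parameter anywhere in the helper; the bulk 
branch hands the whole list to the FileIO.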



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

