This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c7e367494d Core: Use Bulk deletion for cleaning up uncommitted files in BaseTransaction (#13653)
c7e367494d is described below

commit c7e367494d1646505b7c162886ca1b1c45a33306
Author: Hongyue/Steve Zhang <hongyue.apa...@gmail.com>
AuthorDate: Fri Jul 25 00:34:46 2025 -0700

    Core: Use Bulk deletion for cleaning up uncommitted files in BaseTransaction (#13653)
---
 .../java/org/apache/iceberg/BaseTransaction.java   | 57 +++++++++++++---------
 .../org/apache/iceberg/TestRemoveSnapshots.java    | 23 ++-------
 .../test/java/org/apache/iceberg/TestTables.java   | 16 ++++++
 .../java/org/apache/iceberg/TestTransaction.java   | 39 +++++++++++++++
 4 files changed, 92 insertions(+), 43 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java 
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 4ae99f0609..db59733740 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -39,8 +39,10 @@ import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.metrics.LoggingMetricsReporter;
 import org.apache.iceberg.metrics.MetricsReporter;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -50,6 +52,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -340,11 +343,8 @@ public class BaseTransaction implements Transaction {
     } finally {
       // create table never needs to retry because the table has no previous 
state. because retries
       // are not a
-      // concern, it is safe to delete all of the deleted files from 
individual operations
-      Tasks.foreach(deletedFiles)
-          .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted 
file: {}", file, exc))
-          .run(ops.io()::deleteFile);
+      // concern, it is safe to delete all the deleted files from individual 
operations
+      deleteUncommittedFiles(deletedFiles);
     }
   }
 
@@ -396,11 +396,8 @@ public class BaseTransaction implements Transaction {
     } finally {
       // replace table never needs to retry because the table state is 
completely replaced. because
       // retries are not
-      // a concern, it is safe to delete all of the deleted files from 
individual operations
-      Tasks.foreach(deletedFiles)
-          .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted 
file: {}", file, exc))
-          .run(ops.io()::deleteFile);
+      // a concern, it is safe to delete all the deleted files from individual 
operations
+      deleteUncommittedFiles(deletedFiles);
     }
   }
 
@@ -458,16 +455,12 @@ public class BaseTransaction implements Transaction {
 
       Set<String> committedFiles = committedFiles(ops, newSnapshots);
       if (committedFiles != null) {
-        // delete all of the files that were deleted in the most recent set of 
operation commits
-        Tasks.foreach(deletedFiles)
-            .suppressFailureWhenFinished()
-            .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted 
file: {}", file, exc))
-            .run(
-                path -> {
-                  if (!committedFiles.contains(path)) {
-                    ops.io().deleteFile(path);
-                  }
-                });
+        // delete all the files that were deleted in the most recent set of 
operation commits
+        Set<String> uncommittedFiles =
+            deletedFiles.stream()
+                .filter(f -> !committedFiles.contains(f))
+                .collect(Collectors.toSet());
+        deleteUncommittedFiles(uncommittedFiles);
       } else {
         LOG.warn("Failed to load metadata for a committed snapshot, skipping 
clean-up");
       }
@@ -482,10 +475,7 @@ public class BaseTransaction implements Transaction {
     cleanAllUpdates();
 
     // delete all the uncommitted files
-    Tasks.foreach(deletedFiles)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: 
{}", file, exc))
-        .run(ops.io()::deleteFile);
+    deleteUncommittedFiles(deletedFiles);
   }
 
   private void cleanAllUpdates() {
@@ -499,6 +489,25 @@ public class BaseTransaction implements Transaction {
             });
   }
 
+  private void deleteUncommittedFiles(Iterable<String> paths) {
+    if (ops.io() instanceof SupportsBulkOperations) {
+      try {
+        ((SupportsBulkOperations) ops.io()).deleteFiles(paths);
+      } catch (BulkDeletionFailureException e) {
+        LOG.warn(
+            "Failed to delete {} uncommitted files using bulk deletes", 
e.numberFailedObjects(), e);
+      } catch (RuntimeException e) {
+        LOG.warn("Failed to delete uncommitted files using bulk deletes", e);
+      }
+    } else {
+      Tasks.foreach(paths)
+          .executeWith(ThreadPools.getWorkerPool())
+          .suppressFailureWhenFinished()
+          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted 
file: {}", file, exc))
+          .run(ops.io()::deleteFile);
+    }
+  }
+
   private void applyUpdates(TableOperations underlyingOps) {
     if (base != underlyingOps.refresh()) {
       // use refreshed the metadata
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java 
b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index 1318c2b332..80fcb8f14e 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -44,7 +44,6 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.PositionOutputStream;
-import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.puffin.Blob;
 import org.apache.iceberg.puffin.Puffin;
 import org.apache.iceberg.puffin.PuffinWriter;
@@ -1535,7 +1534,7 @@ public class TestRemoveSnapshots extends TestBase {
 
   @TestTemplate
   public void testRemoveFromTableWithBulkIO() {
-    TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());
+    TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new 
TestTables.TestBulkLocalFileIO());
 
     Mockito.doNothing().when(spyFileIO).deleteFiles(any());
 
@@ -1544,7 +1543,7 @@ public class TestRemoveSnapshots extends TestBase {
 
   @TestTemplate
   public void testBulkDeletionWithBulkDeletionFailureException() {
-    TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());
+    TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new 
TestTables.TestBulkLocalFileIO());
 
     Mockito.doThrow(new BulkDeletionFailureException(2))
         .doNothing()
@@ -1556,7 +1555,7 @@ public class TestRemoveSnapshots extends TestBase {
 
   @TestTemplate
   public void testBulkDeletionWithRuntimeException() {
-    TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());
+    TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new 
TestTables.TestBulkLocalFileIO());
 
     Mockito.doThrow(new RuntimeException("Exception when bulk deleting"))
         .doNothing()
@@ -1566,7 +1565,7 @@ public class TestRemoveSnapshots extends TestBase {
     runBulkDeleteTest(spyFileIO);
   }
 
-  private void runBulkDeleteTest(TestBulkLocalFileIO spyFileIO) {
+  private void runBulkDeleteTest(TestTables.TestBulkLocalFileIO spyFileIO) {
     String tableName = "tableWithBulkIO";
     Table tableWithBulkIO =
         TestTables.create(
@@ -1909,18 +1908,4 @@ public class TestRemoveSnapshots extends TestBase {
   private static void commitPartitionStats(Table table, 
PartitionStatisticsFile statisticsFile) {
     
table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
   }
-
-  private static class TestBulkLocalFileIO extends TestTables.LocalFileIO
-      implements SupportsBulkOperations {
-
-    @Override
-    public void deleteFile(String path) {
-      throw new RuntimeException("Expected to call the bulk delete 
interface.");
-    }
-
-    @Override
-    public void deleteFiles(Iterable<String> pathsToDelete) throws 
BulkDeletionFailureException {
-      throw new RuntimeException("Expected to mock this function");
-    }
-  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java 
b/core/src/test/java/org/apache/iceberg/TestTables.java
index 073a95fca2..55232689ad 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -26,10 +26,12 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -392,4 +394,18 @@ public class TestTables {
       }
     }
   }
+
+  static class TestBulkLocalFileIO extends TestTables.LocalFileIO
+      implements SupportsBulkOperations {
+
+    @Override
+    public void deleteFile(String path) {
+      throw new RuntimeException("Expected to call the bulk delete 
interface.");
+    }
+
+    @Override
+    public void deleteFiles(Iterable<String> pathsToDelete) throws 
BulkDeletionFailureException {
+      throw new RuntimeException("Expected to mock this function");
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java 
b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 6ea84e8fc2..31fdba99ec 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
 
 import java.io.File;
 import java.io.IOException;
@@ -38,6 +39,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
 
 @ExtendWith(ParameterizedTestExtension.class)
 public class TestTransaction extends TestBase {
@@ -239,6 +241,43 @@ public class TestTransaction extends TestBase {
         .hasMessage("Injected failure");
   }
 
+  @TestTemplate
+  public void testTransactionFailureBulkDeletionCleanup() throws IOException {
+    TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new 
TestTables.TestBulkLocalFileIO());
+    Mockito.doNothing().when(spyFileIO).deleteFiles(any());
+
+    File location = java.nio.file.Files.createTempDirectory(temp, 
"junit").toFile();
+    String tableName = "txnFailureBulkDeleteTest";
+    TestTables.TestTable tableWithBulkIO =
+        TestTables.create(
+            location,
+            tableName,
+            SCHEMA,
+            SPEC,
+            SortOrder.unsorted(),
+            formatVersion,
+            new TestTables.TestTableOperations(tableName, location, 
spyFileIO));
+
+    // set retries to 0 to catch the failure
+    tableWithBulkIO.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, 
"0").commit();
+
+    // eagerly generate manifest and manifest-list
+    Transaction txn = tableWithBulkIO.newTransaction();
+    txn.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    ManifestFile appendManifest = 
txn.table().currentSnapshot().allManifests(table.io()).get(0);
+    String txnManifestList = 
txn.table().currentSnapshot().manifestListLocation();
+
+    // cause the transaction commit to fail
+    tableWithBulkIO.ops().failCommits(1);
+    assertThatThrownBy(txn::commitTransaction)
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessage("Injected failure");
+
+    // ensure both files are deleted on transaction failure
+    Mockito.verify(spyFileIO).deleteFiles(Set.of(appendManifest.path(), 
txnManifestList));
+  }
+
   @TestTemplate
   public void testTransactionRetry() {
     // use only one retry

Reply via email to