This is an automated email from the ASF dual-hosted git repository. etudenhoefner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new c7e367494d Core: Use Bulk deletion for cleaning up uncommitted files in BaseTransaction (#13653) c7e367494d is described below commit c7e367494d1646505b7c162886ca1b1c45a33306 Author: Hongyue/Steve Zhang <hongyue.apa...@gmail.com> AuthorDate: Fri Jul 25 00:34:46 2025 -0700 Core: Use Bulk deletion for cleaning up uncommitted files in BaseTransaction (#13653) --- .../java/org/apache/iceberg/BaseTransaction.java | 57 +++++++++++++--------- .../org/apache/iceberg/TestRemoveSnapshots.java | 23 ++------- .../test/java/org/apache/iceberg/TestTables.java | 16 ++++++ .../java/org/apache/iceberg/TestTransaction.java | 39 +++++++++++++++ 4 files changed, 92 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 4ae99f0609..db59733740 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -39,8 +39,10 @@ import org.apache.iceberg.exceptions.CleanableFailure; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.metrics.LoggingMetricsReporter; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; @@ -50,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -340,11 +343,8 @@ public class BaseTransaction implements Transaction { } finally { // create table never needs to retry because the table has no previous state. because retries // are not a - // concern, it is safe to delete all of the deleted files from individual operations - Tasks.foreach(deletedFiles) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc)) - .run(ops.io()::deleteFile); + // concern, it is safe to delete all the deleted files from individual operations + deleteUncommittedFiles(deletedFiles); } } @@ -396,11 +396,8 @@ public class BaseTransaction implements Transaction { } finally { // replace table never needs to retry because the table state is completely replaced. because // retries are not - // a concern, it is safe to delete all of the deleted files from individual operations - Tasks.foreach(deletedFiles) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc)) - .run(ops.io()::deleteFile); + // a concern, it is safe to delete all the deleted files from individual operations + deleteUncommittedFiles(deletedFiles); } } @@ -458,16 +455,12 @@ public class BaseTransaction implements Transaction { Set<String> committedFiles = committedFiles(ops, newSnapshots); if (committedFiles != null) { - // delete all of the files that were deleted in the most recent set of operation commits - Tasks.foreach(deletedFiles) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc)) - .run( - path -> { - if (!committedFiles.contains(path)) { - ops.io().deleteFile(path); - } - }); + // delete all the files that were deleted in the most recent set of operation commits + Set<String> uncommittedFiles = + deletedFiles.stream() + .filter(f -> !committedFiles.contains(f)) + .collect(Collectors.toSet()); + deleteUncommittedFiles(uncommittedFiles); } else { LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up"); } @@ -482,10 +475,7 @@ public class BaseTransaction implements Transaction { cleanAllUpdates(); // delete all the uncommitted files - Tasks.foreach(deletedFiles) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc)) - .run(ops.io()::deleteFile); + deleteUncommittedFiles(deletedFiles); } private void cleanAllUpdates() { @@ -499,6 +489,25 @@ public class BaseTransaction implements Transaction { }); } + private void deleteUncommittedFiles(Iterable<String> paths) { + if (ops.io() instanceof SupportsBulkOperations) { + try { + ((SupportsBulkOperations) ops.io()).deleteFiles(paths); + } catch (BulkDeletionFailureException e) { + LOG.warn( + "Failed to delete {} uncommitted files using bulk deletes", e.numberFailedObjects(), e); + } catch (RuntimeException e) { + LOG.warn("Failed to delete uncommitted files using bulk deletes", e); + } + } else { + Tasks.foreach(paths) + .executeWith(ThreadPools.getWorkerPool()) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc)) + .run(ops.io()::deleteFile); + } + } + private void applyUpdates(TableOperations underlyingOps) { if (base != underlyingOps.refresh()) { // use refreshed the metadata diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index 1318c2b332..80fcb8f14e 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -44,7 +44,6 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.PositionOutputStream; -import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.puffin.Blob; import org.apache.iceberg.puffin.Puffin; import org.apache.iceberg.puffin.PuffinWriter; @@ -1535,7 +1534,7 @@ public class TestRemoveSnapshots extends TestBase { @TestTemplate public void testRemoveFromTableWithBulkIO() { - TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO()); + TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestTables.TestBulkLocalFileIO()); Mockito.doNothing().when(spyFileIO).deleteFiles(any()); @@ -1544,7 +1543,7 @@ public class TestRemoveSnapshots extends TestBase { @TestTemplate public void testBulkDeletionWithBulkDeletionFailureException() { - TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO()); + TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestTables.TestBulkLocalFileIO()); Mockito.doThrow(new BulkDeletionFailureException(2)) .doNothing() @@ -1556,7 +1555,7 @@ public class TestRemoveSnapshots extends TestBase { @TestTemplate public void testBulkDeletionWithRuntimeException() { - TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO()); + TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestTables.TestBulkLocalFileIO()); Mockito.doThrow(new RuntimeException("Exception when bulk deleting")) .doNothing() @@ -1566,7 +1565,7 @@ public class TestRemoveSnapshots extends TestBase { runBulkDeleteTest(spyFileIO); } - private void runBulkDeleteTest(TestBulkLocalFileIO spyFileIO) { + private void runBulkDeleteTest(TestTables.TestBulkLocalFileIO spyFileIO) { String tableName = "tableWithBulkIO"; Table tableWithBulkIO = TestTables.create( @@ -1909,18 +1908,4 @@ public class TestRemoveSnapshots extends TestBase { private static void commitPartitionStats(Table table, PartitionStatisticsFile statisticsFile) { table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); } - - private static class TestBulkLocalFileIO extends TestTables.LocalFileIO - implements SupportsBulkOperations { - - @Override - public void deleteFile(String path) { - throw new RuntimeException("Expected to call the bulk delete interface."); - } - - @Override - public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException { - throw new RuntimeException("Expected to mock this function"); - } - } } diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index 073a95fca2..55232689ad 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -26,10 +26,12 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -392,4 +394,18 @@ public class TestTables { } } } + + static class TestBulkLocalFileIO extends TestTables.LocalFileIO + implements SupportsBulkOperations { + + @Override + public void deleteFile(String path) { + throw new RuntimeException("Expected to call the bulk delete interface."); + } + + @Override + public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException { + throw new RuntimeException("Expected to mock this function"); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index 6ea84e8fc2..31fdba99ec 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -21,6 +21,7 @@ package org.apache.iceberg; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; +import static org.mockito.ArgumentMatchers.any; import java.io.File; import java.io.IOException; @@ -38,6 +39,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; @ExtendWith(ParameterizedTestExtension.class) public class TestTransaction extends TestBase { @@ -239,6 +241,43 @@ public class TestTransaction extends TestBase { .hasMessage("Injected failure"); } + @TestTemplate + public void testTransactionFailureBulkDeletionCleanup() throws IOException { + TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestTables.TestBulkLocalFileIO()); + Mockito.doNothing().when(spyFileIO).deleteFiles(any()); + + File location = java.nio.file.Files.createTempDirectory(temp, "junit").toFile(); + String tableName = "txnFailureBulkDeleteTest"; + TestTables.TestTable tableWithBulkIO = + TestTables.create( + location, + tableName, + SCHEMA, + SPEC, + SortOrder.unsorted(), + formatVersion, + new TestTables.TestTableOperations(tableName, location, spyFileIO)); + + // set retries to 0 to catch the failure + tableWithBulkIO.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "0").commit(); + + // eagerly generate manifest and manifest-list + Transaction txn = tableWithBulkIO.newTransaction(); + txn.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + ManifestFile appendManifest = txn.table().currentSnapshot().allManifests(table.io()).get(0); + String txnManifestList = txn.table().currentSnapshot().manifestListLocation(); + + // cause the transaction commit to fail + tableWithBulkIO.ops().failCommits(1); + assertThatThrownBy(txn::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); + + // ensure both files are deleted on transaction failure + Mockito.verify(spyFileIO).deleteFiles(Set.of(appendManifest.path(), txnManifestList)); + } + @TestTemplate public void testTransactionRetry() { // use only one retry