This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c7e367494d Core: Use Bulk deletion for cleaning up uncommitted files in BaseTransaction (#13653)
c7e367494d is described below
commit c7e367494d1646505b7c162886ca1b1c45a33306
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Jul 25 00:34:46 2025 -0700
Core: Use Bulk deletion for cleaning up uncommitted files in BaseTransaction (#13653)
---
.../java/org/apache/iceberg/BaseTransaction.java | 57 +++++++++++++---------
.../org/apache/iceberg/TestRemoveSnapshots.java | 23 ++-------
.../test/java/org/apache/iceberg/TestTables.java | 16 ++++++
.../java/org/apache/iceberg/TestTransaction.java | 39 +++++++++++++++
4 files changed, 92 insertions(+), 43 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 4ae99f0609..db59733740 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -39,8 +39,10 @@ import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
import org.apache.iceberg.metrics.MetricsReporter;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -50,6 +52,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -340,11 +343,8 @@ public class BaseTransaction implements Transaction {
} finally {
// create table never needs to retry because the table has no previous
state. because retries
// are not a
- // concern, it is safe to delete all of the deleted files from
individual operations
- Tasks.foreach(deletedFiles)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted
file: {}", file, exc))
- .run(ops.io()::deleteFile);
+ // concern, it is safe to delete all the deleted files from individual
operations
+ deleteUncommittedFiles(deletedFiles);
}
}
@@ -396,11 +396,8 @@ public class BaseTransaction implements Transaction {
} finally {
// replace table never needs to retry because the table state is
completely replaced. because
// retries are not
- // a concern, it is safe to delete all of the deleted files from
individual operations
- Tasks.foreach(deletedFiles)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted
file: {}", file, exc))
- .run(ops.io()::deleteFile);
+ // a concern, it is safe to delete all the deleted files from individual
operations
+ deleteUncommittedFiles(deletedFiles);
}
}
@@ -458,16 +455,12 @@ public class BaseTransaction implements Transaction {
Set<String> committedFiles = committedFiles(ops, newSnapshots);
if (committedFiles != null) {
- // delete all of the files that were deleted in the most recent set of
operation commits
- Tasks.foreach(deletedFiles)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted
file: {}", file, exc))
- .run(
- path -> {
- if (!committedFiles.contains(path)) {
- ops.io().deleteFile(path);
- }
- });
+ // delete all the files that were deleted in the most recent set of
operation commits
+ Set<String> uncommittedFiles =
+ deletedFiles.stream()
+ .filter(f -> !committedFiles.contains(f))
+ .collect(Collectors.toSet());
+ deleteUncommittedFiles(uncommittedFiles);
} else {
LOG.warn("Failed to load metadata for a committed snapshot, skipping
clean-up");
}
@@ -482,10 +475,7 @@ public class BaseTransaction implements Transaction {
cleanAllUpdates();
// delete all the uncommitted files
- Tasks.foreach(deletedFiles)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file:
{}", file, exc))
- .run(ops.io()::deleteFile);
+ deleteUncommittedFiles(deletedFiles);
}
private void cleanAllUpdates() {
@@ -499,6 +489,25 @@ public class BaseTransaction implements Transaction {
});
}
+  /**
+   * Deletes files that were written by operations of this transaction but never committed.
+   *
+   * <p>When the table's {@link FileIO} implements {@link SupportsBulkOperations}, every path is
+   * handed to one {@code deleteFiles} call; a {@link BulkDeletionFailureException} or any other
+   * {@link RuntimeException} thrown by that call is caught and logged at WARN. Otherwise each
+   * path is deleted individually through {@code Tasks} on {@code ThreadPools.getWorkerPool()},
+   * with a WARN logged per failed file.
+   *
+   * @param paths locations of the uncommitted files to remove
+   */
+  private void deleteUncommittedFiles(Iterable<String> paths) {
+    FileIO io = ops.io();
+
+    // no bulk support: fall back to one delete call per file
+    if (!(io instanceof SupportsBulkOperations)) {
+      Tasks.foreach(paths)
+          .executeWith(ThreadPools.getWorkerPool())
+          .suppressFailureWhenFinished()
+          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+          .run(io::deleteFile);
+      return;
+    }
+
+    SupportsBulkOperations bulkIO = (SupportsBulkOperations) io;
+    try {
+      bulkIO.deleteFiles(paths);
+    } catch (BulkDeletionFailureException e) {
+      // the exception carries how many objects could not be removed
+      LOG.warn(
+          "Failed to delete {} uncommitted files using bulk deletes", e.numberFailedObjects(), e);
+    } catch (RuntimeException e) {
+      LOG.warn("Failed to delete uncommitted files using bulk deletes", e);
+    }
+  }
+
private void applyUpdates(TableOperations underlyingOps) {
if (base != underlyingOps.refresh()) {
// use refreshed the metadata
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index 1318c2b332..80fcb8f14e 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -44,7 +44,6 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
-import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
@@ -1535,7 +1534,7 @@ public class TestRemoveSnapshots extends TestBase {
@TestTemplate
public void testRemoveFromTableWithBulkIO() {
- TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());
+ TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new
TestTables.TestBulkLocalFileIO());
Mockito.doNothing().when(spyFileIO).deleteFiles(any());
@@ -1544,7 +1543,7 @@ public class TestRemoveSnapshots extends TestBase {
@TestTemplate
public void testBulkDeletionWithBulkDeletionFailureException() {
- TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());
+ TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new
TestTables.TestBulkLocalFileIO());
Mockito.doThrow(new BulkDeletionFailureException(2))
.doNothing()
@@ -1556,7 +1555,7 @@ public class TestRemoveSnapshots extends TestBase {
@TestTemplate
public void testBulkDeletionWithRuntimeException() {
- TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());
+ TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new
TestTables.TestBulkLocalFileIO());
Mockito.doThrow(new RuntimeException("Exception when bulk deleting"))
.doNothing()
@@ -1566,7 +1565,7 @@ public class TestRemoveSnapshots extends TestBase {
runBulkDeleteTest(spyFileIO);
}
- private void runBulkDeleteTest(TestBulkLocalFileIO spyFileIO) {
+ private void runBulkDeleteTest(TestTables.TestBulkLocalFileIO spyFileIO) {
String tableName = "tableWithBulkIO";
Table tableWithBulkIO =
TestTables.create(
@@ -1909,18 +1908,4 @@ public class TestRemoveSnapshots extends TestBase {
private static void commitPartitionStats(Table table,
PartitionStatisticsFile statisticsFile) {
table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
}
-
- private static class TestBulkLocalFileIO extends TestTables.LocalFileIO
- implements SupportsBulkOperations {
-
- @Override
- public void deleteFile(String path) {
- throw new RuntimeException("Expected to call the bulk delete
interface.");
- }
-
- @Override
- public void deleteFiles(Iterable<String> pathsToDelete) throws
BulkDeletionFailureException {
- throw new RuntimeException("Expected to mock this function");
- }
- }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java
b/core/src/test/java/org/apache/iceberg/TestTables.java
index 073a95fca2..55232689ad 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -26,10 +26,12 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -392,4 +394,18 @@ public class TestTables {
}
}
}
+
+  /**
+   * A {@link LocalFileIO} that additionally implements {@link SupportsBulkOperations}.
+   *
+   * <p>Both delete methods throw unconditionally. Per their messages, {@code deleteFiles} is
+   * expected to be stubbed by the test (for example on a Mockito spy), and {@code deleteFile}
+   * fails to flag code that bypassed the bulk interface.
+   */
+  static class TestBulkLocalFileIO extends LocalFileIO implements SupportsBulkOperations {
+
+    /** Always throws; tests are expected to replace this behavior with a stub. */
+    @Override
+    public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+      throw new RuntimeException("Expected to mock this function");
+    }
+
+    /** Always throws, because callers are expected to use the bulk delete method instead. */
+    @Override
+    public void deleteFile(String path) {
+      throw new RuntimeException("Expected to call the bulk delete interface.");
+    }
+  }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java
b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 6ea84e8fc2..31fdba99ec 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
import java.io.File;
import java.io.IOException;
@@ -38,6 +39,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
@ExtendWith(ParameterizedTestExtension.class)
public class TestTransaction extends TestBase {
@@ -239,6 +241,43 @@ public class TestTransaction extends TestBase {
.hasMessage("Injected failure");
}
+  /**
+   * Verifies that a failed transaction commit cleans up its uncommitted manifest and manifest
+   * list through a single bulk {@code deleteFiles} call when the table's FileIO supports bulk
+   * operations.
+   */
+  @TestTemplate
+  public void testTransactionFailureBulkDeletionCleanup() throws IOException {
+    // the spy's deleteFile still throws, so any per-file delete would fail the test
+    TestTables.TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestTables.TestBulkLocalFileIO());
+    Mockito.doNothing().when(spyFileIO).deleteFiles(any());
+
+    File location = java.nio.file.Files.createTempDirectory(temp, "junit").toFile();
+    String tableName = "txnFailureBulkDeleteTest";
+    TestTables.TestTable tableWithBulkIO =
+        TestTables.create(
+            location,
+            tableName,
+            SCHEMA,
+            SPEC,
+            SortOrder.unsorted(),
+            formatVersion,
+            new TestTables.TestTableOperations(tableName, location, spyFileIO));
+
+    // set retries to 0 to catch the failure
+    tableWithBulkIO.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "0").commit();
+
+    // eagerly generate manifest and manifest-list
+    Transaction txn = tableWithBulkIO.newTransaction();
+    txn.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    // read the manifests with the FileIO of the table under test, not an unrelated table's
+    ManifestFile appendManifest =
+        txn.table().currentSnapshot().allManifests(tableWithBulkIO.io()).get(0);
+    String txnManifestList = txn.table().currentSnapshot().manifestListLocation();
+
+    // cause the transaction commit to fail
+    tableWithBulkIO.ops().failCommits(1);
+    assertThatThrownBy(txn::commitTransaction)
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessage("Injected failure");
+
+    // ensure both files are deleted on transaction failure
+    Mockito.verify(spyFileIO).deleteFiles(Set.of(appendManifest.path(), txnManifestList));
+  }
+
@TestTemplate
public void testTransactionRetry() {
// use only one retry