This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new aa5894a139 Core: Load snapshot after it has been committed to prevent
accidental cleanup of files (#15511)
aa5894a139 is described below
commit aa5894a139bca2967bf5cc01feef3a920fcc6ca5
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Mar 16 12:23:32 2026 +0100
Core: Load snapshot after it has been committed to prevent accidental
cleanup of files (#15511)
---
.../java/org/apache/iceberg/SnapshotProducer.java | 41 ++++++++------
.../org/apache/iceberg/TestSnapshotProducer.java | 66 ++++++++++++++++++++++
.../test/java/org/apache/iceberg/TestTables.java | 2 +-
.../org/apache/iceberg/rest/TestRESTCatalog.java | 2 +-
4 files changed, 92 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index ffbebf5998..1fd7dab62f 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -44,7 +44,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -455,8 +455,8 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public void commit() {
- // this is always set to the latest commit attempt's snapshot
- AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
+ // this is always set to the latest commit attempt's snapshot id.
+ AtomicLong newSnapshotId = new AtomicLong(-1L);
try (Timed ignore = commitMetrics().totalDuration().start()) {
try {
Tasks.foreach(ops)
@@ -471,7 +471,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
.run(
taskOps -> {
Snapshot newSnapshot = apply();
- stagedSnapshot.set(newSnapshot);
+ newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
@@ -509,22 +509,29 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
throw e;
}
- // at this point, the commit must have succeeded so the stagedSnapshot is committed
- Snapshot committedSnapshot = stagedSnapshot.get();
try {
- LOG.info(
- "Committed snapshot {} ({})",
- committedSnapshot.snapshotId(),
- getClass().getSimpleName());
+ LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());
+
+ // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
+ // id in case another commit was added between this commit and the refresh.
+ // it might not be known which commit attempt succeeded in some cases, so this only cleans
+ // up the one that actually did succeed.
+ Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
+ if (saved != null) {
+ if (cleanupAfterCommit()) {
+ cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
+ }
- if (cleanupAfterCommit()) {
- cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io())));
- }
- // also clean up unused manifest lists created by multiple attempts
- for (String manifestList : manifestLists) {
- if (!committedSnapshot.manifestListLocation().equals(manifestList)) {
- deleteFile(manifestList);
+ // also clean up unused manifest lists created by multiple attempts
+ for (String manifestList : manifestLists) {
+ if (!saved.manifestListLocation().equals(manifestList)) {
+ deleteFile(manifestList);
+ }
}
+ } else {
+ // saved may not be present if the latest metadata couldn't be loaded due to eventual
+ // consistency problems in refresh. in that case, don't clean up.
+ LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
}
} catch (Throwable e) {
LOG.warn(
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index ad25a30b56..b6c0ab65e2 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -22,7 +22,9 @@ import static
org.apache.iceberg.SnapshotSummary.PUBLISHED_WAP_ID_PROP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@@ -158,4 +160,68 @@ public class TestSnapshotProducer extends TestBase {
// Verify the table wasn't updated
assertThat(table.snapshots()).hasSize(1);
}
+
+ @TestTemplate
+ public void manifestNotCleanedUpWhenSnapshotNotLoadableAfterCommit() {
+ // Uses a custom TableOps that returns stale metadata (without the new snapshot) on the
+ // first refresh() after commit, simulating eventual consistency. Verifies that commit succeeds
+ // and that the committed data is visible once the table is refreshed again
+ String tableName = "stale-table-on-first-refresh";
+ TestTables.TestTableOperations ops = opsWithStaleRefreshAfterCommit(tableName, tableDir);
+ TestTables.TestTable tableWithStaleRefresh =
+ TestTables.create(
+ tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, ops);
+
+ // the first refresh() after the commit will return stale metadata (without this snapshot), so
+ // SnapshotProducer will skip cleanup to avoid accidentally deleting files that are part of the
+ // committed snapshot but commit still succeeds
+ tableWithStaleRefresh.newAppend().appendFile(FILE_A).commit();
+
+ // Refresh again to get the real metadata; the snapshot must be visible now
+ tableWithStaleRefresh.ops().refresh();
+ Snapshot snapshot = tableWithStaleRefresh.currentSnapshot();
+ assertThat(snapshot)
+ .as("Committed snapshot must be visible after refresh (eventual consistency resolved)")
+ .isNotNull();
+
+ File metadata = Paths.get(tableDir.getPath(), "metadata").toFile();
+ assertThat(snapshot.allManifests(tableWithStaleRefresh.io()))
+ .isNotEmpty()
+ .allSatisfy(
+ manifest -> assertThat(metadata.listFiles()).contains(new File(manifest.path())));
+ }
+
+ /**
+ * Creates a TableOperations that returns stale metadata (without the newly committed snapshot) on
+ * the first refresh() after a commit. This simulates eventual consistency where the committed
+ * snapshot is not yet visible. Used to verify that when the snapshot cannot be loaded after
+ * commit, cleanup is skipped to avoid accidentally deleting files that are part of the committed
+ * snapshot.
+ */
+ private static TestTables.TestTableOperations opsWithStaleRefreshAfterCommit(
+ String name, File location) {
+ return new TestTables.TestTableOperations(name, location) {
+ private TableMetadata metadataToReturnOnNextRefresh;
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata updatedMetadata) {
+ super.commit(base, updatedMetadata);
+ if (base != null) {
+ // return stale metadata on the first refresh() call
+ this.metadataToReturnOnNextRefresh = base;
+ }
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ if (metadataToReturnOnNextRefresh != null) {
+ this.current = metadataToReturnOnNextRefresh;
+ this.metadataToReturnOnNextRefresh = null;
+ return current;
+ }
+
+ return super.refresh();
+ }
+ };
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java
b/core/src/test/java/org/apache/iceberg/TestTables.java
index 55232689ad..13c859f860 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -276,7 +276,7 @@ public class TestTables {
private final String tableName;
private final File metadata;
private final FileIO fileIO;
- private TableMetadata current = null;
+ protected TableMetadata current = null;
private long lastSnapshotId = 0;
private int failCommits = 0;
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 56ab8f97f8..d07efdd3a2 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -3659,7 +3659,7 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
table.newAppend().appendFile(FILE_A).commit();
// loadTable is executed once
- Mockito.verify(adapter)
+ Mockito.verify(adapter, times(2))
.execute(matches(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(),
any(), any());
// CommitReport reflects the table state after the commit