This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new 1852d2d75e Core: Load snapshot after it has been committed to prevent
accidental cleanup of files (#15511) (#15650)
1852d2d75e is described below
commit 1852d2d75e7221ecdd3dfedf44465a56143d4747
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Mar 16 15:21:47 2026 +0100
Core: Load snapshot after it has been committed to prevent accidental
cleanup of files (#15511) (#15650)
---
.../java/org/apache/iceberg/SnapshotProducer.java | 41 +++++++------
.../org/apache/iceberg/TestSnapshotProducer.java | 69 ++++++++++++++++++++++
.../test/java/org/apache/iceberg/TestTables.java | 2 +-
3 files changed, 94 insertions(+), 18 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 77cdac8f4a..6a43d45937 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -44,7 +44,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -422,8 +422,8 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public void commit() {
- // this is always set to the latest commit attempt's snapshot
- AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
+ // this is always set to the latest commit attempt's snapshot id.
+ AtomicLong newSnapshotId = new AtomicLong(-1L);
try (Timed ignore = commitMetrics().totalDuration().start()) {
try {
Tasks.foreach(ops)
@@ -438,7 +438,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
.run(
taskOps -> {
Snapshot newSnapshot = apply();
- stagedSnapshot.set(newSnapshot);
+ newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
@@ -476,22 +476,29 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
throw e;
}
- // at this point, the commit must have succeeded so the stagedSnapshot is committed
- Snapshot committedSnapshot = stagedSnapshot.get();
try {
- LOG.info(
- "Committed snapshot {} ({})",
- committedSnapshot.snapshotId(),
- getClass().getSimpleName());
+ LOG.info("Committed snapshot {} ({})", newSnapshotId.get(),
getClass().getSimpleName());
+
+ // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
+ // id in case another commit was added between this commit and the refresh.
+ // it might not be known which commit attempt succeeded in some cases, so this only cleans
+ // up the one that actually did succeed.
+ Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
+ if (saved != null) {
+ if (cleanupAfterCommit()) {
+ cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
+ }
- if (cleanupAfterCommit()) {
-
cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io())));
- }
- // also clean up unused manifest lists created by multiple attempts
- for (String manifestList : manifestLists) {
- if (!committedSnapshot.manifestListLocation().equals(manifestList)) {
- deleteFile(manifestList);
+ // also clean up unused manifest lists created by multiple attempts
+ for (String manifestList : manifestLists) {
+ if (!saved.manifestListLocation().equals(manifestList)) {
+ deleteFile(manifestList);
+ }
}
+ } else {
+ // saved may not be present if the latest metadata couldn't be loaded due to eventual
+ // consistency problems in refresh. in that case, don't clean up.
+ LOG.warn("Failed to load committed snapshot, skipping manifest
clean-up");
}
} catch (Throwable e) {
LOG.warn(
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index c3e238e3bc..6a64c3c48b 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -18,9 +18,15 @@
*/
package org.apache.iceberg;
+import static org.apache.iceberg.TestBase.FILE_A;
+import static org.apache.iceberg.TestBase.SCHEMA;
+import static org.apache.iceberg.TestBase.SPEC;
import static org.assertj.core.api.Assertions.assertThat;
+import java.io.File;
+import java.nio.file.Paths;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
public class TestSnapshotProducer {
@@ -74,4 +80,67 @@ public class TestSnapshotProducer {
int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize,
fileCount);
assertThat(writerCount).as(errMsg).isEqualTo(expectedManifestWriterCount);
}
+
+ @Test
+ public void manifestNotCleanedUpWhenSnapshotNotLoadableAfterCommit(@TempDir
File tableDir) {
+ // Uses a custom TableOps that returns stale metadata (without the new snapshot) on the
+ // first refresh() after commit, simulating eventual consistency. Verifies that commit succeeds
+ // and that the committed data is visible once the table is refreshed again
+ String tableName = "stale-table-on-first-refresh";
+ TestTables.TestTableOperations ops =
opsWithStaleRefreshAfterCommit(tableName, tableDir);
+ TestTables.TestTable tableWithStaleRefresh =
+ TestTables.create(tableDir, tableName, SCHEMA, SPEC,
SortOrder.unsorted(), 2, ops);
+
+ // the first refresh() after the commit will return stale metadata (without this snapshot), so
+ // SnapshotProducer will skip cleanup to avoid accidentally deleting files that are part of the
+ // committed snapshot but commit still succeeds
+ tableWithStaleRefresh.newAppend().appendFile(FILE_A).commit();
+
+ // Refresh again to get the real metadata; the snapshot must be visible now
+ tableWithStaleRefresh.ops().refresh();
+ Snapshot snapshot = tableWithStaleRefresh.currentSnapshot();
+ assertThat(snapshot)
+ .as("Committed snapshot must be visible after refresh (eventual
consistency resolved)")
+ .isNotNull();
+
+ File metadata = Paths.get(tableDir.getPath(), "metadata").toFile();
+ assertThat(snapshot.allManifests(tableWithStaleRefresh.io()))
+ .isNotEmpty()
+ .allSatisfy(
+ manifest -> assertThat(metadata.listFiles()).contains(new
File(manifest.path())));
+ }
+
+ /**
+ * Creates a TableOperations that returns stale metadata (without the newly committed snapshot) on
+ * the first refresh() after a commit. This simulates eventual consistency where the committed
+ * snapshot is not yet visible. Used to verify that when the snapshot cannot be loaded after
+ * commit, cleanup is skipped to avoid accidentally deleting files that are part of the committed
+ * snapshot.
+ */
+ private static TestTables.TestTableOperations opsWithStaleRefreshAfterCommit(
+ String name, File location) {
+ return new TestTables.TestTableOperations(name, location) {
+ private TableMetadata metadataToReturnOnNextRefresh;
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata updatedMetadata) {
+ super.commit(base, updatedMetadata);
+ if (base != null) {
+ // return stale metadata on the first refresh() call
+ this.metadataToReturnOnNextRefresh = base;
+ }
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ if (metadataToReturnOnNextRefresh != null) {
+ this.current = metadataToReturnOnNextRefresh;
+ this.metadataToReturnOnNextRefresh = null;
+ return current;
+ }
+
+ return super.refresh();
+ }
+ };
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 55232689ad..13c859f860 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -276,7 +276,7 @@ public class TestTables {
private final String tableName;
private final File metadata;
private final FileIO fileIO;
- private TableMetadata current = null;
+ protected TableMetadata current = null;
private long lastSnapshotId = 0;
private int failCommits = 0;