This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 39373d09c2 Core: Allow SnapshotProducer to skip uncommitted manifest cleanup after commit (#10523)
39373d09c2 is described below

commit 39373d09c276586ddcec971fe35951975bdac66f
Author: Grant Nicholas <[email protected]>
AuthorDate: Thu Aug 1 14:31:18 2024 -0500

    Core: Allow SnapshotProducer to skip uncommitted manifest cleanup after commit (#10523)
---
 .../main/java/org/apache/iceberg/FastAppend.java   | 10 +++++
 .../java/org/apache/iceberg/SnapshotProducer.java  | 43 ++++++++++---------
 .../java/org/apache/iceberg/TestFastAppend.java    | 50 ++++++++++++++++++++++
 3 files changed, 82 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 1439289130..4976a8081c 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -198,6 +198,16 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
     }
   }
 
+  /**
+   * Cleanup after committing is disabled for FastAppend unless there are rewrittenAppendManifests
+   * because: 1.) Appended manifests are never rewritten 2.) Manifests which are written out as part
+   * of appendFile are already cleaned up between commit attempts in writeNewManifests
+   */
+  @Override
+  protected boolean cleanupAfterCommit() {
+    return !rewrittenAppendManifests.isEmpty();
+  }
+
   private List<ManifestFile> writeNewManifests() throws IOException {
     if (hasNewFiles && newManifests != null) {
       newManifests.forEach(file -> deleteFile(file.path()));
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 9f4bcbc6bb..0a040fe344 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -41,7 +41,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptingFileIO;
@@ -368,8 +368,8 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public void commit() {
-    // this is always set to the latest commit attempt's snapshot id.
-    AtomicLong newSnapshotId = new AtomicLong(-1L);
+    // this is always set to the latest commit attempt's snapshot
+    AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
     try (Timed ignore = commitMetrics().totalDuration().start()) {
       try {
         Tasks.foreach(ops)
@@ -384,7 +384,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
             .run(
                 taskOps -> {
                   Snapshot newSnapshot = apply();
-                  newSnapshotId.set(newSnapshot.snapshotId());
+                  stagedSnapshot.set(newSnapshot);
                   TableMetadata.Builder update = TableMetadata.buildFrom(base);
                   if (base.snapshot(newSnapshot.snapshotId()) != null) {
                     // this is a rollback operation
@@ -422,26 +422,23 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
         throw e;
       }
 
+      // at this point, the commit must have succeeded so the stagedSnapshot is committed
+      Snapshot committedSnapshot = stagedSnapshot.get();
       try {
-        LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());
-
-        // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
-        // id in case another commit was added between this commit and the refresh.
-        Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
-        if (saved != null) {
-          cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
-          // also clean up unused manifest lists created by multiple attempts
-          for (String manifestList : manifestLists) {
-            if (!saved.manifestListLocation().equals(manifestList)) {
-              deleteFile(manifestList);
-            }
+        LOG.info(
+            "Committed snapshot {} ({})",
+            committedSnapshot.snapshotId(),
+            getClass().getSimpleName());
+
+        if (cleanupAfterCommit()) {
+          cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io())));
+        }
+        // also clean up unused manifest lists created by multiple attempts
+        for (String manifestList : manifestLists) {
+          if (!committedSnapshot.manifestListLocation().equals(manifestList)) {
+            deleteFile(manifestList);
           }
-        } else {
-          // saved may not be present if the latest metadata couldn't be loaded due to eventual
-          // consistency problems in refresh. in that case, don't clean up.
-          LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
         }
-
       } catch (Throwable e) {
         LOG.warn(
             "Failed to load committed table metadata or during cleanup, skipping further cleanup",
@@ -565,6 +562,10 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
     return canInheritSnapshotId;
   }
 
+  protected boolean cleanupAfterCommit() {
+    return true;
+  }
+
   private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
     try (ManifestReader<DataFile> reader =
         ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index b281536ab0..8125c528d9 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -324,6 +324,56 @@ public class TestFastAppend extends TestBase {
     assertThat(metadata.currentSnapshot().allManifests(FILE_IO)).contains(newManifest);
   }
 
+  @TestTemplate
+  public void testWriteNewManifestsIdempotency() {
+    // inject 3 failures, the last try will succeed
+    TestTables.TestTableOperations ops = table.ops();
+    ops.failCommits(3);
+
+    AppendFiles append = table.newFastAppend().appendFile(FILE_B);
+    Snapshot pending = append.apply();
+    ManifestFile newManifest = pending.allManifests(FILE_IO).get(0);
+    assertThat(new File(newManifest.path())).exists();
+
+    append.commit();
+
+    TableMetadata metadata = readMetadata();
+
+    // contains only a single manifest, does not duplicate manifests on retries
+    validateSnapshot(null, metadata.currentSnapshot(), FILE_B);
+    assertThat(new File(newManifest.path())).exists();
+    assertThat(metadata.currentSnapshot().allManifests(FILE_IO)).contains(newManifest);
+    assertThat(listManifestFiles(tableDir)).containsExactly(new File(newManifest.path()));
+  }
+
+  @TestTemplate
+  public void testWriteNewManifestsCleanup() {
+    // append file, stage changes with apply() but do not commit
+    AppendFiles append = table.newFastAppend().appendFile(FILE_A);
+    Snapshot pending = append.apply();
+    ManifestFile oldManifest = pending.allManifests(FILE_IO).get(0);
+    assertThat(new File(oldManifest.path())).exists();
+
+    // append file, stage changes with apply() but do not commit
+    // validate writeNewManifests deleted the old staged manifest
+    append.appendFile(FILE_B);
+    Snapshot newPending = append.apply();
+    List<ManifestFile> manifestFiles = newPending.allManifests(FILE_IO);
+    assertThat(manifestFiles).hasSize(1);
+    ManifestFile newManifest = manifestFiles.get(0);
+    assertThat(newManifest.path()).isNotEqualTo(oldManifest.path());
+
+    append.commit();
+    TableMetadata metadata = readMetadata();
+
+    // contains only a single manifest, old staged manifest is deleted
+    validateSnapshot(null, metadata.currentSnapshot(), FILE_A, FILE_B);
+    assertThat(new File(oldManifest.path())).doesNotExist();
+    assertThat(new File(newManifest.path())).exists();
+    assertThat(metadata.currentSnapshot().allManifests(FILE_IO)).containsExactly(newManifest);
+    assertThat(listManifestFiles(tableDir)).containsExactly(new File(newManifest.path()));
+  }
+
   @TestTemplate
   public void testAppendManifestWithSnapshotIdInheritance() throws IOException {
     table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();

Reply via email to