This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 39373d09c2 Core: Allow SnapshotProducer to skip uncommitted manifest
cleanup after commit (#10523)
39373d09c2 is described below
commit 39373d09c276586ddcec971fe35951975bdac66f
Author: Grant Nicholas <[email protected]>
AuthorDate: Thu Aug 1 14:31:18 2024 -0500
Core: Allow SnapshotProducer to skip uncommitted manifest cleanup after
commit (#10523)
---
.../main/java/org/apache/iceberg/FastAppend.java | 10 +++++
.../java/org/apache/iceberg/SnapshotProducer.java | 43 ++++++++++---------
.../java/org/apache/iceberg/TestFastAppend.java | 50 ++++++++++++++++++++++
3 files changed, 82 insertions(+), 21 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java
b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 1439289130..4976a8081c 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -198,6 +198,16 @@ class FastAppend extends SnapshotProducer<AppendFiles>
implements AppendFiles {
}
}
+ /**
+ * Cleanup after committing is disabled for FastAppend unless there are rewrittenAppendManifests
+ * because: 1.) Appended manifests are never rewritten 2.) Manifests which are written out as part
+ * of appendFile are already cleaned up between commit attempts in writeNewManifests
+ */
+ @Override
+ protected boolean cleanupAfterCommit() {
+ return !rewrittenAppendManifests.isEmpty();
+ }
+
private List<ManifestFile> writeNewManifests() throws IOException {
if (hasNewFiles && newManifests != null) {
newManifests.forEach(file -> deleteFile(file.path()));
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 9f4bcbc6bb..0a040fe344 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -41,7 +41,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptingFileIO;
@@ -368,8 +368,8 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public void commit() {
- // this is always set to the latest commit attempt's snapshot id.
- AtomicLong newSnapshotId = new AtomicLong(-1L);
+ // this is always set to the latest commit attempt's snapshot
+ AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
try (Timed ignore = commitMetrics().totalDuration().start()) {
try {
Tasks.foreach(ops)
@@ -384,7 +384,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
.run(
taskOps -> {
Snapshot newSnapshot = apply();
- newSnapshotId.set(newSnapshot.snapshotId());
+ stagedSnapshot.set(newSnapshot);
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
@@ -422,26 +422,23 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
throw e;
}
+ // at this point, the commit must have succeeded so the stagedSnapshot is committed
+ Snapshot committedSnapshot = stagedSnapshot.get();
try {
- LOG.info("Committed snapshot {} ({})", newSnapshotId.get(),
getClass().getSimpleName());
-
- // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
- // id in case another commit was added between this commit and the refresh.
- Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
- if (saved != null) {
- cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
- // also clean up unused manifest lists created by multiple attempts
- for (String manifestList : manifestLists) {
- if (!saved.manifestListLocation().equals(manifestList)) {
- deleteFile(manifestList);
- }
+ LOG.info(
+ "Committed snapshot {} ({})",
+ committedSnapshot.snapshotId(),
+ getClass().getSimpleName());
+
+ if (cleanupAfterCommit()) {
+
cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io())));
+ }
+ // also clean up unused manifest lists created by multiple attempts
+ for (String manifestList : manifestLists) {
+ if (!committedSnapshot.manifestListLocation().equals(manifestList)) {
+ deleteFile(manifestList);
}
- } else {
- // saved may not be present if the latest metadata couldn't be loaded due to eventual
- // consistency problems in refresh. in that case, don't clean up.
- LOG.warn("Failed to load committed snapshot, skipping manifest
clean-up");
}
-
} catch (Throwable e) {
LOG.warn(
"Failed to load committed table metadata or during cleanup,
skipping further cleanup",
@@ -565,6 +562,10 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
return canInheritSnapshotId;
}
+ protected boolean cleanupAfterCommit() {
+ return true;
+ }
+
private static ManifestFile addMetadata(TableOperations ops, ManifestFile
manifest) {
try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index b281536ab0..8125c528d9 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -324,6 +324,56 @@ public class TestFastAppend extends TestBase {
assertThat(metadata.currentSnapshot().allManifests(FILE_IO)).contains(newManifest);
}
+ @TestTemplate
+ public void testWriteNewManifestsIdempotency() {
+ // inject 3 failures, the last try will succeed
+ TestTables.TestTableOperations ops = table.ops();
+ ops.failCommits(3);
+
+ AppendFiles append = table.newFastAppend().appendFile(FILE_B);
+ Snapshot pending = append.apply();
+ ManifestFile newManifest = pending.allManifests(FILE_IO).get(0);
+ assertThat(new File(newManifest.path())).exists();
+
+ append.commit();
+
+ TableMetadata metadata = readMetadata();
+
+ // contains only a single manifest, does not duplicate manifests on retries
+ validateSnapshot(null, metadata.currentSnapshot(), FILE_B);
+ assertThat(new File(newManifest.path())).exists();
+
assertThat(metadata.currentSnapshot().allManifests(FILE_IO)).contains(newManifest);
+ assertThat(listManifestFiles(tableDir)).containsExactly(new
File(newManifest.path()));
+ }
+
+ @TestTemplate
+ public void testWriteNewManifestsCleanup() {
+ // append file, stage changes with apply() but do not commit
+ AppendFiles append = table.newFastAppend().appendFile(FILE_A);
+ Snapshot pending = append.apply();
+ ManifestFile oldManifest = pending.allManifests(FILE_IO).get(0);
+ assertThat(new File(oldManifest.path())).exists();
+
+ // append file, stage changes with apply() but do not commit
+ // validate writeNewManifests deleted the old staged manifest
+ append.appendFile(FILE_B);
+ Snapshot newPending = append.apply();
+ List<ManifestFile> manifestFiles = newPending.allManifests(FILE_IO);
+ assertThat(manifestFiles).hasSize(1);
+ ManifestFile newManifest = manifestFiles.get(0);
+ assertThat(newManifest.path()).isNotEqualTo(oldManifest.path());
+
+ append.commit();
+ TableMetadata metadata = readMetadata();
+
+ // contains only a single manifest, old staged manifest is deleted
+ validateSnapshot(null, metadata.currentSnapshot(), FILE_A, FILE_B);
+ assertThat(new File(oldManifest.path())).doesNotExist();
+ assertThat(new File(newManifest.path())).exists();
+
assertThat(metadata.currentSnapshot().allManifests(FILE_IO)).containsExactly(newManifest);
+ assertThat(listManifestFiles(tableDir)).containsExactly(new
File(newManifest.path()));
+ }
+
@TestTemplate
public void testAppendManifestWithSnapshotIdInheritance() throws IOException
{
table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
"true").commit();