This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c625dd7d2 Core: Support replacing delete manifests (#9000)
8c625dd7d2 is described below

commit 8c625dd7d21e38235d2864e081c008ef11e0fd20
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Nov 9 14:54:32 2023 -0800

    Core: Support replacing delete manifests (#9000)
---
 .../org/apache/iceberg/BaseRewriteManifests.java   |  13 +-
 .../java/org/apache/iceberg/TableTestBase.java     |  10 +-
 .../org/apache/iceberg/TestRewriteManifests.java   | 609 +++++++++++++++++++++
 3 files changed, 624 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 1f0d204dc2..87768e3489 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -168,7 +168,7 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
 
   @Override
   public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
-    List<ManifestFile> currentManifests = base.currentSnapshot().dataManifests(ops.io());
+    List<ManifestFile> currentManifests = base.currentSnapshot().allManifests(ops.io());
     Set<ManifestFile> currentManifestSet = ImmutableSet.copyOf(currentManifests);
 
     validateDeletedManifests(currentManifestSet);
@@ -190,7 +190,6 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
     List<ManifestFile> apply = Lists.newArrayList();
     Iterables.addAll(apply, newManifestsWithMetadata);
     apply.addAll(keptManifests);
-    apply.addAll(base.currentSnapshot().deleteManifests(ops.io()));
 
     return apply;
   }
@@ -242,7 +241,7 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
           .executeWith(workerPool())
           .run(
               manifest -> {
-                if (predicate != null && !predicate.test(manifest)) {
+                if (containsDeletes(manifest) || !matchesPredicate(manifest)) {
                   keptManifests.add(manifest);
                 } else {
                   rewrittenManifests.add(manifest);
@@ -268,6 +267,14 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
     }
   }
 
+  private boolean containsDeletes(ManifestFile manifest) {
+    return manifest.content() == ManifestContent.DELETES;
+  }
+
+  private boolean matchesPredicate(ManifestFile manifest) {
+    return predicate == null || predicate.test(manifest);
+  }
+
   private void validateDeletedManifests(Set<ManifestFile> currentManifests) {
     // directly deleted manifests must be still present in the current snapshot
     deletedManifests.stream()
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index d50845933e..68ce055289 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -328,19 +328,19 @@ public class TableTestBase {
     return writer.toManifestFile();
   }
 
-  ManifestEntry<DataFile> manifestEntry(
-      ManifestEntry.Status status, Long snapshotId, DataFile file) {
+  <F extends ContentFile<F>> ManifestEntry<F> manifestEntry(
+      ManifestEntry.Status status, Long snapshotId, F file) {
     return manifestEntry(status, snapshotId, 0L, 0L, file);
   }
 
-  ManifestEntry<DataFile> manifestEntry(
+  <F extends ContentFile<F>> ManifestEntry<F> manifestEntry(
       ManifestEntry.Status status,
       Long snapshotId,
       Long dataSequenceNumber,
       Long fileSequenceNumber,
-      DataFile file) {
+      F file) {
 
-    GenericManifestEntry<DataFile> entry = new GenericManifestEntry<>(table.spec().partitionType());
+    GenericManifestEntry<F> entry = new GenericManifestEntry<>(table.spec().partitionType());
     switch (status) {
       case ADDED:
         if (dataSequenceNumber != null && dataSequenceNumber != 0) {
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index d7daae8b3e..8cc7e44068 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
 import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -36,7 +37,9 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1105,6 +1108,612 @@ public class TestRewriteManifests extends TableTestBase {
             "Cannot commit to branch someBranch: org.apache.iceberg.BaseRewriteManifests does not support branch commits");
   }
 
+  @Test
+  public void testRewriteDataManifestsPreservesDeletes() {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    Table table = load();
+
+    // commit data files
+    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    // save the append snapshot info
+    Snapshot appendSnapshot = table.currentSnapshot();
+    long appendSnapshotId = appendSnapshot.snapshotId();
+    long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+    // commit delete files
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+    // save the delete snapshot info
+    Snapshot deleteSnapshot = table.currentSnapshot();
+    long deleteSnapshotId = deleteSnapshot.snapshotId();
+    long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+    // there must be 1 data and 1 delete manifest before the rewrite
+    assertManifestCounts(table, 1, 1);
+
+    // rewrite manifests and cluster entries by file path
+    table.rewriteManifests().clusterBy(file -> file.path().toString()).commit();
+
+    Snapshot rewriteSnapshot = table.currentSnapshot();
+
+    validateSummary(rewriteSnapshot, 1, 1, 2, 2);
+
+    // the rewrite must replace the original data manifest with 2 new data manifests
+    List<ManifestFile> dataManifests = sortedDataManifests(table.io(), rewriteSnapshot);
+    assertThat(dataManifests).hasSize(2);
+    validateManifest(
+        dataManifests.get(0),
+        dataSeqs(appendSnapshotSeq),
+        fileSeqs(appendSnapshotSeq),
+        ids(appendSnapshotId),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateManifest(
+        dataManifests.get(1),
+        dataSeqs(appendSnapshotSeq),
+        fileSeqs(appendSnapshotSeq),
+        ids(appendSnapshotId),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    // the rewrite must preserve the original delete manifest (only data manifests are clustered)
+    List<ManifestFile> deleteManifests = rewriteSnapshot.deleteManifests(table.io());
+    ManifestFile deleteManifest = Iterables.getOnlyElement(deleteManifests);
+    validateDeleteManifest(
+        deleteManifest,
+        dataSeqs(deleteSnapshotSeq, deleteSnapshotSeq),
+        fileSeqs(deleteSnapshotSeq, deleteSnapshotSeq),
+        ids(deleteSnapshotId, deleteSnapshotId),
+        files(FILE_A_DELETES, FILE_A2_DELETES),
+        statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+  }
+
+  @Test
+  public void testReplaceDeleteManifestsOnly() throws IOException {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    Table table = load();
+
+    // commit data files
+    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    // save the append snapshot info
+    Snapshot appendSnapshot = table.currentSnapshot();
+    long appendSnapshotId = appendSnapshot.snapshotId();
+    long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+    // commit delete files
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+    // save the delete snapshot info
+    Snapshot deleteSnapshot = table.currentSnapshot();
+    long deleteSnapshotId = deleteSnapshot.snapshotId();
+    long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+    // there must be 1 data and 1 delete manifest before the rewrite
+    assertManifestCounts(table, 1, 1);
+
+    // split the original delete manifest into 2 new delete manifests
+    ManifestFile originalDeleteManifest =
+        Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
+    ManifestFile newDeleteManifest1 =
+        writeManifest(
+            "delete-manifest-file-1.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId,
+                deleteSnapshotSeq,
+                deleteSnapshotSeq,
+                FILE_A_DELETES));
+    ManifestFile newDeleteManifest2 =
+        writeManifest(
+            "delete-manifest-file-2.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId,
+                deleteSnapshotSeq,
+                deleteSnapshotSeq,
+                FILE_A2_DELETES));
+
+    // replace the original delete manifest with the new delete manifests
+    table
+        .rewriteManifests()
+        .deleteManifest(originalDeleteManifest)
+        .addManifest(newDeleteManifest1)
+        .addManifest(newDeleteManifest2)
+        .commit();
+
+    Snapshot rewriteSnapshot = table.currentSnapshot();
+
+    // the rewrite must preserve the original data manifest
+    ManifestFile dataManifest = Iterables.getOnlyElement(rewriteSnapshot.dataManifests(table.io()));
+    validateManifest(
+        dataManifest,
+        dataSeqs(appendSnapshotSeq, appendSnapshotSeq),
+        fileSeqs(appendSnapshotSeq, appendSnapshotSeq),
+        ids(appendSnapshotId, appendSnapshotId),
+        files(FILE_A, FILE_B),
+        statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+
+    // the rewrite must replace the original delete manifest with 2 new delete manifests
+    List<ManifestFile> deleteManifests = rewriteSnapshot.deleteManifests(table.io());
+    assertThat(deleteManifests).hasSize(2);
+    validateDeleteManifest(
+        deleteManifests.get(0),
+        dataSeqs(deleteSnapshotSeq),
+        fileSeqs(deleteSnapshotSeq),
+        ids(deleteSnapshotId),
+        files(FILE_A_DELETES),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateDeleteManifest(
+        deleteManifests.get(1),
+        dataSeqs(deleteSnapshotSeq),
+        fileSeqs(deleteSnapshotSeq),
+        ids(deleteSnapshotId),
+        files(FILE_A2_DELETES),
+        statuses(ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testReplaceDataAndDeleteManifests() throws IOException {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    Table table = load();
+
+    // commit data files
+    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    // save the append snapshot info
+    Snapshot appendSnapshot = table.currentSnapshot();
+    long appendSnapshotId = appendSnapshot.snapshotId();
+    long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+    // commit delete files
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+    // save the delete snapshot info
+    Snapshot deleteSnapshot = table.currentSnapshot();
+    long deleteSnapshotId = deleteSnapshot.snapshotId();
+    long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+    // there must be 1 data and 1 delete manifest before the rewrite
+    assertManifestCounts(table, 1, 1);
+
+    // split the original data manifest into 2 new data manifests
+    ManifestFile originalDataManifest =
+        Iterables.getOnlyElement(deleteSnapshot.dataManifests(table.io()));
+    ManifestFile newDataManifest1 =
+        writeManifest(
+            "manifest-file-1.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                appendSnapshotId,
+                appendSnapshotSeq,
+                appendSnapshotSeq,
+                FILE_A));
+    ManifestFile newDataManifest2 =
+        writeManifest(
+            "manifest-file-2.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                appendSnapshotId,
+                appendSnapshotSeq,
+                appendSnapshotSeq,
+                FILE_B));
+
+    // split the original delete manifest into 2 new delete manifests
+    ManifestFile originalDeleteManifest =
+        Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
+    ManifestFile newDeleteManifest1 =
+        writeManifest(
+            "delete-manifest-file-1.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId,
+                deleteSnapshotSeq,
+                deleteSnapshotSeq,
+                FILE_A_DELETES));
+    ManifestFile newDeleteManifest2 =
+        writeManifest(
+            "delete-manifest-file-2.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId,
+                deleteSnapshotSeq,
+                deleteSnapshotSeq,
+                FILE_A2_DELETES));
+
+    // replace the original data and delete manifests with new ones
+    table
+        .rewriteManifests()
+        .deleteManifest(originalDataManifest)
+        .addManifest(newDataManifest1)
+        .addManifest(newDataManifest2)
+        .deleteManifest(originalDeleteManifest)
+        .addManifest(newDeleteManifest1)
+        .addManifest(newDeleteManifest2)
+        .commit();
+
+    Snapshot rewriteSnapshot = table.currentSnapshot();
+
+    // the rewrite must replace the original data manifest with 2 new data manifests
+    List<ManifestFile> dataManifests = sortedDataManifests(table.io(), rewriteSnapshot);
+    assertThat(dataManifests).hasSize(2);
+    validateManifest(
+        dataManifests.get(0),
+        dataSeqs(appendSnapshotSeq),
+        fileSeqs(appendSnapshotSeq),
+        ids(appendSnapshotId),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateManifest(
+        dataManifests.get(1),
+        dataSeqs(appendSnapshotSeq),
+        fileSeqs(appendSnapshotSeq),
+        ids(appendSnapshotId),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    // the rewrite must replace the original delete manifest with 2 new delete manifests
+    List<ManifestFile> deleteManifests = rewriteSnapshot.deleteManifests(table.io());
+    assertThat(deleteManifests).hasSize(2);
+    validateDeleteManifest(
+        deleteManifests.get(0),
+        dataSeqs(deleteSnapshotSeq),
+        fileSeqs(deleteSnapshotSeq),
+        ids(deleteSnapshotId),
+        files(FILE_A_DELETES),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateDeleteManifest(
+        deleteManifests.get(1),
+        dataSeqs(deleteSnapshotSeq),
+        fileSeqs(deleteSnapshotSeq),
+        ids(deleteSnapshotId),
+        files(FILE_A2_DELETES),
+        statuses(ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testDeleteManifestReplacementConcurrentAppend() throws IOException {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    // commit data files
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    // save the initial append snapshot info
+    Snapshot appendSnapshot = table.currentSnapshot();
+    long appendSnapshotId = appendSnapshot.snapshotId();
+    long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+    // commit delete files
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+    // save the delete snapshot info
+    Snapshot deleteSnapshot = table.currentSnapshot();
+    long deleteSnapshotId = deleteSnapshot.snapshotId();
+    long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+    // split the original delete manifest into 2 new delete manifests
+    ManifestFile originalDeleteManifest =
+        Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
+    ManifestFile newDeleteManifest1 =
+        writeManifest(
+            "delete-manifest-file-1.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId,
+                deleteSnapshotSeq,
+                deleteSnapshotSeq,
+                FILE_A_DELETES));
+    ManifestFile newDeleteManifest2 =
+        writeManifest(
+            "delete-manifest-file-2.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId,
+                deleteSnapshotSeq,
+                deleteSnapshotSeq,
+                FILE_A2_DELETES));
+
+    // start the rewrite
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    rewriteManifests.deleteManifest(originalDeleteManifest);
+    rewriteManifests.addManifest(newDeleteManifest1);
+    rewriteManifests.addManifest(newDeleteManifest2);
+
+    // commit another append concurrently
+    table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    // save the concurrent snapshot info
+    Snapshot concurrentSnapshot = table.currentSnapshot();
+    long concurrentSnapshotSeq = concurrentSnapshot.sequenceNumber();
+    long concurrentSnapshotId = concurrentSnapshot.snapshotId();
+
+    // there must be 2 data manifests and 1 delete manifest before the rewrite is committed
+    assertManifestCounts(table, 2, 1);
+
+    // commit the rewrite successfully as operations are not in conflict
+    rewriteManifests.commit();
+
+    Snapshot rewriteSnapshot = table.currentSnapshot();
+
+    validateSummary(rewriteSnapshot, 1, 2, 2, 0);
+
+    // the rewrite must preserve both the original and the concurrently added data manifests
+    List<ManifestFile> dataManifests = rewriteSnapshot.dataManifests(table.io());
+    assertThat(dataManifests).hasSize(2);
+    validateManifest(
+        dataManifests.get(0),
+        dataSeqs(concurrentSnapshotSeq, concurrentSnapshotSeq),
+        fileSeqs(concurrentSnapshotSeq, concurrentSnapshotSeq),
+        ids(concurrentSnapshotId, concurrentSnapshotId),
+        files(FILE_C, FILE_D),
+        statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+    validateManifest(
+        dataManifests.get(1),
+        dataSeqs(appendSnapshotSeq, appendSnapshotSeq),
+        fileSeqs(appendSnapshotSeq, appendSnapshotSeq),
+        ids(appendSnapshotId, appendSnapshotId),
+        files(FILE_A, FILE_B),
+        statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+
+    // the rewrite must replace the original delete manifest with 2 new delete manifests
+    List<ManifestFile> deleteManifests = rewriteSnapshot.deleteManifests(table.io());
+    assertThat(deleteManifests).hasSize(2);
+    validateDeleteManifest(
+        deleteManifests.get(0),
+        dataSeqs(deleteSnapshotSeq),
+        fileSeqs(deleteSnapshotSeq),
+        ids(deleteSnapshotId),
+        files(FILE_A_DELETES),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateDeleteManifest(
+        deleteManifests.get(1),
+        dataSeqs(deleteSnapshotSeq),
+        fileSeqs(deleteSnapshotSeq),
+        ids(deleteSnapshotId),
+        files(FILE_A2_DELETES),
+        statuses(ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testDeleteManifestReplacementConcurrentDeleteFileRemoval() throws IOException {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    // commit data files
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    // save the initial append snapshot info
+    Snapshot appendSnapshot = table.currentSnapshot();
+    long appendSnapshotId = appendSnapshot.snapshotId();
+    long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+    // commit the first set of delete files
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+    // save the first delete snapshot info
+    Snapshot deleteSnapshot1 = table.currentSnapshot();
+    long deleteSnapshotId1 = deleteSnapshot1.snapshotId();
+    long deleteSnapshotSeq1 = deleteSnapshot1.sequenceNumber();
+
+    // commit the second set of delete files
+    table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit();
+
+    // save the second delete snapshot info
+    Snapshot deleteSnapshot2 = table.currentSnapshot();
+    long deleteSnapshotId2 = deleteSnapshot2.snapshotId();
+    long deleteSnapshotSeq2 = deleteSnapshot2.sequenceNumber();
+
+    // split the first delete manifest (written by the first row delta) into 2 new delete manifests
+    ManifestFile originalDeleteManifest = deleteSnapshot1.deleteManifests(table.io()).get(0);
+    ManifestFile newDeleteManifest1 =
+        writeManifest(
+            "delete-manifest-file-1.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId1,
+                deleteSnapshotSeq1,
+                deleteSnapshotSeq1,
+                FILE_A_DELETES));
+    ManifestFile newDeleteManifest2 =
+        writeManifest(
+            "delete-manifest-file-2.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId1,
+                deleteSnapshotSeq1,
+                deleteSnapshotSeq1,
+                FILE_A2_DELETES));
+
+    // start the rewrite
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    rewriteManifests.deleteManifest(originalDeleteManifest);
+    rewriteManifests.addManifest(newDeleteManifest1);
+    rewriteManifests.addManifest(newDeleteManifest2);
+
+    // concurrently remove a delete file from the second delete manifest
+    table.newRewrite().deleteFile(FILE_B_DELETES).commit();
+
+    Snapshot concurrentSnapshot = table.currentSnapshot();
+    long concurrentSnapshotId = concurrentSnapshot.snapshotId();
+
+    // there must be 1 data manifest and 2 delete manifests before the rewrite is committed
+    assertManifestCounts(table, 1, 2);
+
+    // commit the rewrite successfully as operations are not in conflict
+    rewriteManifests.commit();
+
+    Snapshot rewriteSnapshot = table.currentSnapshot();
+
+    validateSummary(rewriteSnapshot, 1, 2, 2, 0);
+
+    // the rewrite must preserve the original data manifest
+    ManifestFile dataManifest = Iterables.getOnlyElement(rewriteSnapshot.dataManifests(table.io()));
+    validateManifest(
+        dataManifest,
+        dataSeqs(appendSnapshotSeq, appendSnapshotSeq),
+        fileSeqs(appendSnapshotSeq, appendSnapshotSeq),
+        ids(appendSnapshotId, appendSnapshotId),
+        files(FILE_A, FILE_B),
+        statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+
+    // the rewrite must replace the first delete manifest with 2 new delete manifests
+    // the rewrite must also keep the second delete manifest modified concurrently
+    List<ManifestFile> deleteManifests = rewriteSnapshot.deleteManifests(table.io());
+    assertThat(deleteManifests).hasSize(3);
+    validateDeleteManifest(
+        deleteManifests.get(0),
+        dataSeqs(deleteSnapshotSeq1),
+        fileSeqs(deleteSnapshotSeq1),
+        ids(deleteSnapshotId1),
+        files(FILE_A_DELETES),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateDeleteManifest(
+        deleteManifests.get(1),
+        dataSeqs(deleteSnapshotSeq1),
+        fileSeqs(deleteSnapshotSeq1),
+        ids(deleteSnapshotId1),
+        files(FILE_A2_DELETES),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateDeleteManifest(
+        deleteManifests.get(2),
+        dataSeqs(deleteSnapshotSeq2, deleteSnapshotSeq2),
+        fileSeqs(deleteSnapshotSeq2, deleteSnapshotSeq2),
+        ids(concurrentSnapshotId, deleteSnapshotId2),
+        files(FILE_B_DELETES, FILE_C2_DELETES),
+        statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testDeleteManifestReplacementConflictingDeleteFileRemoval() throws IOException {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    // commit data files
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
+
+    // commit delete files
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+    // save the delete snapshot info
+    Snapshot deleteSnapshot = table.currentSnapshot();
+    long deleteSnapshotId = deleteSnapshot.snapshotId();
+    long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+    // split the original delete manifest into 2 new delete manifests
+    ManifestFile originalDeleteManifest = deleteSnapshot.deleteManifests(table.io()).get(0);
+    ManifestFile newDeleteManifest1 =
+        writeManifest(
+            "delete-manifest-file-1.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId,
+                deleteSnapshotSeq,
+                deleteSnapshotSeq,
+                FILE_A_DELETES));
+    ManifestFile newDeleteManifest2 =
+        writeManifest(
+            "delete-manifest-file-2.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId,
+                deleteSnapshotSeq,
+                deleteSnapshotSeq,
+                FILE_A2_DELETES));
+
+    // start the rewrite
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    rewriteManifests.deleteManifest(originalDeleteManifest);
+    rewriteManifests.addManifest(newDeleteManifest1);
+    rewriteManifests.addManifest(newDeleteManifest2);
+
+    // modify the original delete manifest concurrently
+    table.newRewrite().deleteFile(FILE_A_DELETES).commit();
+
+    // the rewrite must fail as the original delete manifest was replaced concurrently
+    Assertions.assertThatThrownBy(rewriteManifests::commit)
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Manifest is missing");
+  }
+
+  @Test
+  public void testDeleteManifestReplacementFailure() throws IOException {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    // commit a data file
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    // commit the first delete file
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+
+    // save the first delete snapshot info
+    Snapshot deleteSnapshot1 = table.currentSnapshot();
+    long deleteSnapshotId1 = deleteSnapshot1.snapshotId();
+    long deleteSnapshotSeq1 = deleteSnapshot1.sequenceNumber();
+
+    // commit the second delete file
+    table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
+
+    // save the second delete snapshot info
+    Snapshot deleteSnapshot2 = table.currentSnapshot();
+    long deleteSnapshotId2 = deleteSnapshot2.snapshotId();
+    long deleteSnapshotSeq2 = deleteSnapshot2.sequenceNumber();
+
+    // there must be 1 data manifest and 2 delete manifests before the rewrite
+    assertManifestCounts(table, 1, 2);
+
+    // combine the original delete manifests into 1 new delete manifest
+    ManifestFile newDeleteManifest =
+        writeManifest(
+            "delete-manifest-file.avro",
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId1,
+                deleteSnapshotSeq1,
+                deleteSnapshotSeq1,
+                FILE_A_DELETES),
+            manifestEntry(
+                ManifestEntry.Status.EXISTING,
+                deleteSnapshotId2,
+                deleteSnapshotSeq2,
+                deleteSnapshotSeq2,
+                FILE_A2_DELETES));
+
+    // configure the table operations to fail
+    table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "1").commit();
+    table.ops().failCommits(5);
+
+    // start the rewrite
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    List<ManifestFile> originalDeleteManifests = deleteSnapshot2.deleteManifests(table.io());
+    for (ManifestFile originalDeleteManifest : originalDeleteManifests) {
+      rewriteManifests.deleteManifest(originalDeleteManifest);
+    }
+    rewriteManifests.addManifest(newDeleteManifest);
+
+    // the rewrite must fail
+    Assertions.assertThatThrownBy(rewriteManifests::commit)
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessage("Injected failure");
+
+    // the new manifest must not be deleted as the commit hasn't succeeded
+    assertThat(new File(newDeleteManifest.path())).exists();
+  }
+
+  private void assertManifestCounts(
+      Table table, int expectedDataManifestCount, int expectedDeleteManifestCount) {
+    Snapshot snapshot = table.currentSnapshot();
+    assertThat(snapshot.dataManifests(table.io())).hasSize(expectedDataManifestCount);
+    assertThat(snapshot.deleteManifests(table.io())).hasSize(expectedDeleteManifestCount);
+  }
+
+  private List<ManifestFile> sortedDataManifests(FileIO io, Snapshot snapshot) {
+    List<ManifestFile> manifests = Lists.newArrayList(snapshot.dataManifests(io));
+    manifests.sort(Comparator.comparing(ManifestFile::path));
+    return manifests;
+  }
+
   private void validateSummary(
       Snapshot snapshot, int replaced, int kept, int created, int entryCount) {
     Map<String, String> summary = snapshot.summary();

Reply via email to