This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8c625dd7d2 Core: Support replacing delete manifests (#9000)
8c625dd7d2 is described below
commit 8c625dd7d21e38235d2864e081c008ef11e0fd20
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Nov 9 14:54:32 2023 -0800
Core: Support replacing delete manifests (#9000)
---
.../org/apache/iceberg/BaseRewriteManifests.java | 13 +-
.../java/org/apache/iceberg/TableTestBase.java | 10 +-
.../org/apache/iceberg/TestRewriteManifests.java | 609 +++++++++++++++++++++
3 files changed, 624 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 1f0d204dc2..87768e3489 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -168,7 +168,7 @@ public class BaseRewriteManifests extends
SnapshotProducer<RewriteManifests>
@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
- List<ManifestFile> currentManifests =
base.currentSnapshot().dataManifests(ops.io());
+ List<ManifestFile> currentManifests =
base.currentSnapshot().allManifests(ops.io());
Set<ManifestFile> currentManifestSet =
ImmutableSet.copyOf(currentManifests);
validateDeletedManifests(currentManifestSet);
@@ -190,7 +190,6 @@ public class BaseRewriteManifests extends
SnapshotProducer<RewriteManifests>
List<ManifestFile> apply = Lists.newArrayList();
Iterables.addAll(apply, newManifestsWithMetadata);
apply.addAll(keptManifests);
- apply.addAll(base.currentSnapshot().deleteManifests(ops.io()));
return apply;
}
@@ -242,7 +241,7 @@ public class BaseRewriteManifests extends
SnapshotProducer<RewriteManifests>
.executeWith(workerPool())
.run(
manifest -> {
- if (predicate != null && !predicate.test(manifest)) {
+ if (containsDeletes(manifest) || !matchesPredicate(manifest)) {
keptManifests.add(manifest);
} else {
rewrittenManifests.add(manifest);
@@ -268,6 +267,14 @@ public class BaseRewriteManifests extends
SnapshotProducer<RewriteManifests>
}
}
+ private boolean containsDeletes(ManifestFile manifest) {
+ return manifest.content() == ManifestContent.DELETES;
+ }
+
+ private boolean matchesPredicate(ManifestFile manifest) {
+ return predicate == null || predicate.test(manifest);
+ }
+
private void validateDeletedManifests(Set<ManifestFile> currentManifests) {
// directly deleted manifests must be still present in the current snapshot
deletedManifests.stream()
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java
b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index d50845933e..68ce055289 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -328,19 +328,19 @@ public class TableTestBase {
return writer.toManifestFile();
}
- ManifestEntry<DataFile> manifestEntry(
- ManifestEntry.Status status, Long snapshotId, DataFile file) {
+ <F extends ContentFile<F>> ManifestEntry<F> manifestEntry(
+ ManifestEntry.Status status, Long snapshotId, F file) {
return manifestEntry(status, snapshotId, 0L, 0L, file);
}
- ManifestEntry<DataFile> manifestEntry(
+ <F extends ContentFile<F>> ManifestEntry<F> manifestEntry(
ManifestEntry.Status status,
Long snapshotId,
Long dataSequenceNumber,
Long fileSequenceNumber,
- DataFile file) {
+ F file) {
- GenericManifestEntry<DataFile> entry = new
GenericManifestEntry<>(table.spec().partitionType());
+ GenericManifestEntry<F> entry = new
GenericManifestEntry<>(table.spec().partitionType());
switch (status) {
case ADDED:
if (dataSequenceNumber != null && dataSequenceNumber != 0) {
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index d7daae8b3e..8cc7e44068 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static
org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -36,7 +37,9 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
@@ -1105,6 +1108,612 @@ public class TestRewriteManifests extends TableTestBase
{
"Cannot commit to branch someBranch:
org.apache.iceberg.BaseRewriteManifests does not support branch commits");
}
+ @Test
+ public void testRewriteDataManifestsPreservesDeletes() {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ Table table = load();
+
+ // commit data files
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ // save the append snapshot info
+ Snapshot appendSnapshot = table.currentSnapshot();
+ long appendSnapshotId = appendSnapshot.snapshotId();
+ long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+ // commit delete files
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+ // save the delete snapshot info
+ Snapshot deleteSnapshot = table.currentSnapshot();
+ long deleteSnapshotId = deleteSnapshot.snapshotId();
+ long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+ // there must be 1 data and 1 delete manifest before the rewrite
+ assertManifestCounts(table, 1, 1);
+
+ // rewrite manifests and cluster entries by file path
+ table.rewriteManifests().clusterBy(file ->
file.path().toString()).commit();
+
+ Snapshot rewriteSnapshot = table.currentSnapshot();
+
+ validateSummary(rewriteSnapshot, 1, 1, 2, 2);
+
+ // the rewrite must replace the original data manifest with 2 new data
manifests
+ List<ManifestFile> dataManifests = sortedDataManifests(table.io(),
rewriteSnapshot);
+ assertThat(dataManifests).hasSize(2);
+ validateManifest(
+ dataManifests.get(0),
+ dataSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ fileSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ ids(appendSnapshotId),
+ files(FILE_A),
+ statuses(ManifestEntry.Status.EXISTING));
+ validateManifest(
+ dataManifests.get(1),
+ dataSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ fileSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ ids(appendSnapshotId),
+ files(FILE_B),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ // the rewrite must preserve the original delete manifest (rewriting is
not supported yet)
+ List<ManifestFile> deleteManifests =
rewriteSnapshot.deleteManifests(table.io());
+ ManifestFile deleteManifest = Iterables.getOnlyElement(deleteManifests);
+ validateDeleteManifest(
+ deleteManifest,
+ dataSeqs(deleteSnapshotSeq, deleteSnapshotSeq),
+ fileSeqs(deleteSnapshotSeq, deleteSnapshotSeq),
+ ids(deleteSnapshotId, deleteSnapshotId),
+ files(FILE_A_DELETES, FILE_A2_DELETES),
+ statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+ }
+
+ @Test
+ public void testReplaceDeleteManifestsOnly() throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ Table table = load();
+
+ // commit data files
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ // save the append snapshot info
+ Snapshot appendSnapshot = table.currentSnapshot();
+ long appendSnapshotId = appendSnapshot.snapshotId();
+ long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+ // commit delete files
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+ // save the delete snapshot info
+ Snapshot deleteSnapshot = table.currentSnapshot();
+ long deleteSnapshotId = deleteSnapshot.snapshotId();
+ long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+ // there must be 1 data and 1 delete manifest before the rewrite
+ assertManifestCounts(table, 1, 1);
+
+ // split the original delete manifest into 2 new delete manifests
+ ManifestFile originalDeleteManifest =
+ Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
+ ManifestFile newDeleteManifest1 =
+ writeManifest(
+ "delete-manifest-file-1.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId,
+ deleteSnapshotSeq,
+ deleteSnapshotSeq,
+ FILE_A_DELETES));
+ ManifestFile newDeleteManifest2 =
+ writeManifest(
+ "delete-manifest-file-2.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId,
+ deleteSnapshotSeq,
+ deleteSnapshotSeq,
+ FILE_A2_DELETES));
+
+ // replace the original delete manifest with the new delete manifests
+ table
+ .rewriteManifests()
+ .deleteManifest(originalDeleteManifest)
+ .addManifest(newDeleteManifest1)
+ .addManifest(newDeleteManifest2)
+ .commit();
+
+ Snapshot rewriteSnapshot = table.currentSnapshot();
+
+ // the rewrite must preserve the original data manifest
+ ManifestFile dataManifest =
Iterables.getOnlyElement(rewriteSnapshot.dataManifests(table.io()));
+ validateManifest(
+ dataManifest,
+ dataSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ fileSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ ids(appendSnapshotId, appendSnapshotId),
+ files(FILE_A, FILE_B),
+ statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+
+ // the rewrite must replace the original delete manifest with 2 new delete
manifests
+ List<ManifestFile> deleteManifests =
rewriteSnapshot.deleteManifests(table.io());
+ assertThat(deleteManifests).hasSize(2);
+ validateDeleteManifest(
+ deleteManifests.get(0),
+ dataSeqs(deleteSnapshotSeq),
+ fileSeqs(deleteSnapshotSeq),
+ ids(deleteSnapshotId),
+ files(FILE_A_DELETES),
+ statuses(ManifestEntry.Status.EXISTING));
+ validateDeleteManifest(
+ deleteManifests.get(1),
+ dataSeqs(deleteSnapshotSeq),
+ fileSeqs(deleteSnapshotSeq),
+ ids(deleteSnapshotId),
+ files(FILE_A2_DELETES),
+ statuses(ManifestEntry.Status.EXISTING));
+ }
+
+ @Test
+ public void testReplaceDataAndDeleteManifests() throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ Table table = load();
+
+ // commit data files
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ // save the append snapshot info
+ Snapshot appendSnapshot = table.currentSnapshot();
+ long appendSnapshotId = appendSnapshot.snapshotId();
+ long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+ // commit delete files
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+ // save the delete snapshot info
+ Snapshot deleteSnapshot = table.currentSnapshot();
+ long deleteSnapshotId = deleteSnapshot.snapshotId();
+ long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+ // there must be 1 data and 1 delete manifest before the rewrite
+ assertManifestCounts(table, 1, 1);
+
+ // split the original data manifest into 2 new data manifests
+ ManifestFile originalDataManifest =
+ Iterables.getOnlyElement(deleteSnapshot.dataManifests(table.io()));
+ ManifestFile newDataManifest1 =
+ writeManifest(
+ "manifest-file-1.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ appendSnapshotId,
+ appendSnapshotSeq,
+ appendSnapshotSeq,
+ FILE_A));
+ ManifestFile newDataManifest2 =
+ writeManifest(
+ "manifest-file-2.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ appendSnapshotId,
+ appendSnapshotSeq,
+ appendSnapshotSeq,
+ FILE_B));
+
+ // split the original delete manifest into 2 new delete manifests
+ ManifestFile originalDeleteManifest =
+ Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
+ ManifestFile newDeleteManifest1 =
+ writeManifest(
+ "delete-manifest-file-1.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId,
+ deleteSnapshotSeq,
+ deleteSnapshotSeq,
+ FILE_A_DELETES));
+ ManifestFile newDeleteManifest2 =
+ writeManifest(
+ "delete-manifest-file-2.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId,
+ deleteSnapshotSeq,
+ deleteSnapshotSeq,
+ FILE_A2_DELETES));
+
+ // replace the original data and delete manifests with new ones
+ table
+ .rewriteManifests()
+ .deleteManifest(originalDataManifest)
+ .addManifest(newDataManifest1)
+ .addManifest(newDataManifest2)
+ .deleteManifest(originalDeleteManifest)
+ .addManifest(newDeleteManifest1)
+ .addManifest(newDeleteManifest2)
+ .commit();
+
+ Snapshot rewriteSnapshot = table.currentSnapshot();
+
+ // the rewrite must replace the original data manifest with 2 new data
manifests
+ List<ManifestFile> dataManifests = sortedDataManifests(table.io(),
rewriteSnapshot);
+ assertThat(dataManifests).hasSize(2);
+ validateManifest(
+ dataManifests.get(0),
+ dataSeqs(appendSnapshotSeq),
+ fileSeqs(appendSnapshotSeq),
+ ids(appendSnapshotId),
+ files(FILE_A),
+ statuses(ManifestEntry.Status.EXISTING));
+ validateManifest(
+ dataManifests.get(1),
+ dataSeqs(appendSnapshotSeq),
+ fileSeqs(appendSnapshotSeq),
+ ids(appendSnapshotId),
+ files(FILE_B),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ // the rewrite must replace the original delete manifest with 2 new delete
manifests
+ List<ManifestFile> deleteManifests =
rewriteSnapshot.deleteManifests(table.io());
+ assertThat(deleteManifests).hasSize(2);
+ validateDeleteManifest(
+ deleteManifests.get(0),
+ dataSeqs(deleteSnapshotSeq),
+ fileSeqs(deleteSnapshotSeq),
+ ids(deleteSnapshotId),
+ files(FILE_A_DELETES),
+ statuses(ManifestEntry.Status.EXISTING));
+ validateDeleteManifest(
+ deleteManifests.get(1),
+ dataSeqs(deleteSnapshotSeq),
+ fileSeqs(deleteSnapshotSeq),
+ ids(deleteSnapshotId),
+ files(FILE_A2_DELETES),
+ statuses(ManifestEntry.Status.EXISTING));
+ }
+
+ @Test
+ public void testDeleteManifestReplacementConcurrentAppend() throws
IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ // commit data files
+ table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ // save the initial append snapshot info
+ Snapshot appendSnapshot = table.currentSnapshot();
+ long appendSnapshotId = appendSnapshot.snapshotId();
+ long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+ // commit delete files
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+ // save the delete snapshot info
+ Snapshot deleteSnapshot = table.currentSnapshot();
+ long deleteSnapshotId = deleteSnapshot.snapshotId();
+ long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+ // split the original delete manifest into 2 new delete manifests
+ ManifestFile originalDeleteManifest =
+ Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
+ ManifestFile newDeleteManifest1 =
+ writeManifest(
+ "delete-manifest-file-1.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId,
+ deleteSnapshotSeq,
+ deleteSnapshotSeq,
+ FILE_A_DELETES));
+ ManifestFile newDeleteManifest2 =
+ writeManifest(
+ "delete-manifest-file-2.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId,
+ deleteSnapshotSeq,
+ deleteSnapshotSeq,
+ FILE_A2_DELETES));
+
+ // start the rewrite
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+ rewriteManifests.deleteManifest(originalDeleteManifest);
+ rewriteManifests.addManifest(newDeleteManifest1);
+ rewriteManifests.addManifest(newDeleteManifest2);
+
+ // commit another append concurrently
+ table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+
+ // save the concurrent snapshot info
+ Snapshot concurrentSnapshot = table.currentSnapshot();
+ long concurrentSnapshotSeq = concurrentSnapshot.sequenceNumber();
+ long concurrentSnapshotId = concurrentSnapshot.snapshotId();
+
+ // there must be 2 data manifests and 1 delete manifest before the rewrite
is committed
+ assertManifestCounts(table, 2, 1);
+
+ // commit the rewrite successfully as operations are not in conflict
+ rewriteManifests.commit();
+
+ Snapshot rewriteSnapshot = table.currentSnapshot();
+
+ validateSummary(rewriteSnapshot, 1, 2, 2, 0);
+
+ // the rewrite must preserve the original and added concurrently data
manifests
+ List<ManifestFile> dataManifests =
rewriteSnapshot.dataManifests(table.io());
+ assertThat(dataManifests).hasSize(2);
+ validateManifest(
+ dataManifests.get(0),
+ dataSeqs(concurrentSnapshotSeq, concurrentSnapshotSeq),
+ fileSeqs(concurrentSnapshotSeq, concurrentSnapshotSeq),
+ ids(concurrentSnapshotId, concurrentSnapshotId),
+ files(FILE_C, FILE_D),
+ statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+ validateManifest(
+ dataManifests.get(1),
+ dataSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ fileSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ ids(appendSnapshotId, appendSnapshotId),
+ files(FILE_A, FILE_B),
+ statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+
+ // the rewrite must replace the original delete manifest with 2 new delete
manifests
+ List<ManifestFile> deleteManifests =
rewriteSnapshot.deleteManifests(table.io());
+ assertThat(deleteManifests).hasSize(2);
+ validateDeleteManifest(
+ deleteManifests.get(0),
+ dataSeqs(deleteSnapshotSeq),
+ fileSeqs(deleteSnapshotSeq),
+ ids(deleteSnapshotId),
+ files(FILE_A_DELETES),
+ statuses(ManifestEntry.Status.EXISTING));
+ validateDeleteManifest(
+ deleteManifests.get(1),
+ dataSeqs(deleteSnapshotSeq),
+ fileSeqs(deleteSnapshotSeq),
+ ids(deleteSnapshotId),
+ files(FILE_A2_DELETES),
+ statuses(ManifestEntry.Status.EXISTING));
+ }
+
+ @Test
+ public void testDeleteManifestReplacementConcurrentDeleteFileRemoval()
throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ // commit data files
+ table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ // save the initial append snapshot info
+ Snapshot appendSnapshot = table.currentSnapshot();
+ long appendSnapshotId = appendSnapshot.snapshotId();
+ long appendSnapshotSeq = appendSnapshot.sequenceNumber();
+
+ // commit the first set of delete files
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+ // save the first delete snapshot info
+ Snapshot deleteSnapshot1 = table.currentSnapshot();
+ long deleteSnapshotId1 = deleteSnapshot1.snapshotId();
+ long deleteSnapshotSeq1 = deleteSnapshot1.sequenceNumber();
+
+ // commit the second set of delete files
+
table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit();
+
+ // save the second delete snapshot info
+ Snapshot deleteSnapshot2 = table.currentSnapshot();
+ long deleteSnapshotId2 = deleteSnapshot2.snapshotId();
+ long deleteSnapshotSeq2 = deleteSnapshot2.sequenceNumber();
+
+ // split the original delete manifest into 2 new delete manifests
+ ManifestFile originalDeleteManifest =
deleteSnapshot1.deleteManifests(table.io()).get(0);
+ ManifestFile newDeleteManifest1 =
+ writeManifest(
+ "delete-manifest-file-1.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId1,
+ deleteSnapshotSeq1,
+ deleteSnapshotSeq1,
+ FILE_A_DELETES));
+ ManifestFile newDeleteManifest2 =
+ writeManifest(
+ "delete-manifest-file-2.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId1,
+ deleteSnapshotSeq1,
+ deleteSnapshotSeq1,
+ FILE_A2_DELETES));
+
+ // start the rewrite
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+ rewriteManifests.deleteManifest(originalDeleteManifest);
+ rewriteManifests.addManifest(newDeleteManifest1);
+ rewriteManifests.addManifest(newDeleteManifest2);
+
+ // commit the third set of delete files concurrently
+ table.newRewrite().deleteFile(FILE_B_DELETES).commit();
+
+ Snapshot concurrentSnapshot = table.currentSnapshot();
+ long concurrentSnapshotId = concurrentSnapshot.snapshotId();
+
+ // there must be 1 data manifest and 2 delete manifests before the rewrite
is committed
+ assertManifestCounts(table, 1, 2);
+
+ // commit the rewrite successfully as operations are not in conflict
+ rewriteManifests.commit();
+
+ Snapshot rewriteSnapshot = table.currentSnapshot();
+
+ validateSummary(rewriteSnapshot, 1, 2, 2, 0);
+
+ // the rewrite must preserve the original data manifest
+ ManifestFile dataManifest =
Iterables.getOnlyElement(rewriteSnapshot.dataManifests(table.io()));
+ validateManifest(
+ dataManifest,
+ dataSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ fileSeqs(appendSnapshotSeq, appendSnapshotSeq),
+ ids(appendSnapshotId, appendSnapshotId),
+ files(FILE_A, FILE_B),
+ statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+
+ // the rewrite must replace the first delete manifest with 2 new delete
manifests
+ // the rewrite must also keep the second delete manifest modified
concurrently
+ List<ManifestFile> deleteManifests =
rewriteSnapshot.deleteManifests(table.io());
+ assertThat(deleteManifests).hasSize(3);
+ validateDeleteManifest(
+ deleteManifests.get(0),
+ dataSeqs(deleteSnapshotSeq1),
+ fileSeqs(deleteSnapshotSeq1),
+ ids(deleteSnapshotId1),
+ files(FILE_A_DELETES),
+ statuses(ManifestEntry.Status.EXISTING));
+ validateDeleteManifest(
+ deleteManifests.get(1),
+ dataSeqs(deleteSnapshotSeq1),
+ fileSeqs(deleteSnapshotSeq1),
+ ids(deleteSnapshotId1),
+ files(FILE_A2_DELETES),
+ statuses(ManifestEntry.Status.EXISTING));
+ validateDeleteManifest(
+ deleteManifests.get(2),
+ dataSeqs(deleteSnapshotSeq2, deleteSnapshotSeq2),
+ fileSeqs(deleteSnapshotSeq2, deleteSnapshotSeq2),
+ ids(concurrentSnapshotId, deleteSnapshotId2),
+ files(FILE_B_DELETES, FILE_C2_DELETES),
+ statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
+ }
+
+ @Test
+ public void testDeleteManifestReplacementConflictingDeleteFileRemoval()
throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ // commit data files
+
table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
+
+ // commit delete files
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+
+ // save the delete snapshot info
+ Snapshot deleteSnapshot = table.currentSnapshot();
+ long deleteSnapshotId = deleteSnapshot.snapshotId();
+ long deleteSnapshotSeq = deleteSnapshot.sequenceNumber();
+
+ // split the original delete manifest into 2 new delete manifests
+ ManifestFile originalDeleteManifest =
deleteSnapshot.deleteManifests(table.io()).get(0);
+ ManifestFile newDeleteManifest1 =
+ writeManifest(
+ "delete-manifest-file-1.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId,
+ deleteSnapshotSeq,
+ deleteSnapshotSeq,
+ FILE_A_DELETES));
+ ManifestFile newDeleteManifest2 =
+ writeManifest(
+ "delete-manifest-file-2.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId,
+ deleteSnapshotSeq,
+ deleteSnapshotSeq,
+ FILE_A2_DELETES));
+
+ // start the rewrite
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+ rewriteManifests.deleteManifest(originalDeleteManifest);
+ rewriteManifests.addManifest(newDeleteManifest1);
+ rewriteManifests.addManifest(newDeleteManifest2);
+
+ // modify the original delete manifest concurrently
+ table.newRewrite().deleteFile(FILE_A_DELETES).commit();
+
+ // the rewrite must fail as the original delete manifest was replaced
concurrently
+ Assertions.assertThatThrownBy(rewriteManifests::commit)
+ .isInstanceOf(ValidationException.class)
+ .hasMessageStartingWith("Manifest is missing");
+ }
+
+ @Test
+ public void testDeleteManifestReplacementFailure() throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ // commit a data file
+ table.newFastAppend().appendFile(FILE_A).commit();
+
+ // commit the first delete file
+ table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+
+ // save the first delete snapshot info
+ Snapshot deleteSnapshot1 = table.currentSnapshot();
+ long deleteSnapshotId1 = deleteSnapshot1.snapshotId();
+ long deleteSnapshotSeq1 = deleteSnapshot1.sequenceNumber();
+
+ // commit the second delete file
+ table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
+
+ // save the second delete snapshot info
+ Snapshot deleteSnapshot2 = table.currentSnapshot();
+ long deleteSnapshotId2 = deleteSnapshot2.snapshotId();
+ long deleteSnapshotSeq2 = deleteSnapshot2.sequenceNumber();
+
+ // there must be 1 data manifest and 2 delete manifests before the rewrite
+ assertManifestCounts(table, 1, 2);
+
+ // combine the original delete manifests into 1 new delete manifest
+ ManifestFile newDeleteManifest =
+ writeManifest(
+ "delete-manifest-file.avro",
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId1,
+ deleteSnapshotSeq1,
+ deleteSnapshotSeq1,
+ FILE_A_DELETES),
+ manifestEntry(
+ ManifestEntry.Status.EXISTING,
+ deleteSnapshotId2,
+ deleteSnapshotSeq2,
+ deleteSnapshotSeq2,
+ FILE_A2_DELETES));
+
+ // configure the table operations to fail
+ table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES,
"1").commit();
+ table.ops().failCommits(5);
+
+ // start the rewrite
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+ List<ManifestFile> originalDeleteManifests =
deleteSnapshot2.deleteManifests(table.io());
+ for (ManifestFile originalDeleteManifest : originalDeleteManifests) {
+ rewriteManifests.deleteManifest(originalDeleteManifest);
+ }
+ rewriteManifests.addManifest(newDeleteManifest);
+
+ // the rewrite must fail
+ Assertions.assertThatThrownBy(rewriteManifests::commit)
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessage("Injected failure");
+
+ // the new manifest must not be deleted as the commit hasn't succeeded
+ assertThat(new File(newDeleteManifest.path())).exists();
+ }
+
+ private void assertManifestCounts(
+ Table table, int expectedDataManifestCount, int
expectedDeleteManifestCount) {
+ Snapshot snapshot = table.currentSnapshot();
+
assertThat(snapshot.dataManifests(table.io())).hasSize(expectedDataManifestCount);
+
assertThat(snapshot.deleteManifests(table.io())).hasSize(expectedDeleteManifestCount);
+ }
+
+ private List<ManifestFile> sortedDataManifests(FileIO io, Snapshot snapshot)
{
+ List<ManifestFile> manifests =
Lists.newArrayList(snapshot.dataManifests(io));
+ manifests.sort(Comparator.comparing(ManifestFile::path));
+ return manifests;
+ }
+
private void validateSummary(
Snapshot snapshot, int replaced, int kept, int created, int entryCount) {
Map<String, String> summary = snapshot.summary();