This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 88a6e4edb1 Core: Support performing delete files and merge appends on branches (#5618)
88a6e4edb1 is described below
commit 88a6e4edb1f9649edb806407b535f78ae8c259e2
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Oct 17 18:03:51 2022 -0700
Core: Support performing delete files and merge appends on branches (#5618)
---
.../main/java/org/apache/iceberg/MergeAppend.java | 6 +
.../java/org/apache/iceberg/StreamingDelete.java | 6 +
.../java/org/apache/iceberg/TableTestBase.java | 39 +++
.../java/org/apache/iceberg/TestDeleteFiles.java | 139 +++++---
.../java/org/apache/iceberg/TestMergeAppend.java | 358 ++++++++++-----------
.../org/apache/iceberg/TestRemoveSnapshots.java | 64 +++-
6 files changed, 382 insertions(+), 230 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java b/core/src/main/java/org/apache/iceberg/MergeAppend.java
index 1781e95e9d..3ef553ba78 100644
--- a/core/src/main/java/org/apache/iceberg/MergeAppend.java
+++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java
@@ -48,6 +48,12 @@ class MergeAppend extends
MergingSnapshotProducer<AppendFiles> implements Append
return this;
}
+ /**
+ * Points this append at the named branch by delegating to {@code targetBranch}.
+ *
+ * @param branch name of the branch the append should be committed to
+ * @return this append, so calls can be chained
+ */
+ @Override
+ public MergeAppend toBranch(String branch) {
+ this.targetBranch(branch);
+ return this;
+ }
+
@Override
public AppendFiles appendManifest(ManifestFile manifest) {
Preconditions.checkArgument(
diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
index 493c4e44c8..8ff7bb831e 100644
--- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java
+++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -59,4 +59,10 @@ public class StreamingDelete extends
MergingSnapshotProducer<DeleteFiles> implem
deleteByRowFilter(expr);
return this;
}
+
+ /**
+ * Points this delete at the named branch by delegating to {@code targetBranch}.
+ *
+ * @param branch name of the branch the delete should be committed to
+ * @return this delete, so calls can be chained
+ */
+ @Override
+ public StreamingDelete toBranch(String branch) {
+ this.targetBranch(branch);
+ return this;
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index ffe909e9bd..65461b465e 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -349,6 +349,45 @@ public class TableTestBase {
validateSnapshot(old, snap, (Long) sequenceNumber, newFiles);
}
+ /**
+ * Commits {@code snapshotUpdate} to {@code branch} and returns the resulting branch tip.
+ *
+ * <p>Main-branch commits use the update unchanged and read back the table's current snapshot;
+ * any other branch retargets the update with {@code toBranch} and reads {@code table.snapshot}.
+ */
+ @SuppressWarnings("checkstyle:HiddenField")
+ Snapshot commit(Table table, SnapshotUpdate snapshotUpdate, String branch) {
+ if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+ snapshotUpdate.commit();
+ return table.currentSnapshot();
+ }
+
+ SnapshotProducer producer = (SnapshotProducer) snapshotUpdate.toBranch(branch);
+ producer.commit();
+ return table.snapshot(branch);
+ }
+
+ /**
+ * Applies {@code snapshotUpdate} without committing, retargeting it at {@code branch} first
+ * unless the branch is main.
+ */
+ Snapshot apply(SnapshotUpdate snapshotUpdate, String branch) {
+ Object target =
+ branch.equals(SnapshotRef.MAIN_BRANCH) ? snapshotUpdate : snapshotUpdate.toBranch(branch);
+ return ((SnapshotProducer) target).apply();
+ }
+
+ /**
+ * Returns the tip of {@code branch}: the current snapshot for main, otherwise the snapshot the
+ * table resolves for that branch name.
+ */
+ @SuppressWarnings("checkstyle:HiddenField")
+ Snapshot latestSnapshot(Table table, String branch) {
+ return branch.equals(SnapshotRef.MAIN_BRANCH) ? table.currentSnapshot() : table.snapshot(branch);
+ }
+
+ /**
+ * Returns the tip of {@code branch} as recorded in {@code metadata}.
+ *
+ * @param metadata table metadata to read from
+ * @param branch branch name; main resolves to the current snapshot
+ * @return the branch tip, or null when {@code metadata} has no ref for {@code branch}
+ */
+ Snapshot latestSnapshot(TableMetadata metadata, String branch) {
+ if (SnapshotRef.MAIN_BRANCH.equals(branch)) {
+ return metadata.currentSnapshot();
+ }
+
+ // A branch that has not been created yet has no ref; return null instead of
+ // dereferencing it, matching the Table-based overload that uses table.snapshot(branch).
+ SnapshotRef ref = metadata.ref(branch);
+ return ref != null ? metadata.snapshot(ref.snapshotId()) : null;
+ }
+
void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber,
DataFile... newFiles) {
Assert.assertEquals(
"Should not change delete manifests",
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
index 58d4352626..1ee3b663bc 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
@@ -68,38 +69,44 @@ public class TestDeleteFiles extends TableTestBase {
))
.build();
- @Parameterized.Parameters(name = "formatVersion = {0}")
+ private final String branch;
+
+ @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
public static Object[] parameters() {
- return new Object[] {1, 2};
+ return new Object[][] {
+ new Object[] {1, "main"},
+ new Object[] {1, "testBranch"},
+ new Object[] {2, "main"},
+ new Object[] {2, "testBranch"}
+ };
}
- public TestDeleteFiles(int formatVersion) {
+ public TestDeleteFiles(int formatVersion, String branch) {
super(formatVersion);
+ this.branch = branch;
}
@Test
public void testMultipleDeletes() {
-
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
-
+ commit(
+ table,
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C),
branch);
+ Snapshot append = latestSnapshot(readMetadata(), branch);
Assert.assertEquals("Metadata should be at version 1", 1L, (long)
version());
- Snapshot append = readMetadata().currentSnapshot();
validateSnapshot(null, append, FILE_A, FILE_B, FILE_C);
- table.newDelete().deleteFile(FILE_A).commit();
+ commit(table, table.newDelete().deleteFile(FILE_A), branch);
+ Snapshot delete1 = latestSnapshot(readMetadata(), branch);
Assert.assertEquals("Metadata should be at version 2", 2L, (long)
version());
- Snapshot delete = readMetadata().currentSnapshot();
- Assert.assertEquals("Should have 1 manifest", 1,
delete.allManifests(FILE_IO).size());
+ Assert.assertEquals("Should have 1 manifest", 1,
delete1.allManifests(FILE_IO).size());
validateManifestEntries(
- delete.allManifests(table.io()).get(0),
- ids(delete.snapshotId(), append.snapshotId(), append.snapshotId()),
+ delete1.allManifests(table.io()).get(0),
+ ids(delete1.snapshotId(), append.snapshotId(), append.snapshotId()),
files(FILE_A, FILE_B, FILE_C),
statuses(Status.DELETED, Status.EXISTING, Status.EXISTING));
- table.newDelete().deleteFile(FILE_B).commit();
-
+ Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B),
branch);
Assert.assertEquals("Metadata should be at version 3", 3L, (long)
version());
- Snapshot delete2 = readMetadata().currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1,
delete2.allManifests(FILE_IO).size());
validateManifestEntries(
delete2.allManifests(FILE_IO).get(0),
@@ -147,9 +154,12 @@ public class TestDeleteFiles extends TableTestBase {
.build();
// add both data files
-
table.newFastAppend().appendFile(firstDataFile).appendFile(secondDataFile).commit();
+ Snapshot initialSnapshot =
+ commit(
+ table,
+
table.newFastAppend().appendFile(firstDataFile).appendFile(secondDataFile),
+ branch);
- Snapshot initialSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1,
initialSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
initialSnapshot.allManifests(FILE_IO).get(0),
@@ -158,9 +168,7 @@ public class TestDeleteFiles extends TableTestBase {
statuses(Status.ADDED, Status.ADDED));
// delete the first data file
- table.newDelete().deleteFile(firstDataFile).commit();
-
- Snapshot deleteSnapshot = table.currentSnapshot();
+ Snapshot deleteSnapshot = commit(table,
table.newDelete().deleteFile(firstDataFile), branch);
Assert.assertEquals("Should have 1 manifest", 1,
deleteSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
deleteSnapshot.allManifests(FILE_IO).get(0),
@@ -170,9 +178,9 @@ public class TestDeleteFiles extends TableTestBase {
// delete the second data file using a row filter
// the commit should succeed as there is only one live data file
- table.newDelete().deleteFromRowFilter(Expressions.lessThan("id",
7)).commit();
+ Snapshot finalSnapshot =
+ commit(table,
table.newDelete().deleteFromRowFilter(Expressions.lessThan("id", 7)), branch);
- Snapshot finalSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1,
finalSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
finalSnapshot.allManifests(FILE_IO).get(0),
@@ -184,13 +192,15 @@ public class TestDeleteFiles extends TableTestBase {
@Test
public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() {
// add both data files
- table
- .newFastAppend()
- .appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
- .appendFile(DATA_FILE_BUCKET_0_IDS_8_10)
- .commit();
+ Snapshot initialSnapshot =
+ commit(
+ table,
+ table
+ .newFastAppend()
+ .appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
+ .appendFile(DATA_FILE_BUCKET_0_IDS_8_10),
+ branch);
- Snapshot initialSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1,
initialSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
initialSnapshot.allManifests(FILE_IO).get(0),
@@ -199,9 +209,10 @@ public class TestDeleteFiles extends TableTestBase {
statuses(Status.ADDED, Status.ADDED));
// delete the second one using a metrics filter (no partition filter)
- table.newDelete().deleteFromRowFilter(Expressions.greaterThan("id",
5)).commit();
+ Snapshot deleteSnapshot =
+ commit(
+ table,
table.newDelete().deleteFromRowFilter(Expressions.greaterThan("id", 5)),
branch);
- Snapshot deleteSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1,
deleteSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
deleteSnapshot.allManifests(FILE_IO).get(0),
@@ -213,13 +224,15 @@ public class TestDeleteFiles extends TableTestBase {
@Test
public void testDeleteSomeFilesByRowFilterWithCombinedPredicates() {
// add both data files
- table
- .newFastAppend()
- .appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
- .appendFile(DATA_FILE_BUCKET_0_IDS_8_10)
- .commit();
+ Snapshot initialSnapshot =
+ commit(
+ table,
+ table
+ .newFastAppend()
+ .appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
+ .appendFile(DATA_FILE_BUCKET_0_IDS_8_10),
+ branch);
- Snapshot initialSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1,
initialSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
initialSnapshot.allManifests(FILE_IO).get(0),
@@ -231,9 +244,8 @@ public class TestDeleteFiles extends TableTestBase {
Expression partPredicate = Expressions.equal(Expressions.bucket("data",
16), 0);
Expression rowPredicate = Expressions.greaterThan("id", 5);
Expression predicate = Expressions.and(partPredicate, rowPredicate);
- table.newDelete().deleteFromRowFilter(predicate).commit();
-
- Snapshot deleteSnapshot = table.currentSnapshot();
+ Snapshot deleteSnapshot =
+ commit(table, table.newDelete().deleteFromRowFilter(predicate),
branch);
Assert.assertEquals("Should have 1 manifest", 1,
deleteSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
deleteSnapshot.allManifests(FILE_IO).get(0),
@@ -262,18 +274,22 @@ public class TestDeleteFiles extends TableTestBase {
.withPartitionPath("data_trunc_2=aa")
.build();
- table.newFastAppend().appendFile(dataFile).commit();
+ commit(table, table.newFastAppend().appendFile(dataFile), branch);
AssertHelpers.assertThrows(
"Should reject as not all rows match filter",
ValidationException.class,
"Cannot delete file where some, but not all, rows match filter",
- () -> table.newDelete().deleteFromRowFilter(Expressions.equal("data",
"aa")).commit());
+ () ->
+ commit(
+ table,
+
table.newDelete().deleteFromRowFilter(Expressions.equal("data", "aa")),
+ branch));
}
@Test
public void testDeleteCaseSensitivity() {
- table.newFastAppend().appendFile(DATA_FILE_BUCKET_0_IDS_0_2).commit();
+ commit(table,
table.newFastAppend().appendFile(DATA_FILE_BUCKET_0_IDS_0_2), branch);
Expression rowFilter = Expressions.lessThan("iD", 5);
@@ -281,17 +297,22 @@ public class TestDeleteFiles extends TableTestBase {
"Should use case sensitive binding by default",
ValidationException.class,
"Cannot find field 'iD'",
- () -> table.newDelete().deleteFromRowFilter(rowFilter).commit());
+ () -> commit(table, table.newDelete().deleteFromRowFilter(rowFilter),
branch));
AssertHelpers.assertThrows(
"Should fail with case sensitive binding",
ValidationException.class,
"Cannot find field 'iD'",
- () ->
table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(true).commit());
+ () ->
+ commit(
+ table,
+
table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(true),
+ branch));
-
table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(false).commit();
+ Snapshot deleteSnapshot =
+ commit(
+ table,
table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(false), branch);
- Snapshot deleteSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1,
deleteSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
deleteSnapshot.allManifests(FILE_IO).get(0),
@@ -300,6 +321,34 @@ public class TestDeleteFiles extends TableTestBase {
statuses(Status.DELETED));
}
+ /**
+ * Deletes different files on main and on a separate branch and checks that each branch only
+ * sees its own delete.
+ *
+ * <p>This scenario creates its own branch and does not use the parameterized {@code branch}.
+ */
+ @Test
+ public void testDeleteFilesOnIndependentBranches() {
+ String testBranch = "testBranch";
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
+ Snapshot initialSnapshot = table.currentSnapshot();
+
+ // Delete A on the test branch only
+ table.newDelete().deleteFile(FILE_A).toBranch(testBranch).commit();
+ Snapshot testBranchTip = table.snapshot(testBranch);
+
+ // Delete B and C on main only
+ table.newDelete().deleteFile(FILE_B).deleteFile(FILE_C).commit();
+ Snapshot delete2 = table.currentSnapshot();
+
+ // testBranch: A is deleted, while B and C are unaffected by the delete on main
+ validateManifestEntries(
+ Iterables.getOnlyElement(testBranchTip.allManifests(FILE_IO)),
+ ids(testBranchTip.snapshotId(), initialSnapshot.snapshotId(), initialSnapshot.snapshotId()),
+ files(FILE_A, FILE_B, FILE_C),
+ statuses(Status.DELETED, Status.EXISTING, Status.EXISTING));
+
+ // main: B and C are deleted, while A is unaffected by the delete on testBranch
+ validateManifestEntries(
+ Iterables.getOnlyElement(delete2.allManifests(FILE_IO)),
+ ids(initialSnapshot.snapshotId(), delete2.snapshotId(), delete2.snapshotId()),
+ files(FILE_A, FILE_B, FILE_C),
+ statuses(Status.EXISTING, Status.DELETED, Status.DELETED));
+ }
+
private static ByteBuffer longToBuffer(long value) {
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0,
value);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index ec71c856a3..8e9d4ab136 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -39,13 +39,21 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestMergeAppend extends TableTestBase {
- @Parameterized.Parameters(name = "formatVersion = {0}")
+ private final String branch;
+
+ @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
public static Object[] parameters() {
- return new Object[] {1, 2};
+ return new Object[][] {
+ new Object[] {1, "main"},
+ new Object[] {1, "testBranch"},
+ new Object[] {2, "main"},
+ new Object[] {2, "testBranch"}
+ };
}
- public TestMergeAppend(int formatVersion) {
+ public TestMergeAppend(int formatVersion, String branch) {
super(formatVersion);
+ this.branch = branch;
}
@Test
@@ -56,10 +64,10 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertNull("Should not have a current snapshot",
base.currentSnapshot());
Assert.assertEquals("Last sequence number should be 0", 0,
base.lastSequenceNumber());
- table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ Snapshot committedSnapshot =
+ commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B),
branch);
- Snapshot committedSnapshot = table.currentSnapshot();
- Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+ Assert.assertNotNull("Should create a snapshot", committedSnapshot);
V1Assert.assertEquals(
"Last sequence number should be 0", 0,
table.ops().current().lastSequenceNumber());
V2Assert.assertEquals(
@@ -89,10 +97,9 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Last sequence number should be 0", 0,
base.lastSequenceNumber());
ManifestFile manifest = writeManifest(FILE_A, FILE_B);
- table.newAppend().appendManifest(manifest).commit();
+ Snapshot committedSnapshot = commit(table,
table.newAppend().appendManifest(manifest), branch);
- Snapshot committedSnapshot = table.currentSnapshot();
- Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+ Assert.assertNotNull("Should create a snapshot", committedSnapshot);
V1Assert.assertEquals(
"Last sequence number should be 0", 0,
table.ops().current().lastSequenceNumber());
V2Assert.assertEquals(
@@ -126,10 +133,13 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Last sequence number should be 0", 0,
base.lastSequenceNumber());
ManifestFile manifest = writeManifest(FILE_A, FILE_B);
-
table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest).commit();
+ Snapshot committedSnapshot =
+ commit(
+ table,
+
table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest),
+ branch);
- Snapshot committedSnapshot = table.currentSnapshot();
- Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+ Assert.assertNotNull("Should create a snapshot", committedSnapshot);
V1Assert.assertEquals(
"Last sequence number should be 0", 0,
table.ops().current().lastSequenceNumber());
V2Assert.assertEquals(
@@ -164,23 +174,27 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertNull("Should not have a current snapshot",
base.currentSnapshot());
Assert.assertEquals("Last sequence number should be 0", 0,
base.lastSequenceNumber());
AtomicInteger scanThreadsIndex = new AtomicInteger(0);
- table
- .newAppend()
- .appendFile(FILE_A)
- .appendFile(FILE_B)
- .scanManifestsWith(
- Executors.newFixedThreadPool(
- 1,
- runnable -> {
- Thread thread = new Thread(runnable);
- thread.setName("scan-" + scanThreadsIndex.getAndIncrement());
- thread.setDaemon(
- true); // daemon threads will be terminated abruptly
when the JVM exits
- return thread;
- }))
- .commit();
+ Snapshot snapshot =
+ commit(
+ table,
+ table
+ .newAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .scanManifestsWith(
+ Executors.newFixedThreadPool(
+ 1,
+ runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("scan-" +
scanThreadsIndex.getAndIncrement());
+ thread.setDaemon(
+ true); // daemon threads will be terminated
abruptly when the JVM
+ // exits
+ return thread;
+ })),
+ branch);
Assert.assertTrue("Thread should be created in provided pool",
scanThreadsIndex.get() > 0);
- Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+ Assert.assertNotNull("Should create a snapshot", snapshot);
}
@Test
@@ -195,10 +209,13 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Last sequence number should be 0", 0,
base.lastSequenceNumber());
ManifestFile manifest = writeManifest(FILE_A, FILE_B);
-
table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest).commit();
+ Snapshot committedSnapshot =
+ commit(
+ table,
+
table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest),
+ branch);
- Snapshot committedSnapshot = table.currentSnapshot();
- Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+ Assert.assertNotNull("Should create a snapshot", committedSnapshot);
V1Assert.assertEquals(
"Last sequence number should be 0", 0,
table.ops().current().lastSequenceNumber());
V2Assert.assertEquals(
@@ -224,16 +241,16 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
Assert.assertEquals("Table should start empty", 0,
listManifestFiles().size());
- table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ Snapshot commitBefore =
+ commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B),
branch);
- Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+ Assert.assertNotNull("Should create a snapshot", commitBefore);
V1Assert.assertEquals(
"Last sequence number should be 0", 0,
table.ops().current().lastSequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 1", 1,
table.ops().current().lastSequenceNumber());
TableMetadata base = readMetadata();
- Snapshot commitBefore = table.currentSnapshot();
long baseId = commitBefore.snapshotId();
validateSnapshot(null, commitBefore, 1, FILE_A, FILE_B);
@@ -241,7 +258,7 @@ public class TestMergeAppend extends TableTestBase {
"Should create 1 manifest for initial write",
1,
commitBefore.allManifests(table.io()).size());
- ManifestFile initialManifest =
base.currentSnapshot().allManifests(table.io()).get(0);
+ ManifestFile initialManifest =
commitBefore.allManifests(table.io()).get(0);
validateManifest(
initialManifest,
seqs(1, 1),
@@ -249,14 +266,13 @@ public class TestMergeAppend extends TableTestBase {
files(FILE_A, FILE_B),
statuses(Status.ADDED, Status.ADDED));
- table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+ Snapshot committedAfter =
+ commit(table, table.newAppend().appendFile(FILE_C).appendFile(FILE_D),
branch);
V1Assert.assertEquals(
"Last sequence number should be 0", 0,
table.ops().current().lastSequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 2", 2,
table.ops().current().lastSequenceNumber());
- Snapshot committedAfter = table.currentSnapshot();
-
Assert.assertEquals(
"Should contain 1 merged manifest for second write",
1,
@@ -294,14 +310,16 @@ public class TestMergeAppend extends TableTestBase {
ManifestFile manifest = writeManifest(FILE_A);
ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C);
ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D);
- table
- .newAppend()
- .appendManifest(manifest)
- .appendManifest(manifest2)
- .appendManifest(manifest3)
- .commit();
+ Snapshot snap1 =
+ commit(
+ table,
+ table
+ .newAppend()
+ .appendManifest(manifest)
+ .appendManifest(manifest2)
+ .appendManifest(manifest3),
+ branch);
- Snapshot snap1 = table.currentSnapshot();
long commitId1 = snap1.snapshotId();
base = readMetadata();
V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
@@ -312,7 +330,7 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals(
"Should contain 2 merged manifest for first write",
2,
- readMetadata().currentSnapshot().allManifests(table.io()).size());
+ snap1.allManifests(table.io()).size());
validateManifest(
snap1.allManifests(table.io()).get(0),
seqs(1),
@@ -326,13 +344,15 @@ public class TestMergeAppend extends TableTestBase {
files(FILE_C, FILE_D),
statuses(Status.ADDED, Status.ADDED));
- table
- .newAppend()
- .appendManifest(manifest)
- .appendManifest(manifest2)
- .appendManifest(manifest3)
- .commit();
- Snapshot snap2 = table.currentSnapshot();
+ Snapshot snap2 =
+ commit(
+ table,
+ table
+ .newAppend()
+ .appendManifest(manifest)
+ .appendManifest(manifest2)
+ .appendManifest(manifest3),
+ branch);
long commitId2 = snap2.snapshotId();
base = readMetadata();
V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap2.sequenceNumber());
@@ -343,7 +363,7 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals(
"Should contain 3 merged manifest for second write",
3,
- readMetadata().currentSnapshot().allManifests(table.io()).size());
+ snap2.allManifests(table.io()).size());
validateManifest(
snap2.allManifests(table.io()).get(0),
seqs(2),
@@ -367,14 +387,13 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals(
"Summary metadata should include 3 added files",
"3",
- readMetadata().currentSnapshot().summary().get("added-data-files"));
+ snap2.summary().get("added-data-files"));
}
@Test
public void testManifestsMergeIntoOne() throws IOException {
Assert.assertEquals("Table should start empty", 0,
listManifestFiles().size());
- table.newAppend().appendFile(FILE_A).commit();
- Snapshot snap1 = table.currentSnapshot();
+ Snapshot snap1 = commit(table, table.newAppend().appendFile(FILE_A),
branch);
TableMetadata base = readMetadata();
V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
V2Assert.assertEquals("Last sequence number should be 1", 1,
base.lastSequenceNumber());
@@ -390,8 +409,7 @@ public class TestMergeAppend extends TableTestBase {
files(FILE_A),
statuses(Status.ADDED));
- table.newAppend().appendFile(FILE_B).commit();
- Snapshot snap2 = table.currentSnapshot();
+ Snapshot snap2 = commit(table, table.newAppend().appendFile(FILE_B),
branch);
long commitId2 = snap2.snapshotId();
base = readMetadata();
V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap2.sequenceNumber());
@@ -413,12 +431,15 @@ public class TestMergeAppend extends TableTestBase {
files(FILE_A),
statuses(Status.ADDED));
- table
- .newAppend()
- .appendManifest(
- writeManifest("input-m0.avro",
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C)))
- .commit();
- Snapshot snap3 = table.currentSnapshot();
+ Snapshot snap3 =
+ commit(
+ table,
+ table
+ .newAppend()
+ .appendManifest(
+ writeManifest(
+ "input-m0.avro",
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))),
+ branch);
base = readMetadata();
V2Assert.assertEquals("Snapshot sequence number should be 3", 3,
snap3.sequenceNumber());
@@ -449,12 +470,15 @@ public class TestMergeAppend extends TableTestBase {
table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT,
"1").commit();
- table
- .newAppend()
- .appendManifest(
- writeManifest("input-m1.avro",
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D)))
- .commit();
- Snapshot snap4 = table.currentSnapshot();
+ Snapshot snap4 =
+ commit(
+ table,
+ table
+ .newAppend()
+ .appendManifest(
+ writeManifest(
+ "input-m1.avro",
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))),
+ branch);
base = readMetadata();
V2Assert.assertEquals("Snapshot sequence number should be 4", 4,
snap4.sequenceNumber());
@@ -485,27 +509,28 @@ public class TestMergeAppend extends TableTestBase {
ManifestFile manifest = writeManifest(FILE_A, FILE_B);
ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C);
ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D);
- table
- .newAppend()
- .appendManifest(manifest)
- .appendManifest(manifest2)
- .appendManifest(manifest3)
- .commit();
-
- Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+ Snapshot committed =
+ commit(
+ table,
+ table
+ .newAppend()
+ .appendManifest(manifest)
+ .appendManifest(manifest2)
+ .appendManifest(manifest3),
+ branch);
+
+ Assert.assertNotNull("Should create a snapshot", committed);
V1Assert.assertEquals(
"Last sequence number should be 0", 0,
table.ops().current().lastSequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 1", 1,
table.ops().current().lastSequenceNumber());
- Snapshot committed = table.currentSnapshot();
-
Assert.assertEquals(
"Should contain 3 merged manifest after 1st write write",
3,
committed.allManifests(table.io()).size());
- long snapshotId = table.currentSnapshot().snapshotId();
+ long snapshotId = committed.snapshotId();
validateManifest(
committed.allManifests(table.io()).get(0),
@@ -541,18 +566,15 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Table should start empty", 0,
listManifestFiles().size());
Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
- table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ Snapshot snap = commit(table,
table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch);
- Snapshot snap = table.currentSnapshot();
validateSnapshot(null, snap, 1, FILE_A, FILE_B);
TableMetadata base = readMetadata();
- long baseId = base.currentSnapshot().snapshotId();
+ long baseId = snap.snapshotId();
Assert.assertEquals(
- "Should create 1 manifest for initial write",
- 1,
- base.currentSnapshot().allManifests(table.io()).size());
- ManifestFile initialManifest =
base.currentSnapshot().allManifests(table.io()).get(0);
+ "Should create 1 manifest for initial write", 1,
snap.allManifests(table.io()).size());
+ ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
validateManifest(
initialManifest,
seqs(1, 1),
@@ -560,9 +582,8 @@ public class TestMergeAppend extends TableTestBase {
files(FILE_A, FILE_B),
statuses(Status.ADDED, Status.ADDED));
- table.newDelete().deleteFile(FILE_A).commit();
+ Snapshot deleteSnapshot = commit(table,
table.newDelete().deleteFile(FILE_A), branch);
- Snapshot deleteSnapshot = table.currentSnapshot();
V2Assert.assertEquals(
"Snapshot sequence number should be 2", 2,
deleteSnapshot.sequenceNumber());
V2Assert.assertEquals(
@@ -571,12 +592,12 @@ public class TestMergeAppend extends TableTestBase {
"Table should end with last-sequence-number 0", 0,
readMetadata().lastSequenceNumber());
TableMetadata delete = readMetadata();
- long deleteId = delete.currentSnapshot().snapshotId();
+ long deleteId = latestSnapshot(table, branch).snapshotId();
Assert.assertEquals(
"Should create 1 filtered manifest for delete",
1,
- delete.currentSnapshot().allManifests(table.io()).size());
- ManifestFile deleteManifest =
delete.currentSnapshot().allManifests(table.io()).get(0);
+ latestSnapshot(table, branch).allManifests(table.io()).size());
+ ManifestFile deleteManifest =
deleteSnapshot.allManifests(table.io()).get(0);
validateManifest(
deleteManifest,
@@ -585,9 +606,9 @@ public class TestMergeAppend extends TableTestBase {
files(FILE_A, FILE_B),
statuses(Status.DELETED, Status.EXISTING));
- table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+ Snapshot committedSnapshot =
+ commit(table, table.newAppend().appendFile(FILE_C).appendFile(FILE_D),
branch);
- Snapshot committedSnapshot = table.currentSnapshot();
V2Assert.assertEquals(
"Snapshot sequence number should be 3", 3,
committedSnapshot.sequenceNumber());
V2Assert.assertEquals(
@@ -621,23 +642,18 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
Assert.assertEquals("Table should start empty", 0,
listManifestFiles().size());
- table.newFastAppend().appendFile(FILE_A).commit();
- Snapshot snap1 = table.currentSnapshot();
+ Snapshot snap1 = commit(table, table.newFastAppend().appendFile(FILE_A),
branch);
long idFileA = snap1.snapshotId();
validateSnapshot(null, snap1, 1, FILE_A);
- table.newFastAppend().appendFile(FILE_B).commit();
- Snapshot snap2 = table.currentSnapshot();
+ Snapshot snap2 = commit(table, table.newFastAppend().appendFile(FILE_B),
branch);
long idFileB = snap2.snapshotId();
validateSnapshot(snap1, snap2, 2, FILE_B);
Assert.assertEquals(
- "Should have 2 manifests from setup writes",
- 2,
- readMetadata().currentSnapshot().allManifests(table.io()).size());
+ "Should have 2 manifests from setup writes", 2,
snap2.allManifests(table.io()).size());
- table.newAppend().appendFile(FILE_C).commit();
- Snapshot snap3 = table.currentSnapshot();
+ Snapshot snap3 = commit(table, table.newAppend().appendFile(FILE_C),
branch);
long idFileC = snap3.snapshotId();
validateSnapshot(snap2, snap3, 3, FILE_C);
@@ -645,11 +661,11 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals(
"Should have 3 unmerged manifests",
3,
- base.currentSnapshot().allManifests(table.io()).size());
- Set<ManifestFile> unmerged =
Sets.newHashSet(base.currentSnapshot().allManifests(table.io()));
+ latestSnapshot(table, branch).allManifests(table.io()).size());
+ Set<ManifestFile> unmerged =
+ Sets.newHashSet(latestSnapshot(table,
branch).allManifests(table.io()));
- table.newAppend().appendFile(FILE_D).commit();
- Snapshot committed = table.currentSnapshot();
+ Snapshot committed = commit(table, table.newAppend().appendFile(FILE_D),
branch);
V2Assert.assertEquals("Snapshot sequence number should be 4", 4,
committed.sequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 4", 4,
readMetadata().lastSequenceNumber());
@@ -681,18 +697,15 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
Assert.assertEquals("Table should start empty", 0,
listManifestFiles().size());
- table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ Snapshot snap = commit(table,
table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch);
- Snapshot snap = table.currentSnapshot();
validateSnapshot(null, snap, 1, FILE_A, FILE_B);
TableMetadata base = readMetadata();
- long baseId = base.currentSnapshot().snapshotId();
+ long baseId = snap.snapshotId();
Assert.assertEquals(
- "Should create 1 manifest for initial write",
- 1,
- base.currentSnapshot().allManifests(table.io()).size());
- ManifestFile initialManifest =
base.currentSnapshot().allManifests(table.io()).get(0);
+ "Should create 1 manifest for initial write", 1,
snap.allManifests(table.io()).size());
+ ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
validateManifest(
initialManifest,
seqs(1, 1),
@@ -700,8 +713,8 @@ public class TestMergeAppend extends TableTestBase {
files(FILE_A, FILE_B),
statuses(Status.ADDED, Status.ADDED));
- table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
- Snapshot committed = table.currentSnapshot();
+ Snapshot committed =
+ commit(table, table.newAppend().appendFile(FILE_C).appendFile(FILE_D),
branch);
V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
committed.sequenceNumber());
V2Assert.assertEquals(
@@ -735,18 +748,15 @@ public class TestMergeAppend extends TableTestBase {
@Test
public void testChangedPartitionSpec() {
- table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ Snapshot snap = commit(table,
table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch);
- Snapshot snap = table.currentSnapshot();
long commitId = snap.snapshotId();
validateSnapshot(null, snap, 1, FILE_A, FILE_B);
TableMetadata base = readMetadata();
Assert.assertEquals(
- "Should create 1 manifest for initial write",
- 1,
- base.currentSnapshot().allManifests(table.io()).size());
- ManifestFile initialManifest =
base.currentSnapshot().allManifests(table.io()).get(0);
+ "Should create 1 manifest for initial write", 1,
snap.allManifests(table.io()).size());
+ ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
validateManifest(
initialManifest,
seqs(1, 1),
@@ -760,7 +770,7 @@ public class TestMergeAppend extends TableTestBase {
// commit the new partition spec to the table manually
table.ops().commit(base, base.updatePartitionSpec(newSpec));
- Snapshot snap2 = table.currentSnapshot();
+ Snapshot snap2 = latestSnapshot(table, branch);
V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap2.sequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
@@ -775,9 +785,8 @@ public class TestMergeAppend extends TableTestBase {
.withRecordCount(1)
.build();
- table.newAppend().appendFile(newFileY).commit();
+ Snapshot lastSnapshot = commit(table,
table.newAppend().appendFile(newFileY), branch);
- Snapshot lastSnapshot = table.currentSnapshot();
V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
lastSnapshot.sequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
@@ -803,23 +812,20 @@ public class TestMergeAppend extends TableTestBase {
@Test
public void testChangedPartitionSpecMergeExisting() {
- table.newAppend().appendFile(FILE_A).commit();
+ Snapshot snap1 = commit(table, table.newAppend().appendFile(FILE_A),
branch);
- Snapshot snap1 = table.currentSnapshot();
long id1 = snap1.snapshotId();
validateSnapshot(null, snap1, 1, FILE_A);
// create a second compatible manifest
- table.newFastAppend().appendFile(FILE_B).commit();
+ Snapshot snap2 = commit(table, table.newFastAppend().appendFile(FILE_B),
branch);
- Snapshot snap2 = table.currentSnapshot();
long id2 = snap2.snapshotId();
validateSnapshot(snap1, snap2, 2, FILE_B);
TableMetadata base = readMetadata();
- Assert.assertEquals(
- "Should contain 2 manifests", 2,
base.currentSnapshot().allManifests(table.io()).size());
- ManifestFile manifest =
base.currentSnapshot().allManifests(table.io()).get(0);
+ Assert.assertEquals("Should contain 2 manifests", 2,
snap2.allManifests(table.io()).size());
+ ManifestFile manifest = snap2.allManifests(table.io()).get(0);
// build the new spec using the table's schema, which uses fresh IDs
PartitionSpec newSpec =
@@ -827,7 +833,7 @@ public class TestMergeAppend extends TableTestBase {
// commit the new partition spec to the table manually
table.ops().commit(base, base.updatePartitionSpec(newSpec));
- Snapshot snap3 = table.currentSnapshot();
+ Snapshot snap3 = latestSnapshot(table, branch);
V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap3.sequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
@@ -842,8 +848,7 @@ public class TestMergeAppend extends TableTestBase {
.withRecordCount(1)
.build();
- table.newAppend().appendFile(newFileY).commit();
- Snapshot lastSnapshot = table.currentSnapshot();
+ Snapshot lastSnapshot = commit(table,
table.newAppend().appendFile(newFileY), branch);
V2Assert.assertEquals("Snapshot sequence number should be 3", 3,
lastSnapshot.sequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 3", 3,
readMetadata().lastSequenceNumber());
@@ -876,21 +881,21 @@ public class TestMergeAppend extends TableTestBase {
table.updateProperties().set("commit.manifest.min-count-to-merge",
"1").commit();
Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
- table.newAppend().appendFile(FILE_A).commit();
+ Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A),
branch);
TableMetadata base = readMetadata();
- long baseId = base.currentSnapshot().snapshotId();
+ long baseId = snap.snapshotId();
V2Assert.assertEquals("Last sequence number should be 1", 1,
base.lastSequenceNumber());
V1Assert.assertEquals(
"Table should end with last-sequence-number 0", 0,
base.lastSequenceNumber());
- ManifestFile initialManifest =
base.currentSnapshot().allManifests(table.io()).get(0);
+ ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
validateManifest(initialManifest, seqs(1), ids(baseId), files(FILE_A),
statuses(Status.ADDED));
table.ops().failCommits(5);
AppendFiles append = table.newAppend().appendFile(FILE_B);
- Snapshot pending = append.apply();
+ Snapshot pending = apply(append, branch);
Assert.assertEquals("Should merge to 1 manifest", 1,
pending.allManifests(table.io()).size());
ManifestFile newManifest = pending.allManifests(table.io()).get(0);
@@ -905,7 +910,7 @@ public class TestMergeAppend extends TableTestBase {
"Should retry 4 times and throw last failure",
CommitFailedException.class,
"Injected failure",
- append::commit);
+ () -> commit(table, append, branch));
V2Assert.assertEquals(
"Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
@@ -914,10 +919,10 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals(
"Should only contain 1 manifest file",
1,
- table.currentSnapshot().allManifests(table.io()).size());
+ latestSnapshot(table, branch).allManifests(table.io()).size());
validateManifest(
- table.currentSnapshot().allManifests(table.io()).get(0),
+ latestSnapshot(table, branch).allManifests(table.io()).get(0),
seqs(1),
ids(baseId),
files(initialManifest),
@@ -934,7 +939,7 @@ public class TestMergeAppend extends TableTestBase {
ManifestFile manifest = writeManifest(FILE_A, FILE_B);
AppendFiles append = table.newAppend().appendManifest(manifest);
- Snapshot pending = append.apply();
+ Snapshot pending = apply(append, branch);
ManifestFile newManifest = pending.allManifests(table.io()).get(0);
Assert.assertTrue("Should create new manifest", new
File(newManifest.path()).exists());
@@ -942,7 +947,7 @@ public class TestMergeAppend extends TableTestBase {
"Should retry 4 times and throw last failure",
CommitFailedException.class,
"Injected failure",
- append::commit);
+ () -> commit(table, append, branch));
V2Assert.assertEquals(
"Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
V1Assert.assertEquals(
@@ -958,21 +963,21 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
- table.newAppend().appendFile(FILE_A).commit();
+ Snapshot current = commit(table, table.newAppend().appendFile(FILE_A),
branch);
TableMetadata base = readMetadata();
- long baseId = base.currentSnapshot().snapshotId();
+ long baseId = current.snapshotId();
V2Assert.assertEquals(
"Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
V1Assert.assertEquals(
"Table should end with last-sequence-number 0", 0,
readMetadata().lastSequenceNumber());
- ManifestFile initialManifest =
base.currentSnapshot().allManifests(table.io()).get(0);
+ ManifestFile initialManifest = current.allManifests(table.io()).get(0);
validateManifest(initialManifest, seqs(1), ids(baseId), files(FILE_A),
statuses(Status.ADDED));
table.ops().failCommits(3);
AppendFiles append = table.newAppend().appendFile(FILE_B);
- Snapshot pending = append.apply();
+ Snapshot pending = apply(append, branch);
Assert.assertEquals("Should merge to 1 manifest", 1,
pending.allManifests(table.io()).size());
ManifestFile newManifest = pending.allManifests(table.io()).get(0);
@@ -984,17 +989,15 @@ public class TestMergeAppend extends TableTestBase {
concat(files(FILE_B), files(initialManifest)));
V2Assert.assertEquals(
- "Snapshot sequence number should be 1", 1,
table.currentSnapshot().sequenceNumber());
+ "Snapshot sequence number should be 1", 1, latestSnapshot(table,
branch).sequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
V1Assert.assertEquals(
"Table should end with last-sequence-number 0", 0,
readMetadata().lastSequenceNumber());
- append.commit();
- Snapshot snapshot = table.currentSnapshot();
+ Snapshot snapshot = commit(table, append, branch);
long snapshotId = snapshot.snapshotId();
- V2Assert.assertEquals(
- "Snapshot sequence number should be 2", 2,
table.currentSnapshot().sequenceNumber());
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snapshot.sequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
V1Assert.assertEquals(
@@ -1005,12 +1008,10 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals(
"Should commit the same new manifest during retry",
Lists.newArrayList(newManifest),
- metadata.currentSnapshot().allManifests(table.io()));
+ snapshot.allManifests(table.io()));
Assert.assertEquals(
- "Should only contain 1 merged manifest file",
- 1,
- table.currentSnapshot().allManifests(table.io()).size());
+ "Should only contain 1 merged manifest file", 1,
snapshot.allManifests(table.io()).size());
ManifestFile manifestFile = snapshot.allManifests(table.io()).get(0);
validateManifest(
manifestFile,
@@ -1031,13 +1032,12 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertNull("Should not have a current snapshot",
base.currentSnapshot());
ManifestFile manifest = writeManifest(FILE_A, FILE_B);
- table.newAppend().appendManifest(manifest).commit();
+ Snapshot snapshot = commit(table,
table.newAppend().appendManifest(manifest), branch);
- Snapshot snapshot = table.currentSnapshot();
long snapshotId = snapshot.snapshotId();
validateSnapshot(null, snapshot, 1, FILE_A, FILE_B);
- List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
+ List<ManifestFile> manifests = snapshot.allManifests(table.io());
Assert.assertEquals("Should have 1 committed manifest", 1,
manifests.size());
ManifestFile manifestFile = snapshot.allManifests(table.io()).get(0);
validateManifest(
@@ -1079,15 +1079,14 @@ public class TestMergeAppend extends TableTestBase {
table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT,
"1").commit();
ManifestFile manifest1 = writeManifestWithName("manifest-file-1.avro",
FILE_A, FILE_B);
- table.newAppend().appendManifest(manifest1).commit();
+ Snapshot snap1 = commit(table,
table.newAppend().appendManifest(manifest1), branch);
- Snapshot snap1 = table.currentSnapshot();
long commitId1 = snap1.snapshotId();
validateSnapshot(null, snap1, 1, FILE_A, FILE_B);
Assert.assertEquals("Should have only 1 manifest", 1,
snap1.allManifests(table.io()).size());
validateManifest(
- table.currentSnapshot().allManifests(table.io()).get(0),
+ snap1.allManifests(table.io()).get(0),
seqs(1, 1),
ids(commitId1, commitId1),
files(FILE_A, FILE_B),
@@ -1096,12 +1095,10 @@ public class TestMergeAppend extends TableTestBase {
"Unmerged append manifest should not be deleted", new
File(manifest1.path()).exists());
ManifestFile manifest2 = writeManifestWithName("manifest-file-2.avro",
FILE_C, FILE_D);
- table.newAppend().appendManifest(manifest2).commit();
+ Snapshot snap2 = commit(table,
table.newAppend().appendManifest(manifest2), branch);
- Snapshot snap2 = table.currentSnapshot();
long commitId2 = snap2.snapshotId();
- V2Assert.assertEquals(
- "Snapshot sequence number should be 2", 2,
table.currentSnapshot().sequenceNumber());
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap2.sequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
V1Assert.assertEquals(
@@ -1110,7 +1107,7 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals(
"Manifests should be merged into 1", 1,
snap2.allManifests(table.io()).size());
validateManifest(
- table.currentSnapshot().allManifests(table.io()).get(0),
+ latestSnapshot(table, branch).allManifests(table.io()).get(0),
seqs(2, 2, 1, 1),
ids(commitId2, commitId2, commitId1, commitId1),
files(FILE_C, FILE_D, FILE_A, FILE_B),
@@ -1140,7 +1137,10 @@ public class TestMergeAppend extends TableTestBase {
append.appendManifest(manifest);
AssertHelpers.assertThrows(
- "Should reject commit", CommitFailedException.class, "Injected
failure", append::commit);
+ "Should reject commit",
+ CommitFailedException.class,
+ "Injected failure",
+ () -> commit(table, append, branch));
Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
Assert.assertTrue("Append manifest should not be deleted", new
File(manifest.path()).exists());
@@ -1159,7 +1159,7 @@ public class TestMergeAppend extends TableTestBase {
"Should reject commit",
IllegalArgumentException.class,
"Cannot append manifest with existing files",
- () ->
table.newAppend().appendManifest(manifestWithExistingFiles).commit());
+ () -> commit(table,
table.newAppend().appendManifest(manifestWithExistingFiles), branch));
Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
ManifestFile manifestWithDeletedFiles =
@@ -1168,7 +1168,7 @@ public class TestMergeAppend extends TableTestBase {
"Should reject commit",
IllegalArgumentException.class,
"Cannot append manifest with deleted files",
- () ->
table.newAppend().appendManifest(manifestWithDeletedFiles).commit());
+ () -> commit(table,
table.newAppend().appendManifest(manifestWithDeletedFiles), branch));
Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
}
@@ -1217,18 +1217,15 @@ public class TestMergeAppend extends TableTestBase {
@Test
public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() {
- table.newAppend().appendFile(FILE_A).commit();
+ Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A),
branch);
- Snapshot snap = table.currentSnapshot();
long commitId = snap.snapshotId();
validateSnapshot(null, snap, 1, FILE_A);
TableMetadata base = readMetadata();
Assert.assertEquals(
- "Should create 1 manifest for initial write",
- 1,
- base.currentSnapshot().allManifests(table.io()).size());
- ManifestFile initialManifest =
base.currentSnapshot().allManifests(table.io()).get(0);
+ "Should create 1 manifest for initial write", 1,
snap.allManifests(table.io()).size());
+ ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
validateManifest(
initialManifest, seqs(1), ids(commitId), files(FILE_A),
statuses(Status.ADDED));
@@ -1252,8 +1249,7 @@ public class TestMergeAppend extends TableTestBase {
.withRecordCount(1)
.build();
- table.newAppend().appendFile(newFile).commit();
- Snapshot committedSnapshot = table.currentSnapshot();
+ Snapshot committedSnapshot = commit(table,
table.newAppend().appendFile(newFile), branch);
V2Assert.assertEquals(
"Snapshot sequence number should be 2", 2,
committedSnapshot.sequenceNumber());
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index e8fb5e2658..53e5af520d 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -1150,10 +1151,7 @@ public class TestRemoveSnapshots extends TableTestBase {
.add(firstSnapshot.manifestListLocation())
.add(secondSnapshot.manifestListLocation())
.add(thirdSnapshot.manifestListLocation())
- .addAll(
- secondSnapshot.allManifests(FILE_IO).stream()
- .map(ManifestFile::path)
- .collect(Collectors.toList()))
+ .addAll(manifestPaths(secondSnapshot, table.io()))
.addAll(
manifestOfDeletedFiles.stream()
.map(ManifestFile::path)
@@ -1455,6 +1453,64 @@ public class TestRemoveSnapshots extends TableTestBase {
Assert.assertNull(table.ops().current().snapshot(initialSnapshotId));
}
+ @Test
+ public void testRetainFilesOnRetainedBranches() {
+ // Files still reachable from a branch head must survive expiration. Setup: FILE_A on main; test-branch forked at appendA
+ String testBranch = "test-branch";
+ table.newAppend().appendFile(FILE_A).commit();
+ Snapshot appendA = table.currentSnapshot();
+ table.manageSnapshots().createBranch(testBranch,
appendA.snapshotId()).commit();
+
+ // Delete FILE_A from main only; test-branch still points at appendA
+ table.newDelete().deleteFile(FILE_A).commit();
+ Snapshot deletionA = table.currentSnapshot();
+ // Append FILE_B to main
+ table.newAppend().appendFile(FILE_B).commit();
+ long tAfterCommits =
waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ Set<String> expectedDeletes = Sets.newHashSet();
+
+ // Only deletionA's manifest list and manifests should be removed; appendA (test-branch head) and FILE_A are kept
+ expectedDeletes.add(deletionA.manifestListLocation());
+ expectedDeletes.addAll(manifestPaths(deletionA, table.io()));
+
table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit();
+
+ Assert.assertEquals(2, Iterables.size(table.snapshots()));
+ Assert.assertEquals(expectedDeletes, deletedFiles);
+
+ // Delete FILE_A on test-branch as well (branchDelete)
+ table.newDelete().deleteFile(FILE_A).toBranch(testBranch).commit();
+ Snapshot branchDelete = table.snapshot(testBranch);
+
+ // Append FILE_C on test-branch; testBranchHead is the branch's new head
+ table.newAppend().appendFile(FILE_C).toBranch(testBranch).commit();
+ Snapshot testBranchHead = table.snapshot(testBranch);
+
+ deletedFiles = Sets.newHashSet();
+ expectedDeletes = Sets.newHashSet();
+ // Second expiration: expire snapshots older than test-branch's new head
+ waitUntilAfter(testBranchHead.timestampMillis());
+ table
+ .expireSnapshots()
+ .expireOlderThan(testBranchHead.timestampMillis())
+ .deleteWith(deletedFiles::add)
+ .commit();
+ // appendA and branchDelete are expired, so their metadata and FILE_A itself should be deleted
+ expectedDeletes.add(appendA.manifestListLocation());
+ expectedDeletes.addAll(manifestPaths(appendA, table.io()));
+ expectedDeletes.add(branchDelete.manifestListLocation());
+ expectedDeletes.addAll(manifestPaths(branchDelete, table.io()));
+ expectedDeletes.add(FILE_A.path().toString());
+
+ Assert.assertEquals(2, Iterables.size(table.snapshots()));
+ Assert.assertEquals(expectedDeletes, deletedFiles);
+ }
+
+ private Set<String> manifestPaths(Snapshot snapshot, FileIO io) { // Set of paths of every manifest returned by snapshot.allManifests(io)
+ return
snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet());
+ }
+
private RemoveSnapshots removeSnapshots(Table table) {
RemoveSnapshots removeSnapshots = (RemoveSnapshots)
table.expireSnapshots();
return (RemoveSnapshots)
removeSnapshots.withIncrementalCleanup(incrementalCleanup);