This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 88a6e4edb1 Core: Support performing delete files and merge appends on branches (#5618)
88a6e4edb1 is described below

commit 88a6e4edb1f9649edb806407b535f78ae8c259e2
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Oct 17 18:03:51 2022 -0700

    Core: Support performing delete files and merge appends on branches (#5618)
---
 .../main/java/org/apache/iceberg/MergeAppend.java  |   6 +
 .../java/org/apache/iceberg/StreamingDelete.java   |   6 +
 .../java/org/apache/iceberg/TableTestBase.java     |  39 +++
 .../java/org/apache/iceberg/TestDeleteFiles.java   | 139 +++++---
 .../java/org/apache/iceberg/TestMergeAppend.java   | 358 ++++++++++-----------
 .../org/apache/iceberg/TestRemoveSnapshots.java    |  64 +++-
 6 files changed, 382 insertions(+), 230 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java b/core/src/main/java/org/apache/iceberg/MergeAppend.java
index 1781e95e9d..3ef553ba78 100644
--- a/core/src/main/java/org/apache/iceberg/MergeAppend.java
+++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java
@@ -48,6 +48,12 @@ class MergeAppend extends MergingSnapshotProducer<AppendFiles> implements Append
     return this;
   }
 
+  @Override
+  public MergeAppend toBranch(String branch) {
+    targetBranch(branch);
+    return this;
+  }
+
   @Override
   public AppendFiles appendManifest(ManifestFile manifest) {
     Preconditions.checkArgument(
diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
index 493c4e44c8..8ff7bb831e 100644
--- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java
+++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -59,4 +59,10 @@ public class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implem
     deleteByRowFilter(expr);
     return this;
   }
+
+  @Override
+  public StreamingDelete toBranch(String branch) {
+    targetBranch(branch);
+    return this;
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index ffe909e9bd..65461b465e 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -349,6 +349,45 @@ public class TableTestBase {
     validateSnapshot(old, snap, (Long) sequenceNumber, newFiles);
   }
 
+  @SuppressWarnings("checkstyle:HiddenField")
+  Snapshot commit(Table table, SnapshotUpdate snapshotUpdate, String branch) {
+    Snapshot snapshot;
+    if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+      snapshotUpdate.commit();
+      snapshot = table.currentSnapshot();
+    } else {
+      ((SnapshotProducer) snapshotUpdate.toBranch(branch)).commit();
+      snapshot = table.snapshot(branch);
+    }
+
+    return snapshot;
+  }
+
+  Snapshot apply(SnapshotUpdate snapshotUpdate, String branch) {
+    if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+      return ((SnapshotProducer) snapshotUpdate).apply();
+    } else {
+      return ((SnapshotProducer) snapshotUpdate.toBranch(branch)).apply();
+    }
+  }
+
+  @SuppressWarnings("checkstyle:HiddenField")
+  Snapshot latestSnapshot(Table table, String branch) {
+    if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+      return table.currentSnapshot();
+    }
+
+    return table.snapshot(branch);
+  }
+
+  Snapshot latestSnapshot(TableMetadata metadata, String branch) {
+    if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+      return metadata.currentSnapshot();
+    }
+
+    return metadata.snapshot(metadata.ref(branch).snapshotId());
+  }
+
   void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, 
DataFile... newFiles) {
     Assert.assertEquals(
         "Should not change delete manifests",
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
index 58d4352626..1ee3b663bc 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
@@ -68,38 +69,44 @@ public class TestDeleteFiles extends TableTestBase {
                   ))
           .build();
 
-  @Parameterized.Parameters(name = "formatVersion = {0}")
+  private final String branch;
+
+  @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
   public static Object[] parameters() {
-    return new Object[] {1, 2};
+    return new Object[][] {
+      new Object[] {1, "main"},
+      new Object[] {1, "testBranch"},
+      new Object[] {2, "main"},
+      new Object[] {2, "testBranch"}
+    };
   }
 
-  public TestDeleteFiles(int formatVersion) {
+  public TestDeleteFiles(int formatVersion, String branch) {
     super(formatVersion);
+    this.branch = branch;
   }
 
   @Test
   public void testMultipleDeletes() {
-    
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
-
+    commit(
+        table, 
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C), 
branch);
+    Snapshot append = latestSnapshot(readMetadata(), branch);
     Assert.assertEquals("Metadata should be at version 1", 1L, (long) 
version());
-    Snapshot append = readMetadata().currentSnapshot();
     validateSnapshot(null, append, FILE_A, FILE_B, FILE_C);
 
-    table.newDelete().deleteFile(FILE_A).commit();
+    commit(table, table.newDelete().deleteFile(FILE_A), branch);
+    Snapshot delete1 = latestSnapshot(readMetadata(), branch);
 
     Assert.assertEquals("Metadata should be at version 2", 2L, (long) 
version());
-    Snapshot delete = readMetadata().currentSnapshot();
-    Assert.assertEquals("Should have 1 manifest", 1, 
delete.allManifests(FILE_IO).size());
+    Assert.assertEquals("Should have 1 manifest", 1, 
delete1.allManifests(FILE_IO).size());
     validateManifestEntries(
-        delete.allManifests(table.io()).get(0),
-        ids(delete.snapshotId(), append.snapshotId(), append.snapshotId()),
+        delete1.allManifests(table.io()).get(0),
+        ids(delete1.snapshotId(), append.snapshotId(), append.snapshotId()),
         files(FILE_A, FILE_B, FILE_C),
         statuses(Status.DELETED, Status.EXISTING, Status.EXISTING));
 
-    table.newDelete().deleteFile(FILE_B).commit();
-
+    Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B), 
branch);
     Assert.assertEquals("Metadata should be at version 3", 3L, (long) 
version());
-    Snapshot delete2 = readMetadata().currentSnapshot();
     Assert.assertEquals("Should have 1 manifest", 1, 
delete2.allManifests(FILE_IO).size());
     validateManifestEntries(
         delete2.allManifests(FILE_IO).get(0),
@@ -147,9 +154,12 @@ public class TestDeleteFiles extends TableTestBase {
             .build();
 
     // add both data files
-    
table.newFastAppend().appendFile(firstDataFile).appendFile(secondDataFile).commit();
+    Snapshot initialSnapshot =
+        commit(
+            table,
+            
table.newFastAppend().appendFile(firstDataFile).appendFile(secondDataFile),
+            branch);
 
-    Snapshot initialSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should have 1 manifest", 1, 
initialSnapshot.allManifests(FILE_IO).size());
     validateManifestEntries(
         initialSnapshot.allManifests(FILE_IO).get(0),
@@ -158,9 +168,7 @@ public class TestDeleteFiles extends TableTestBase {
         statuses(Status.ADDED, Status.ADDED));
 
     // delete the first data file
-    table.newDelete().deleteFile(firstDataFile).commit();
-
-    Snapshot deleteSnapshot = table.currentSnapshot();
+    Snapshot deleteSnapshot = commit(table, 
table.newDelete().deleteFile(firstDataFile), branch);
     Assert.assertEquals("Should have 1 manifest", 1, 
deleteSnapshot.allManifests(FILE_IO).size());
     validateManifestEntries(
         deleteSnapshot.allManifests(FILE_IO).get(0),
@@ -170,9 +178,9 @@ public class TestDeleteFiles extends TableTestBase {
 
     // delete the second data file using a row filter
     // the commit should succeed as there is only one live data file
-    table.newDelete().deleteFromRowFilter(Expressions.lessThan("id", 
7)).commit();
+    Snapshot finalSnapshot =
+        commit(table, 
table.newDelete().deleteFromRowFilter(Expressions.lessThan("id", 7)), branch);
 
-    Snapshot finalSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should have 1 manifest", 1, 
finalSnapshot.allManifests(FILE_IO).size());
     validateManifestEntries(
         finalSnapshot.allManifests(FILE_IO).get(0),
@@ -184,13 +192,15 @@ public class TestDeleteFiles extends TableTestBase {
   @Test
   public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() {
     // add both data files
-    table
-        .newFastAppend()
-        .appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
-        .appendFile(DATA_FILE_BUCKET_0_IDS_8_10)
-        .commit();
+    Snapshot initialSnapshot =
+        commit(
+            table,
+            table
+                .newFastAppend()
+                .appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
+                .appendFile(DATA_FILE_BUCKET_0_IDS_8_10),
+            branch);
 
-    Snapshot initialSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should have 1 manifest", 1, 
initialSnapshot.allManifests(FILE_IO).size());
     validateManifestEntries(
         initialSnapshot.allManifests(FILE_IO).get(0),
@@ -199,9 +209,10 @@ public class TestDeleteFiles extends TableTestBase {
         statuses(Status.ADDED, Status.ADDED));
 
     // delete the second one using a metrics filter (no partition filter)
-    table.newDelete().deleteFromRowFilter(Expressions.greaterThan("id", 
5)).commit();
+    Snapshot deleteSnapshot =
+        commit(
+            table, 
table.newDelete().deleteFromRowFilter(Expressions.greaterThan("id", 5)), 
branch);
 
-    Snapshot deleteSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should have 1 manifest", 1, 
deleteSnapshot.allManifests(FILE_IO).size());
     validateManifestEntries(
         deleteSnapshot.allManifests(FILE_IO).get(0),
@@ -213,13 +224,15 @@ public class TestDeleteFiles extends TableTestBase {
   @Test
   public void testDeleteSomeFilesByRowFilterWithCombinedPredicates() {
     // add both data files
-    table
-        .newFastAppend()
-        .appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
-        .appendFile(DATA_FILE_BUCKET_0_IDS_8_10)
-        .commit();
+    Snapshot initialSnapshot =
+        commit(
+            table,
+            table
+                .newFastAppend()
+                .appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
+                .appendFile(DATA_FILE_BUCKET_0_IDS_8_10),
+            branch);
 
-    Snapshot initialSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should have 1 manifest", 1, 
initialSnapshot.allManifests(FILE_IO).size());
     validateManifestEntries(
         initialSnapshot.allManifests(FILE_IO).get(0),
@@ -231,9 +244,8 @@ public class TestDeleteFiles extends TableTestBase {
     Expression partPredicate = Expressions.equal(Expressions.bucket("data", 
16), 0);
     Expression rowPredicate = Expressions.greaterThan("id", 5);
     Expression predicate = Expressions.and(partPredicate, rowPredicate);
-    table.newDelete().deleteFromRowFilter(predicate).commit();
-
-    Snapshot deleteSnapshot = table.currentSnapshot();
+    Snapshot deleteSnapshot =
+        commit(table, table.newDelete().deleteFromRowFilter(predicate), 
branch);
     Assert.assertEquals("Should have 1 manifest", 1, 
deleteSnapshot.allManifests(FILE_IO).size());
     validateManifestEntries(
         deleteSnapshot.allManifests(FILE_IO).get(0),
@@ -262,18 +274,22 @@ public class TestDeleteFiles extends TableTestBase {
             .withPartitionPath("data_trunc_2=aa")
             .build();
 
-    table.newFastAppend().appendFile(dataFile).commit();
+    commit(table, table.newFastAppend().appendFile(dataFile), branch);
 
     AssertHelpers.assertThrows(
         "Should reject as not all rows match filter",
         ValidationException.class,
         "Cannot delete file where some, but not all, rows match filter",
-        () -> table.newDelete().deleteFromRowFilter(Expressions.equal("data", 
"aa")).commit());
+        () ->
+            commit(
+                table,
+                
table.newDelete().deleteFromRowFilter(Expressions.equal("data", "aa")),
+                branch));
   }
 
   @Test
   public void testDeleteCaseSensitivity() {
-    table.newFastAppend().appendFile(DATA_FILE_BUCKET_0_IDS_0_2).commit();
+    commit(table, 
table.newFastAppend().appendFile(DATA_FILE_BUCKET_0_IDS_0_2), branch);
 
     Expression rowFilter = Expressions.lessThan("iD", 5);
 
@@ -281,17 +297,22 @@ public class TestDeleteFiles extends TableTestBase {
         "Should use case sensitive binding by default",
         ValidationException.class,
         "Cannot find field 'iD'",
-        () -> table.newDelete().deleteFromRowFilter(rowFilter).commit());
+        () -> commit(table, table.newDelete().deleteFromRowFilter(rowFilter), 
branch));
 
     AssertHelpers.assertThrows(
         "Should fail with case sensitive binding",
         ValidationException.class,
         "Cannot find field 'iD'",
-        () -> 
table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(true).commit());
+        () ->
+            commit(
+                table,
+                
table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(true),
+                branch));
 
-    
table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(false).commit();
+    Snapshot deleteSnapshot =
+        commit(
+            table, 
table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(false), branch);
 
-    Snapshot deleteSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should have 1 manifest", 1, 
deleteSnapshot.allManifests(FILE_IO).size());
     validateManifestEntries(
         deleteSnapshot.allManifests(FILE_IO).get(0),
@@ -300,6 +321,34 @@ public class TestDeleteFiles extends TableTestBase {
         statuses(Status.DELETED));
   }
 
+  @Test
+  public void testDeleteFilesOnIndependentBranches() {
+    String testBranch = "testBranch";
+    
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
+    Snapshot initialSnapshot = table.currentSnapshot();
+    // Delete A on test branch
+    table.newDelete().deleteFile(FILE_A).toBranch(testBranch).commit();
+    Snapshot testBranchTip = table.snapshot(testBranch);
+
+    // Delete B and C on main
+    table.newDelete().deleteFile(FILE_B).deleteFile(FILE_C).commit();
+    Snapshot delete2 = table.currentSnapshot();
+
+    // Verify B and C on testBranch
+    validateManifestEntries(
+        Iterables.getOnlyElement(testBranchTip.allManifests(FILE_IO)),
+        ids(testBranchTip.snapshotId(), initialSnapshot.snapshotId(), 
initialSnapshot.snapshotId()),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(Status.DELETED, Status.EXISTING, Status.EXISTING));
+
+    // Verify A on main
+    validateManifestEntries(
+        Iterables.getOnlyElement(delete2.allManifests(FILE_IO)),
+        ids(initialSnapshot.snapshotId(), delete2.snapshotId(), 
delete2.snapshotId()),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(Status.EXISTING, Status.DELETED, Status.DELETED));
+  }
+
   private static ByteBuffer longToBuffer(long value) {
     return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, 
value);
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index ec71c856a3..8e9d4ab136 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -39,13 +39,21 @@ import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class TestMergeAppend extends TableTestBase {
-  @Parameterized.Parameters(name = "formatVersion = {0}")
+  private final String branch;
+
+  @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
   public static Object[] parameters() {
-    return new Object[] {1, 2};
+    return new Object[][] {
+      new Object[] {1, "main"},
+      new Object[] {1, "testBranch"},
+      new Object[] {2, "main"},
+      new Object[] {2, "testBranch"}
+    };
   }
 
-  public TestMergeAppend(int formatVersion) {
+  public TestMergeAppend(int formatVersion, String branch) {
     super(formatVersion);
+    this.branch = branch;
   }
 
   @Test
@@ -56,10 +64,10 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertNull("Should not have a current snapshot", 
base.currentSnapshot());
     Assert.assertEquals("Last sequence number should be 0", 0, 
base.lastSequenceNumber());
 
-    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    Snapshot committedSnapshot =
+        commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), 
branch);
 
-    Snapshot committedSnapshot = table.currentSnapshot();
-    Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+    Assert.assertNotNull("Should create a snapshot", committedSnapshot);
     V1Assert.assertEquals(
         "Last sequence number should be 0", 0, 
table.ops().current().lastSequenceNumber());
     V2Assert.assertEquals(
@@ -89,10 +97,9 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals("Last sequence number should be 0", 0, 
base.lastSequenceNumber());
 
     ManifestFile manifest = writeManifest(FILE_A, FILE_B);
-    table.newAppend().appendManifest(manifest).commit();
+    Snapshot committedSnapshot = commit(table, 
table.newAppend().appendManifest(manifest), branch);
 
-    Snapshot committedSnapshot = table.currentSnapshot();
-    Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+    Assert.assertNotNull("Should create a snapshot", committedSnapshot);
     V1Assert.assertEquals(
         "Last sequence number should be 0", 0, 
table.ops().current().lastSequenceNumber());
     V2Assert.assertEquals(
@@ -126,10 +133,13 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals("Last sequence number should be 0", 0, 
base.lastSequenceNumber());
 
     ManifestFile manifest = writeManifest(FILE_A, FILE_B);
-    
table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest).commit();
+    Snapshot committedSnapshot =
+        commit(
+            table,
+            
table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest),
+            branch);
 
-    Snapshot committedSnapshot = table.currentSnapshot();
-    Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+    Assert.assertNotNull("Should create a snapshot", committedSnapshot);
     V1Assert.assertEquals(
         "Last sequence number should be 0", 0, 
table.ops().current().lastSequenceNumber());
     V2Assert.assertEquals(
@@ -164,23 +174,27 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertNull("Should not have a current snapshot", 
base.currentSnapshot());
     Assert.assertEquals("Last sequence number should be 0", 0, 
base.lastSequenceNumber());
     AtomicInteger scanThreadsIndex = new AtomicInteger(0);
-    table
-        .newAppend()
-        .appendFile(FILE_A)
-        .appendFile(FILE_B)
-        .scanManifestsWith(
-            Executors.newFixedThreadPool(
-                1,
-                runnable -> {
-                  Thread thread = new Thread(runnable);
-                  thread.setName("scan-" + scanThreadsIndex.getAndIncrement());
-                  thread.setDaemon(
-                      true); // daemon threads will be terminated abruptly 
when the JVM exits
-                  return thread;
-                }))
-        .commit();
+    Snapshot snapshot =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendFile(FILE_A)
+                .appendFile(FILE_B)
+                .scanManifestsWith(
+                    Executors.newFixedThreadPool(
+                        1,
+                        runnable -> {
+                          Thread thread = new Thread(runnable);
+                          thread.setName("scan-" + 
scanThreadsIndex.getAndIncrement());
+                          thread.setDaemon(
+                              true); // daemon threads will be terminated 
abruptly when the JVM
+                          // exits
+                          return thread;
+                        })),
+            branch);
     Assert.assertTrue("Thread should be created in provided pool", 
scanThreadsIndex.get() > 0);
-    Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+    Assert.assertNotNull("Should create a snapshot", snapshot);
   }
 
   @Test
@@ -195,10 +209,13 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals("Last sequence number should be 0", 0, 
base.lastSequenceNumber());
 
     ManifestFile manifest = writeManifest(FILE_A, FILE_B);
-    
table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest).commit();
+    Snapshot committedSnapshot =
+        commit(
+            table,
+            
table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest),
+            branch);
 
-    Snapshot committedSnapshot = table.currentSnapshot();
-    Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+    Assert.assertNotNull("Should create a snapshot", committedSnapshot);
     V1Assert.assertEquals(
         "Last sequence number should be 0", 0, 
table.ops().current().lastSequenceNumber());
     V2Assert.assertEquals(
@@ -224,16 +241,16 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals("Last sequence number should be 0", 0, 
readMetadata().lastSequenceNumber());
     Assert.assertEquals("Table should start empty", 0, 
listManifestFiles().size());
 
-    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    Snapshot commitBefore =
+        commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), 
branch);
 
-    Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+    Assert.assertNotNull("Should create a snapshot", commitBefore);
     V1Assert.assertEquals(
         "Last sequence number should be 0", 0, 
table.ops().current().lastSequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 1", 1, 
table.ops().current().lastSequenceNumber());
 
     TableMetadata base = readMetadata();
-    Snapshot commitBefore = table.currentSnapshot();
     long baseId = commitBefore.snapshotId();
     validateSnapshot(null, commitBefore, 1, FILE_A, FILE_B);
 
@@ -241,7 +258,7 @@ public class TestMergeAppend extends TableTestBase {
         "Should create 1 manifest for initial write",
         1,
         commitBefore.allManifests(table.io()).size());
-    ManifestFile initialManifest = 
base.currentSnapshot().allManifests(table.io()).get(0);
+    ManifestFile initialManifest = 
commitBefore.allManifests(table.io()).get(0);
     validateManifest(
         initialManifest,
         seqs(1, 1),
@@ -249,14 +266,13 @@ public class TestMergeAppend extends TableTestBase {
         files(FILE_A, FILE_B),
         statuses(Status.ADDED, Status.ADDED));
 
-    table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+    Snapshot committedAfter =
+        commit(table, table.newAppend().appendFile(FILE_C).appendFile(FILE_D), 
branch);
     V1Assert.assertEquals(
         "Last sequence number should be 0", 0, 
table.ops().current().lastSequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 2", 2, 
table.ops().current().lastSequenceNumber());
 
-    Snapshot committedAfter = table.currentSnapshot();
-
     Assert.assertEquals(
         "Should contain 1 merged manifest for second write",
         1,
@@ -294,14 +310,16 @@ public class TestMergeAppend extends TableTestBase {
     ManifestFile manifest = writeManifest(FILE_A);
     ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C);
     ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D);
-    table
-        .newAppend()
-        .appendManifest(manifest)
-        .appendManifest(manifest2)
-        .appendManifest(manifest3)
-        .commit();
+    Snapshot snap1 =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendManifest(manifest)
+                .appendManifest(manifest2)
+                .appendManifest(manifest3),
+            branch);
 
-    Snapshot snap1 = table.currentSnapshot();
     long commitId1 = snap1.snapshotId();
     base = readMetadata();
     V2Assert.assertEquals("Snapshot sequence number should be 1", 1, 
snap1.sequenceNumber());
@@ -312,7 +330,7 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals(
         "Should contain 2 merged manifest for first write",
         2,
-        readMetadata().currentSnapshot().allManifests(table.io()).size());
+        snap1.allManifests(table.io()).size());
     validateManifest(
         snap1.allManifests(table.io()).get(0),
         seqs(1),
@@ -326,13 +344,15 @@ public class TestMergeAppend extends TableTestBase {
         files(FILE_C, FILE_D),
         statuses(Status.ADDED, Status.ADDED));
 
-    table
-        .newAppend()
-        .appendManifest(manifest)
-        .appendManifest(manifest2)
-        .appendManifest(manifest3)
-        .commit();
-    Snapshot snap2 = table.currentSnapshot();
+    Snapshot snap2 =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendManifest(manifest)
+                .appendManifest(manifest2)
+                .appendManifest(manifest3),
+            branch);
     long commitId2 = snap2.snapshotId();
     base = readMetadata();
     V2Assert.assertEquals("Snapshot sequence number should be 2", 2, 
snap2.sequenceNumber());
@@ -343,7 +363,7 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals(
         "Should contain 3 merged manifest for second write",
         3,
-        readMetadata().currentSnapshot().allManifests(table.io()).size());
+        snap2.allManifests(table.io()).size());
     validateManifest(
         snap2.allManifests(table.io()).get(0),
         seqs(2),
@@ -367,14 +387,13 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals(
         "Summary metadata should include 3 added files",
         "3",
-        readMetadata().currentSnapshot().summary().get("added-data-files"));
+        snap2.summary().get("added-data-files"));
   }
 
   @Test
   public void testManifestsMergeIntoOne() throws IOException {
     Assert.assertEquals("Table should start empty", 0, 
listManifestFiles().size());
-    table.newAppend().appendFile(FILE_A).commit();
-    Snapshot snap1 = table.currentSnapshot();
+    Snapshot snap1 = commit(table, table.newAppend().appendFile(FILE_A), 
branch);
     TableMetadata base = readMetadata();
     V2Assert.assertEquals("Snapshot sequence number should be 1", 1, 
snap1.sequenceNumber());
     V2Assert.assertEquals("Last sequence number should be 1", 1, 
base.lastSequenceNumber());
@@ -390,8 +409,7 @@ public class TestMergeAppend extends TableTestBase {
         files(FILE_A),
         statuses(Status.ADDED));
 
-    table.newAppend().appendFile(FILE_B).commit();
-    Snapshot snap2 = table.currentSnapshot();
+    Snapshot snap2 = commit(table, table.newAppend().appendFile(FILE_B), 
branch);
     long commitId2 = snap2.snapshotId();
     base = readMetadata();
     V2Assert.assertEquals("Snapshot sequence number should be 2", 2, 
snap2.sequenceNumber());
@@ -413,12 +431,15 @@ public class TestMergeAppend extends TableTestBase {
         files(FILE_A),
         statuses(Status.ADDED));
 
-    table
-        .newAppend()
-        .appendManifest(
-            writeManifest("input-m0.avro", 
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C)))
-        .commit();
-    Snapshot snap3 = table.currentSnapshot();
+    Snapshot snap3 =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendManifest(
+                    writeManifest(
+                        "input-m0.avro", 
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))),
+            branch);
 
     base = readMetadata();
     V2Assert.assertEquals("Snapshot sequence number should be 3", 3, 
snap3.sequenceNumber());
@@ -449,12 +470,15 @@ public class TestMergeAppend extends TableTestBase {
 
     table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, 
"1").commit();
 
-    table
-        .newAppend()
-        .appendManifest(
-            writeManifest("input-m1.avro", 
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D)))
-        .commit();
-    Snapshot snap4 = table.currentSnapshot();
+    Snapshot snap4 =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendManifest(
+                    writeManifest(
+                        "input-m1.avro", 
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))),
+            branch);
 
     base = readMetadata();
     V2Assert.assertEquals("Snapshot sequence number should be 4", 4, 
snap4.sequenceNumber());
@@ -485,27 +509,28 @@ public class TestMergeAppend extends TableTestBase {
     ManifestFile manifest = writeManifest(FILE_A, FILE_B);
     ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C);
     ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D);
-    table
-        .newAppend()
-        .appendManifest(manifest)
-        .appendManifest(manifest2)
-        .appendManifest(manifest3)
-        .commit();
-
-    Assert.assertNotNull("Should create a snapshot", table.currentSnapshot());
+    Snapshot committed =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendManifest(manifest)
+                .appendManifest(manifest2)
+                .appendManifest(manifest3),
+            branch);
+
+    Assert.assertNotNull("Should create a snapshot", committed);
     V1Assert.assertEquals(
         "Last sequence number should be 0", 0, 
table.ops().current().lastSequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 1", 1, 
table.ops().current().lastSequenceNumber());
 
-    Snapshot committed = table.currentSnapshot();
-
     Assert.assertEquals(
         "Should contain 3 merged manifest after 1st write write",
         3,
         committed.allManifests(table.io()).size());
 
-    long snapshotId = table.currentSnapshot().snapshotId();
+    long snapshotId = committed.snapshotId();
 
     validateManifest(
         committed.allManifests(table.io()).get(0),
@@ -541,18 +566,15 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
     Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber());
 
-    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch);
 
-    Snapshot snap = table.currentSnapshot();
     validateSnapshot(null, snap, 1, FILE_A, FILE_B);
 
     TableMetadata base = readMetadata();
-    long baseId = base.currentSnapshot().snapshotId();
+    long baseId = snap.snapshotId();
     Assert.assertEquals(
-        "Should create 1 manifest for initial write",
-        1,
-        base.currentSnapshot().allManifests(table.io()).size());
-    ManifestFile initialManifest = base.currentSnapshot().allManifests(table.io()).get(0);
+        "Should create 1 manifest for initial write", 1, snap.allManifests(table.io()).size());
+    ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
     validateManifest(
         initialManifest,
         seqs(1, 1),
@@ -560,9 +582,8 @@ public class TestMergeAppend extends TableTestBase {
         files(FILE_A, FILE_B),
         statuses(Status.ADDED, Status.ADDED));
 
-    table.newDelete().deleteFile(FILE_A).commit();
+    Snapshot deleteSnapshot = commit(table, table.newDelete().deleteFile(FILE_A), branch);
 
-    Snapshot deleteSnapshot = table.currentSnapshot();
     V2Assert.assertEquals(
         "Snapshot sequence number should be 2", 2, 
deleteSnapshot.sequenceNumber());
     V2Assert.assertEquals(
@@ -571,12 +592,12 @@ public class TestMergeAppend extends TableTestBase {
         "Table should end with last-sequence-number 0", 0, 
readMetadata().lastSequenceNumber());
 
     TableMetadata delete = readMetadata();
-    long deleteId = delete.currentSnapshot().snapshotId();
+    long deleteId = latestSnapshot(table, branch).snapshotId();
     Assert.assertEquals(
         "Should create 1 filtered manifest for delete",
         1,
-        delete.currentSnapshot().allManifests(table.io()).size());
-    ManifestFile deleteManifest = delete.currentSnapshot().allManifests(table.io()).get(0);
+        latestSnapshot(table, branch).allManifests(table.io()).size());
+    ManifestFile deleteManifest = deleteSnapshot.allManifests(table.io()).get(0);
 
     validateManifest(
         deleteManifest,
@@ -585,9 +606,9 @@ public class TestMergeAppend extends TableTestBase {
         files(FILE_A, FILE_B),
         statuses(Status.DELETED, Status.EXISTING));
 
-    table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+    Snapshot committedSnapshot =
+        commit(table, table.newAppend().appendFile(FILE_C).appendFile(FILE_D), branch);
 
-    Snapshot committedSnapshot = table.currentSnapshot();
     V2Assert.assertEquals(
         "Snapshot sequence number should be 3", 3, 
committedSnapshot.sequenceNumber());
     V2Assert.assertEquals(
@@ -621,23 +642,18 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals("Last sequence number should be 0", 0, 
readMetadata().lastSequenceNumber());
     Assert.assertEquals("Table should start empty", 0, 
listManifestFiles().size());
 
-    table.newFastAppend().appendFile(FILE_A).commit();
-    Snapshot snap1 = table.currentSnapshot();
+    Snapshot snap1 = commit(table, table.newFastAppend().appendFile(FILE_A), branch);
     long idFileA = snap1.snapshotId();
     validateSnapshot(null, snap1, 1, FILE_A);
 
-    table.newFastAppend().appendFile(FILE_B).commit();
-    Snapshot snap2 = table.currentSnapshot();
+    Snapshot snap2 = commit(table, table.newFastAppend().appendFile(FILE_B), branch);
     long idFileB = snap2.snapshotId();
     validateSnapshot(snap1, snap2, 2, FILE_B);
 
     Assert.assertEquals(
-        "Should have 2 manifests from setup writes",
-        2,
-        readMetadata().currentSnapshot().allManifests(table.io()).size());
+        "Should have 2 manifests from setup writes", 2, 
snap2.allManifests(table.io()).size());
 
-    table.newAppend().appendFile(FILE_C).commit();
-    Snapshot snap3 = table.currentSnapshot();
+    Snapshot snap3 = commit(table, table.newAppend().appendFile(FILE_C), 
branch);
     long idFileC = snap3.snapshotId();
     validateSnapshot(snap2, snap3, 3, FILE_C);
 
@@ -645,11 +661,11 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals(
         "Should have 3 unmerged manifests",
         3,
-        base.currentSnapshot().allManifests(table.io()).size());
-    Set<ManifestFile> unmerged = 
Sets.newHashSet(base.currentSnapshot().allManifests(table.io()));
+        latestSnapshot(table, branch).allManifests(table.io()).size());
+    Set<ManifestFile> unmerged =
+        Sets.newHashSet(latestSnapshot(table, branch).allManifests(table.io()));
 
-    table.newAppend().appendFile(FILE_D).commit();
-    Snapshot committed = table.currentSnapshot();
+    Snapshot committed = commit(table, table.newAppend().appendFile(FILE_D), 
branch);
     V2Assert.assertEquals("Snapshot sequence number should be 4", 4, 
committed.sequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 4", 4, 
readMetadata().lastSequenceNumber());
@@ -681,18 +697,15 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals("Last sequence number should be 0", 0, 
readMetadata().lastSequenceNumber());
     Assert.assertEquals("Table should start empty", 0, 
listManifestFiles().size());
 
-    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    Snapshot snap = commit(table, 
table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch);
 
-    Snapshot snap = table.currentSnapshot();
     validateSnapshot(null, snap, 1, FILE_A, FILE_B);
 
     TableMetadata base = readMetadata();
-    long baseId = base.currentSnapshot().snapshotId();
+    long baseId = snap.snapshotId();
     Assert.assertEquals(
-        "Should create 1 manifest for initial write",
-        1,
-        base.currentSnapshot().allManifests(table.io()).size());
-    ManifestFile initialManifest = 
base.currentSnapshot().allManifests(table.io()).get(0);
+        "Should create 1 manifest for initial write", 1, 
snap.allManifests(table.io()).size());
+    ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
     validateManifest(
         initialManifest,
         seqs(1, 1),
@@ -700,8 +713,8 @@ public class TestMergeAppend extends TableTestBase {
         files(FILE_A, FILE_B),
         statuses(Status.ADDED, Status.ADDED));
 
-    table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
-    Snapshot committed = table.currentSnapshot();
+    Snapshot committed =
+        commit(table, table.newAppend().appendFile(FILE_C).appendFile(FILE_D), 
branch);
 
     V2Assert.assertEquals("Snapshot sequence number should be 2", 2, 
committed.sequenceNumber());
     V2Assert.assertEquals(
@@ -735,18 +748,15 @@ public class TestMergeAppend extends TableTestBase {
 
   @Test
   public void testChangedPartitionSpec() {
-    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch);
 
-    Snapshot snap = table.currentSnapshot();
     long commitId = snap.snapshotId();
     validateSnapshot(null, snap, 1, FILE_A, FILE_B);
 
     TableMetadata base = readMetadata();
     Assert.assertEquals(
-        "Should create 1 manifest for initial write",
-        1,
-        base.currentSnapshot().allManifests(table.io()).size());
-    ManifestFile initialManifest = 
base.currentSnapshot().allManifests(table.io()).get(0);
+        "Should create 1 manifest for initial write", 1, 
snap.allManifests(table.io()).size());
+    ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
     validateManifest(
         initialManifest,
         seqs(1, 1),
@@ -760,7 +770,7 @@ public class TestMergeAppend extends TableTestBase {
 
     // commit the new partition spec to the table manually
     table.ops().commit(base, base.updatePartitionSpec(newSpec));
-    Snapshot snap2 = table.currentSnapshot();
+    Snapshot snap2 = latestSnapshot(table, branch);
     V2Assert.assertEquals("Snapshot sequence number should be 1", 1, 
snap2.sequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 1", 1, 
readMetadata().lastSequenceNumber());
@@ -775,9 +785,8 @@ public class TestMergeAppend extends TableTestBase {
             .withRecordCount(1)
             .build();
 
-    table.newAppend().appendFile(newFileY).commit();
+    Snapshot lastSnapshot = commit(table, table.newAppend().appendFile(newFileY), branch);
 
-    Snapshot lastSnapshot = table.currentSnapshot();
     V2Assert.assertEquals("Snapshot sequence number should be 2", 2, 
lastSnapshot.sequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 2", 2, 
readMetadata().lastSequenceNumber());
@@ -803,23 +812,20 @@ public class TestMergeAppend extends TableTestBase {
 
   @Test
   public void testChangedPartitionSpecMergeExisting() {
-    table.newAppend().appendFile(FILE_A).commit();
+    Snapshot snap1 = commit(table, table.newAppend().appendFile(FILE_A), 
branch);
 
-    Snapshot snap1 = table.currentSnapshot();
     long id1 = snap1.snapshotId();
     validateSnapshot(null, snap1, 1, FILE_A);
 
     // create a second compatible manifest
-    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot snap2 = commit(table, table.newFastAppend().appendFile(FILE_B), 
branch);
 
-    Snapshot snap2 = table.currentSnapshot();
     long id2 = snap2.snapshotId();
     validateSnapshot(snap1, snap2, 2, FILE_B);
 
     TableMetadata base = readMetadata();
-    Assert.assertEquals(
-        "Should contain 2 manifests", 2, 
base.currentSnapshot().allManifests(table.io()).size());
-    ManifestFile manifest = 
base.currentSnapshot().allManifests(table.io()).get(0);
+    Assert.assertEquals("Should contain 2 manifests", 2, 
snap2.allManifests(table.io()).size());
+    ManifestFile manifest = snap2.allManifests(table.io()).get(0);
 
     // build the new spec using the table's schema, which uses fresh IDs
     PartitionSpec newSpec =
@@ -827,7 +833,7 @@ public class TestMergeAppend extends TableTestBase {
 
     // commit the new partition spec to the table manually
     table.ops().commit(base, base.updatePartitionSpec(newSpec));
-    Snapshot snap3 = table.currentSnapshot();
+    Snapshot snap3 = latestSnapshot(table, branch);
     V2Assert.assertEquals("Snapshot sequence number should be 2", 2, 
snap3.sequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 2", 2, 
readMetadata().lastSequenceNumber());
@@ -842,8 +848,7 @@ public class TestMergeAppend extends TableTestBase {
             .withRecordCount(1)
             .build();
 
-    table.newAppend().appendFile(newFileY).commit();
-    Snapshot lastSnapshot = table.currentSnapshot();
+    Snapshot lastSnapshot = commit(table, 
table.newAppend().appendFile(newFileY), branch);
     V2Assert.assertEquals("Snapshot sequence number should be 3", 3, 
lastSnapshot.sequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 3", 3, 
readMetadata().lastSequenceNumber());
@@ -876,21 +881,21 @@ public class TestMergeAppend extends TableTestBase {
     table.updateProperties().set("commit.manifest.min-count-to-merge", 
"1").commit();
     Assert.assertEquals("Last sequence number should be 0", 0, 
readMetadata().lastSequenceNumber());
 
-    table.newAppend().appendFile(FILE_A).commit();
+    Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A), 
branch);
 
     TableMetadata base = readMetadata();
-    long baseId = base.currentSnapshot().snapshotId();
+    long baseId = snap.snapshotId();
     V2Assert.assertEquals("Last sequence number should be 1", 1, 
base.lastSequenceNumber());
     V1Assert.assertEquals(
         "Table should end with last-sequence-number 0", 0, 
base.lastSequenceNumber());
 
-    ManifestFile initialManifest = 
base.currentSnapshot().allManifests(table.io()).get(0);
+    ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
     validateManifest(initialManifest, seqs(1), ids(baseId), files(FILE_A), 
statuses(Status.ADDED));
 
     table.ops().failCommits(5);
 
     AppendFiles append = table.newAppend().appendFile(FILE_B);
-    Snapshot pending = append.apply();
+    Snapshot pending = apply(append, branch);
 
     Assert.assertEquals("Should merge to 1 manifest", 1, 
pending.allManifests(table.io()).size());
     ManifestFile newManifest = pending.allManifests(table.io()).get(0);
@@ -905,7 +910,7 @@ public class TestMergeAppend extends TableTestBase {
         "Should retry 4 times and throw last failure",
         CommitFailedException.class,
         "Injected failure",
-        append::commit);
+        () -> commit(table, append, branch));
 
     V2Assert.assertEquals(
         "Last sequence number should be 1", 1, 
readMetadata().lastSequenceNumber());
@@ -914,10 +919,10 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals(
         "Should only contain 1 manifest file",
         1,
-        table.currentSnapshot().allManifests(table.io()).size());
+        latestSnapshot(table, branch).allManifests(table.io()).size());
 
     validateManifest(
-        table.currentSnapshot().allManifests(table.io()).get(0),
+        latestSnapshot(table, branch).allManifests(table.io()).get(0),
         seqs(1),
         ids(baseId),
         files(initialManifest),
@@ -934,7 +939,7 @@ public class TestMergeAppend extends TableTestBase {
 
     ManifestFile manifest = writeManifest(FILE_A, FILE_B);
     AppendFiles append = table.newAppend().appendManifest(manifest);
-    Snapshot pending = append.apply();
+    Snapshot pending = apply(append, branch);
     ManifestFile newManifest = pending.allManifests(table.io()).get(0);
     Assert.assertTrue("Should create new manifest", new 
File(newManifest.path()).exists());
 
@@ -942,7 +947,7 @@ public class TestMergeAppend extends TableTestBase {
         "Should retry 4 times and throw last failure",
         CommitFailedException.class,
         "Injected failure",
-        append::commit);
+        () -> commit(table, append, branch));
     V2Assert.assertEquals(
         "Last sequence number should be 0", 0, 
readMetadata().lastSequenceNumber());
     V1Assert.assertEquals(
@@ -958,21 +963,21 @@ public class TestMergeAppend extends TableTestBase {
 
     Assert.assertEquals("Last sequence number should be 0", 0, 
readMetadata().lastSequenceNumber());
 
-    table.newAppend().appendFile(FILE_A).commit();
+    Snapshot current = commit(table, table.newAppend().appendFile(FILE_A), 
branch);
 
     TableMetadata base = readMetadata();
-    long baseId = base.currentSnapshot().snapshotId();
+    long baseId = current.snapshotId();
     V2Assert.assertEquals(
         "Last sequence number should be 1", 1, 
readMetadata().lastSequenceNumber());
     V1Assert.assertEquals(
         "Table should end with last-sequence-number 0", 0, 
readMetadata().lastSequenceNumber());
-    ManifestFile initialManifest = 
base.currentSnapshot().allManifests(table.io()).get(0);
+    ManifestFile initialManifest = current.allManifests(table.io()).get(0);
     validateManifest(initialManifest, seqs(1), ids(baseId), files(FILE_A), 
statuses(Status.ADDED));
 
     table.ops().failCommits(3);
 
     AppendFiles append = table.newAppend().appendFile(FILE_B);
-    Snapshot pending = append.apply();
+    Snapshot pending = apply(append, branch);
 
     Assert.assertEquals("Should merge to 1 manifest", 1, 
pending.allManifests(table.io()).size());
     ManifestFile newManifest = pending.allManifests(table.io()).get(0);
@@ -984,17 +989,15 @@ public class TestMergeAppend extends TableTestBase {
         concat(files(FILE_B), files(initialManifest)));
 
     V2Assert.assertEquals(
-        "Snapshot sequence number should be 1", 1, 
table.currentSnapshot().sequenceNumber());
+        "Snapshot sequence number should be 1", 1, latestSnapshot(table, 
branch).sequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 1", 1, 
readMetadata().lastSequenceNumber());
     V1Assert.assertEquals(
         "Table should end with last-sequence-number 0", 0, 
readMetadata().lastSequenceNumber());
 
-    append.commit();
-    Snapshot snapshot = table.currentSnapshot();
+    Snapshot snapshot = commit(table, append, branch);
     long snapshotId = snapshot.snapshotId();
-    V2Assert.assertEquals(
-        "Snapshot sequence number should be 2", 2, 
table.currentSnapshot().sequenceNumber());
+    V2Assert.assertEquals("Snapshot sequence number should be 2", 2, 
snapshot.sequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 2", 2, 
readMetadata().lastSequenceNumber());
     V1Assert.assertEquals(
@@ -1005,12 +1008,10 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals(
         "Should commit the same new manifest during retry",
         Lists.newArrayList(newManifest),
-        metadata.currentSnapshot().allManifests(table.io()));
+        snapshot.allManifests(table.io()));
 
     Assert.assertEquals(
-        "Should only contain 1 merged manifest file",
-        1,
-        table.currentSnapshot().allManifests(table.io()).size());
+        "Should only contain 1 merged manifest file", 1, 
snapshot.allManifests(table.io()).size());
     ManifestFile manifestFile = snapshot.allManifests(table.io()).get(0);
     validateManifest(
         manifestFile,
@@ -1031,13 +1032,12 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertNull("Should not have a current snapshot", 
base.currentSnapshot());
 
     ManifestFile manifest = writeManifest(FILE_A, FILE_B);
-    table.newAppend().appendManifest(manifest).commit();
+    Snapshot snapshot = commit(table, 
table.newAppend().appendManifest(manifest), branch);
 
-    Snapshot snapshot = table.currentSnapshot();
     long snapshotId = snapshot.snapshotId();
     validateSnapshot(null, snapshot, 1, FILE_A, FILE_B);
 
-    List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
+    List<ManifestFile> manifests = snapshot.allManifests(table.io());
     Assert.assertEquals("Should have 1 committed manifest", 1, 
manifests.size());
     ManifestFile manifestFile = snapshot.allManifests(table.io()).get(0);
     validateManifest(
@@ -1079,15 +1079,14 @@ public class TestMergeAppend extends TableTestBase {
     table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, 
"1").commit();
 
     ManifestFile manifest1 = writeManifestWithName("manifest-file-1.avro", 
FILE_A, FILE_B);
-    table.newAppend().appendManifest(manifest1).commit();
+    Snapshot snap1 = commit(table, 
table.newAppend().appendManifest(manifest1), branch);
 
-    Snapshot snap1 = table.currentSnapshot();
     long commitId1 = snap1.snapshotId();
     validateSnapshot(null, snap1, 1, FILE_A, FILE_B);
 
     Assert.assertEquals("Should have only 1 manifest", 1, 
snap1.allManifests(table.io()).size());
     validateManifest(
-        table.currentSnapshot().allManifests(table.io()).get(0),
+        snap1.allManifests(table.io()).get(0),
         seqs(1, 1),
         ids(commitId1, commitId1),
         files(FILE_A, FILE_B),
@@ -1096,12 +1095,10 @@ public class TestMergeAppend extends TableTestBase {
         "Unmerged append manifest should not be deleted", new 
File(manifest1.path()).exists());
 
     ManifestFile manifest2 = writeManifestWithName("manifest-file-2.avro", 
FILE_C, FILE_D);
-    table.newAppend().appendManifest(manifest2).commit();
+    Snapshot snap2 = commit(table, 
table.newAppend().appendManifest(manifest2), branch);
 
-    Snapshot snap2 = table.currentSnapshot();
     long commitId2 = snap2.snapshotId();
-    V2Assert.assertEquals(
-        "Snapshot sequence number should be 2", 2, 
table.currentSnapshot().sequenceNumber());
+    V2Assert.assertEquals("Snapshot sequence number should be 2", 2, 
snap2.sequenceNumber());
     V2Assert.assertEquals(
         "Last sequence number should be 2", 2, 
readMetadata().lastSequenceNumber());
     V1Assert.assertEquals(
@@ -1110,7 +1107,7 @@ public class TestMergeAppend extends TableTestBase {
     Assert.assertEquals(
         "Manifests should be merged into 1", 1, 
snap2.allManifests(table.io()).size());
     validateManifest(
-        table.currentSnapshot().allManifests(table.io()).get(0),
+        latestSnapshot(table, branch).allManifests(table.io()).get(0),
         seqs(2, 2, 1, 1),
         ids(commitId2, commitId2, commitId1, commitId1),
         files(FILE_C, FILE_D, FILE_A, FILE_B),
@@ -1140,7 +1137,10 @@ public class TestMergeAppend extends TableTestBase {
     append.appendManifest(manifest);
 
     AssertHelpers.assertThrows(
-        "Should reject commit", CommitFailedException.class, "Injected failure", append::commit);
+        "Should reject commit",
+        CommitFailedException.class,
+        "Injected failure",
+        () -> commit(table, append, branch));
 
     Assert.assertEquals("Last sequence number should be 0", 0, 
readMetadata().lastSequenceNumber());
     Assert.assertTrue("Append manifest should not be deleted", new 
File(manifest.path()).exists());
@@ -1159,7 +1159,7 @@ public class TestMergeAppend extends TableTestBase {
         "Should reject commit",
         IllegalArgumentException.class,
         "Cannot append manifest with existing files",
-        () -> table.newAppend().appendManifest(manifestWithExistingFiles).commit());
+        () -> commit(table, table.newAppend().appendManifest(manifestWithExistingFiles), branch));
     Assert.assertEquals("Last sequence number should be 0", 0, 
readMetadata().lastSequenceNumber());
 
     ManifestFile manifestWithDeletedFiles =
@@ -1168,7 +1168,7 @@ public class TestMergeAppend extends TableTestBase {
         "Should reject commit",
         IllegalArgumentException.class,
         "Cannot append manifest with deleted files",
-        () -> table.newAppend().appendManifest(manifestWithDeletedFiles).commit());
+        () -> commit(table, table.newAppend().appendManifest(manifestWithDeletedFiles), branch));
     Assert.assertEquals("Last sequence number should be 0", 0, 
readMetadata().lastSequenceNumber());
   }
 
@@ -1217,18 +1217,15 @@ public class TestMergeAppend extends TableTestBase {
 
   @Test
   public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() {
-    table.newAppend().appendFile(FILE_A).commit();
+    Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A), 
branch);
 
-    Snapshot snap = table.currentSnapshot();
     long commitId = snap.snapshotId();
     validateSnapshot(null, snap, 1, FILE_A);
     TableMetadata base = readMetadata();
 
     Assert.assertEquals(
-        "Should create 1 manifest for initial write",
-        1,
-        base.currentSnapshot().allManifests(table.io()).size());
-    ManifestFile initialManifest = 
base.currentSnapshot().allManifests(table.io()).get(0);
+        "Should create 1 manifest for initial write", 1, 
snap.allManifests(table.io()).size());
+    ManifestFile initialManifest = snap.allManifests(table.io()).get(0);
     validateManifest(
         initialManifest, seqs(1), ids(commitId), files(FILE_A), 
statuses(Status.ADDED));
 
@@ -1252,8 +1249,7 @@ public class TestMergeAppend extends TableTestBase {
             .withRecordCount(1)
             .build();
 
-    table.newAppend().appendFile(newFile).commit();
-    Snapshot committedSnapshot = table.currentSnapshot();
+    Snapshot committedSnapshot = commit(table, 
table.newAppend().appendFile(newFile), branch);
 
     V2Assert.assertEquals(
         "Snapshot sequence number should be 2", 2, 
committedSnapshot.sequenceNumber());
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index e8fb5e2658..53e5af520d 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -1150,10 +1151,7 @@ public class TestRemoveSnapshots extends TableTestBase {
             .add(firstSnapshot.manifestListLocation())
             .add(secondSnapshot.manifestListLocation())
             .add(thirdSnapshot.manifestListLocation())
-            .addAll(
-                secondSnapshot.allManifests(FILE_IO).stream()
-                    .map(ManifestFile::path)
-                    .collect(Collectors.toList()))
+            .addAll(manifestPaths(secondSnapshot, table.io()))
             .addAll(
                 manifestOfDeletedFiles.stream()
                     .map(ManifestFile::path)
@@ -1455,6 +1453,64 @@ public class TestRemoveSnapshots extends TableTestBase {
     Assert.assertNull(table.ops().current().snapshot(initialSnapshotId));
   }
 
+  @Test
+  public void testRetainFilesOnRetainedBranches() {
+    // Append a file to main and test branch
+    String testBranch = "test-branch";
+    table.newAppend().appendFile(FILE_A).commit();
+    Snapshot appendA = table.currentSnapshot();
+    table.manageSnapshots().createBranch(testBranch, appendA.snapshotId()).commit();
+
+    // Delete A from main
+    table.newDelete().deleteFile(FILE_A).commit();
+    Snapshot deletionA = table.currentSnapshot();
+    // Add B to main
+    table.newAppend().appendFile(FILE_B).commit();
+    long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+    Set<String> deletedFiles = Sets.newHashSet();
+    Set<String> expectedDeletes = Sets.newHashSet();
+
+    // Only deletionA's manifest list and manifests should be removed
+    expectedDeletes.add(deletionA.manifestListLocation());
+    expectedDeletes.addAll(manifestPaths(deletionA, table.io()));
+    table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit();
+
+    Assert.assertEquals(2, Iterables.size(table.snapshots()));
+    Assert.assertEquals(expectedDeletes, deletedFiles);
+
+    // Delete A on test branch
+    table.newDelete().deleteFile(FILE_A).toBranch(testBranch).commit();
+    Snapshot branchDelete = table.snapshot(testBranch);
+
+    // Append C on test branch
+    table.newAppend().appendFile(FILE_C).toBranch(testBranch).commit();
+    Snapshot testBranchHead = table.snapshot(testBranch);
+
+    deletedFiles = Sets.newHashSet();
+    expectedDeletes = Sets.newHashSet();
+
+    waitUntilAfter(testBranchHead.timestampMillis());
+    table
+        .expireSnapshots()
+        .expireOlderThan(testBranchHead.timestampMillis())
+        .deleteWith(deletedFiles::add)
+        .commit();
+
+    expectedDeletes.add(appendA.manifestListLocation());
+    expectedDeletes.addAll(manifestPaths(appendA, table.io()));
+    expectedDeletes.add(branchDelete.manifestListLocation());
+    expectedDeletes.addAll(manifestPaths(branchDelete, table.io()));
+    expectedDeletes.add(FILE_A.path().toString());
+
+    Assert.assertEquals(2, Iterables.size(table.snapshots()));
+    Assert.assertEquals(expectedDeletes, deletedFiles);
+  }
+
+  private Set<String> manifestPaths(Snapshot snapshot, FileIO io) {
+    return snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet());
+  }
+
   private RemoveSnapshots removeSnapshots(Table table) {
     RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots();
     return (RemoveSnapshots) removeSnapshots.withIncrementalCleanup(incrementalCleanup);

Reply via email to