This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e239bc4c2 Core, API: Support incremental scanning with branch (#5984)
6e239bc4c2 is described below

commit 6e239bc4c23abfca12ff56198bc0b21f0f2b3609
Author: Liwei Li <[email protected]>
AuthorDate: Wed Aug 23 15:20:41 2023 +0800

    Core, API: Support incremental scanning with branch (#5984)
---
 .../java/org/apache/iceberg/IncrementalScan.java   |  61 ++++-
 .../org/apache/iceberg/BaseIncrementalScan.java    |  49 +++-
 .../java/org/apache/iceberg/TableScanContext.java  |   7 +
 .../iceberg/TestBaseIncrementalAppendScan.java     | 265 +++++++++++++++++++++
 4 files changed, 378 insertions(+), 4 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/IncrementalScan.java b/api/src/main/java/org/apache/iceberg/IncrementalScan.java
index 1f7a8dff66..0be0849c6a 100644
--- a/api/src/main/java/org/apache/iceberg/IncrementalScan.java
+++ b/api/src/main/java/org/apache/iceberg/IncrementalScan.java
@@ -24,7 +24,7 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
   /**
    * Instructs this scan to look for changes starting from a particular snapshot (inclusive).
    *
-   * <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
+   * <p>If the start snapshot is not configured, it defaults to the oldest ancestor of the end
    * snapshot (inclusive).
    *
    * @param fromSnapshotId the start snapshot ID (inclusive)
@@ -33,10 +33,25 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
    */
   ThisT fromSnapshotInclusive(long fromSnapshotId);
 
+  /**
+   * Instructs this scan to look for changes starting from a particular snapshot (inclusive).
+   *
+   * <p>If the start snapshot is not configured, it defaults to the oldest ancestor of the end
+   * snapshot (inclusive).
+   *
+   * @param ref the start ref name that points to a particular snapshot ID (inclusive)
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
+   */
+  default ThisT fromSnapshotInclusive(String ref) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement fromSnapshotInclusive");
+  }
+
   /**
    * Instructs this scan to look for changes starting from a particular snapshot (exclusive).
    *
-   * <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
+   * <p>If the start snapshot is not configured, it defaults to the oldest ancestor of the end
    * snapshot (inclusive).
    *
    * @param fromSnapshotId the start snapshot ID (exclusive)
@@ -45,14 +60,54 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
    */
   ThisT fromSnapshotExclusive(long fromSnapshotId);
 
+  /**
+   * Instructs this scan to look for changes starting from a particular snapshot (exclusive).
+   *
+   * <p>If the start snapshot is not configured, it defaults to the oldest ancestor of the end
+   * snapshot (inclusive).
+   *
+   * @param ref the start ref name that points to a particular snapshot ID (exclusive)
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
+   */
+  default ThisT fromSnapshotExclusive(String ref) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement fromSnapshotExclusive");
+  }
+
   /**
    * Instructs this scan to look for changes up to a particular snapshot (inclusive).
    *
-   * <p>If the end snapshot is not configured, it is defaulted to the current table snapshot
+   * <p>If the end snapshot is not configured, it defaults to the current table snapshot
    * (inclusive).
    *
    * @param toSnapshotId the end snapshot ID (inclusive)
    * @return this for method chaining
    */
   ThisT toSnapshot(long toSnapshotId);
+
+  /**
+   * Instructs this scan to look for changes up to a particular snapshot ref (inclusive).
+   *
+   * <p>If the end snapshot is not configured, it defaults to the current table snapshot
+   * (inclusive).
+   *
+   * @param ref the end snapshot Ref (inclusive)
+   * @return this for method chaining
+   */
+  default ThisT toSnapshot(String ref) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement toSnapshot");
+  }
+
+  /**
+   * Use the specified branch
+   *
+   * @param branch the branch name
+   * @return this for method chaining
+   */
+  default ThisT useBranch(String branch) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement useBranch");
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
index 8bcb538164..1eef1bd540 100644
--- a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
@@ -34,6 +34,14 @@ abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTask
   protected abstract CloseableIterable<T> doPlanFiles(
       Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
 
+  @Override
+  public ThisT fromSnapshotInclusive(String ref) {
+    SnapshotRef snapshotRef = table().refs().get(ref);
+    Preconditions.checkArgument(snapshotRef != null, "Cannot find ref: %s", ref);
+    Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", ref);
+    return fromSnapshotInclusive(snapshotRef.snapshotId());
+  }
+
   @Override
   public ThisT fromSnapshotInclusive(long fromSnapshotId) {
     Preconditions.checkArgument(
@@ -44,6 +52,14 @@ abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTask
     return newRefinedScan(table(), schema(), newContext);
   }
 
+  @Override
+  public ThisT fromSnapshotExclusive(String ref) {
+    SnapshotRef snapshotRef = table().refs().get(ref);
+    Preconditions.checkArgument(snapshotRef != null, "Cannot find ref: %s", ref);
+    Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", ref);
+    return fromSnapshotExclusive(snapshotRef.snapshotId());
+  }
+
   @Override
   public ThisT fromSnapshotExclusive(long fromSnapshotId) {
     // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
@@ -60,6 +76,22 @@ abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTask
     return newRefinedScan(table(), schema(), newContext);
   }
 
+  @Override
+  public ThisT toSnapshot(String ref) {
+    SnapshotRef snapshotRef = table().refs().get(ref);
+    Preconditions.checkArgument(snapshotRef != null, "Cannot find ref: %s", ref);
+    Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", ref);
+    return toSnapshot(snapshotRef.snapshotId());
+  }
+
+  @Override
+  public ThisT useBranch(String branch) {
+    SnapshotRef snapshotRef = table().refs().get(branch);
+    Preconditions.checkArgument(snapshotRef != null, "Cannot find ref: %s", branch);
+    Preconditions.checkArgument(snapshotRef.isBranch(), "Ref %s is not a branch", branch);
+    return newRefinedScan(table(), schema(), context().useBranch(branch));
+  }
+
   @Override
   public CloseableIterable<T> planFiles() {
     if (scanCurrentLineage() && table().currentSnapshot() == null) {
@@ -100,9 +132,24 @@ abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTask
 
   private long toSnapshotIdInclusive() {
     if (context().toSnapshotId() != null) {
+      if (context().branch() != null) {
+        Snapshot currentSnapshot = table().snapshot(context().branch());
+        Preconditions.checkArgument(
+            SnapshotUtil.isAncestorOf(
+                table(), currentSnapshot.snapshotId(), context().toSnapshotId()),
+            "End snapshot is not a valid snapshot on the current branch: %s",
+            context().branch());
+      }
+
       return context().toSnapshotId();
     } else {
-      Snapshot currentSnapshot = table().currentSnapshot();
+      Snapshot currentSnapshot;
+      if (context().branch() != null) {
+        currentSnapshot = table().snapshot(context().branch());
+      } else {
+        currentSnapshot = table().currentSnapshot();
+      }
+
       Preconditions.checkArgument(
           currentSnapshot != null, "End snapshot is not set and table has no current snapshot");
       return currentSnapshot.snapshotId();
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index 87a2f59f6c..b59446a717 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -96,6 +96,9 @@ abstract class TableScanContext {
     return LoggingMetricsReporter.instance();
   }
 
+  @Nullable
+  public abstract String branch();
+
   TableScanContext useSnapshotId(Long scanSnapshotId) {
     return ImmutableTableScanContext.builder().from(this).snapshotId(scanSnapshotId).build();
   }
@@ -172,6 +175,10 @@ abstract class TableScanContext {
         .build();
   }
 
+  TableScanContext useBranch(String ref) {
+    return ImmutableTableScanContext.builder().from(this).branch(ref).build();
+  }
+
   public static TableScanContext empty() {
     return ImmutableTableScanContext.builder().build();
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
index 5836555593..c40cc35d24 100644
--- a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
@@ -51,6 +51,170 @@ public class TestBaseIncrementalAppendScan
     Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
   }
 
+  @Test
+  public void fromSnapshotInclusiveWithNonExistingRef() {
+    Assertions.assertThatThrownBy(() -> newScan().fromSnapshotInclusive("nonExistingRef"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot find ref: nonExistingRef");
+  }
+
+  @Test
+  public void fromSnapshotInclusiveWithTag() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotAId = table.currentSnapshot().snapshotId();
+
+    String tagSnapshotAName = "t1";
+    table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
+
+    String tagSnapshotBName = "t2";
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    long snapshotBId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
+    table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+
+    /*
+              files:FILE_A         files:FILE_B FILE_B       files:FILE_C FILE_C
+     ---- snapshotAId(tag:t1) ---- snapshotMainB(tag:t2) ----  currentSnapshot
+    */
+    IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagSnapshotAName);
+    Assertions.assertThat(scan.planFiles()).hasSize(5);
+
+    IncrementalAppendScan scanWithToSnapshot =
+        newScan().fromSnapshotInclusive(tagSnapshotAName).toSnapshot(tagSnapshotBName);
+    Assertions.assertThat(scanWithToSnapshot.planFiles()).hasSize(3);
+  }
+
+  @Test
+  public void fromSnapshotInclusiveWithBranchShouldFail() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotAId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+    Assertions.assertThatThrownBy(() -> newScan().fromSnapshotInclusive(branchName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(String.format("Ref %s is not a tag", branchName));
+
+    Assertions.assertThatThrownBy(
+            () -> newScan().fromSnapshotInclusive(snapshotAId).toSnapshot(branchName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(String.format("Ref %s is not a tag", branchName));
+  }
+
+  @Test
+  public void testUseBranch() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotAId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    String tagSnapshotAName = "t1";
+    table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+    table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
+
+    String tagName2 = "t2";
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    long snapshotMainBId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(tagName2, snapshotMainBId).commit();
+
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+
+    table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+    long snapshotBranchBId = table.snapshot(branchName).snapshotId();
+
+    table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+    long snapshotBranchCId = table.snapshot(branchName).snapshotId();
+
+    /*
+
+            files:FILE_A         files:FILE_B FILE_B       files:FILE_B FILE_B
+     ---- snapshotA(tag:t1) ---- snapshotMainB(tag:t2) ----  currentSnapshot
+                        \
+                         \
+                          \files:FILE_C
+                          snapshotBranchB
+                            \
+                             \
+                              \files:FILE_C
+                          snapshotBranchC(branch:b1)
+    */
+    IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagSnapshotAName);
+    Assertions.assertThat(scan.planFiles()).hasSize(5);
+
+    IncrementalAppendScan scan2 =
+        newScan().fromSnapshotInclusive(tagSnapshotAName).useBranch(branchName);
+    Assertions.assertThat(scan2.planFiles()).hasSize(3);
+
+    IncrementalAppendScan scan3 = newScan().toSnapshot(snapshotBranchBId).useBranch(branchName);
+    Assertions.assertThat(scan3.planFiles()).hasSize(2);
+
+    IncrementalAppendScan scan4 = newScan().toSnapshot(snapshotBranchCId).useBranch(branchName);
+    Assertions.assertThat(scan4.planFiles()).hasSize(3);
+
+    IncrementalAppendScan scan5 =
+        newScan()
+            .fromSnapshotExclusive(tagSnapshotAName)
+            .toSnapshot(snapshotBranchBId)
+            .useBranch(branchName);
+    Assertions.assertThat(scan5.planFiles()).hasSize(1);
+  }
+
+  @Test
+  public void testUseBranchWithTagShouldFail() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotAId = table.currentSnapshot().snapshotId();
+    String tagSnapshotAName = "t1";
+    table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
+
+    Assertions.assertThatThrownBy(
+            () -> newScan().fromSnapshotInclusive(snapshotAId).useBranch(tagSnapshotAName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(String.format("Ref %s is not a branch", tagSnapshotAName));
+  }
+
+  @Test
+  public void testUseBranchWithInvalidSnapshotShouldFail() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotAId = table.currentSnapshot().snapshotId();
+
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    long snapshotMainBId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+    table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+    long snapshotBranchBId = table.snapshot(branchName).snapshotId();
+
+    /*
+
+          files:FILE_A            files:FILE_B FILE_B
+          ---- snapshotA  ------ snapshotMainB
+                        \
+                         \
+                          \files:FILE_C
+                          snapshotBranchB(branch:b1)
+    */
+    Assertions.assertThatThrownBy(
+            () -> newScan().toSnapshot(snapshotMainBId).useBranch(branchName).planFiles())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("End snapshot is not a valid snapshot on the current branch");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                newScan().fromSnapshotInclusive(snapshotMainBId).useBranch(branchName).planFiles())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            String.format(
+                "Starting snapshot (inclusive) %s is not an ancestor of end snapshot %s",
+                snapshotMainBId, snapshotBranchBId));
+  }
+
+  @Test
+  public void testUseBranchWithNonExistingRef() {
+    Assertions.assertThatThrownBy(() -> newScan().useBranch("nonExistingRef"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot find ref: nonExistingRef");
+  }
+
   @Test
   public void testFromSnapshotExclusive() {
     table.newFastAppend().appendFile(FILE_A).commit();
@@ -88,6 +252,51 @@ public class TestBaseIncrementalAppendScan
     Assert.assertEquals(1, Iterables.size(scanWithToSnapshot.planFiles()));
   }
 
+  @Test
+  public void fromSnapshotExclusiveWithNonExistingRef() {
+    Assertions.assertThatThrownBy(() -> newScan().fromSnapshotExclusive("nonExistingRef"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot find ref: nonExistingRef");
+  }
+
+  @Test
+  public void testFromSnapshotExclusiveWithTag() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotAId = table.currentSnapshot().snapshotId();
+
+    String tagSnapshotAName = "t1";
+    table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
+
+    String tagSnapshotBName = "t2";
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    long snapshotBId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
+    table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+
+    /*
+              files:FILE_A         files:FILE_B FILE_B       files:FILE_C FILE_C
+     ---- snapshotAId(tag:t1) ---- snapshotMainB(tag:t2) ----  currentSnapshot
+    */
+    IncrementalAppendScan scan = newScan().fromSnapshotExclusive(tagSnapshotAName);
+    Assertions.assertThat(scan.planFiles()).hasSize(4);
+
+    IncrementalAppendScan scanWithToSnapshot =
+        newScan().fromSnapshotExclusive(tagSnapshotAName).toSnapshot(tagSnapshotBName);
+    Assertions.assertThat(scanWithToSnapshot.planFiles()).hasSize(2);
+  }
+
+  @Test
+  public void fromSnapshotExclusiveWithBranchShouldFail() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotAId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+    Assertions.assertThatThrownBy(() -> newScan().fromSnapshotExclusive(branchName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(String.format("Ref %s is not a tag", branchName));
+  }
+
   @Test
   public void testToSnapshot() {
     table.newFastAppend().appendFile(FILE_A).commit();
@@ -101,6 +310,62 @@ public class TestBaseIncrementalAppendScan
     Assert.assertEquals(2, Iterables.size(scan.planFiles()));
   }
 
+  @Test
+  public void testToSnapshotWithTag() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotAId = table.currentSnapshot().snapshotId();
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long snapshotBId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, snapshotBId).commit();
+
+    String tagSnapshotMainBName = "t1";
+    table.manageSnapshots().createTag(tagSnapshotMainBName, snapshotBId).commit();
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+
+    String tagSnapshotBranchBName = "t2";
+    table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+    long snapshotBranchBId = table.snapshot(branchName).snapshotId();
+    table.manageSnapshots().createTag(tagSnapshotBranchBName, snapshotBranchBId).commit();
+
+    /*
+
+          files:FILE_A            files:FILE_B              files:FILE_B FILE_B
+          ----snapshotA  ------ snapshotMainB(tag:t1) --------  currentSnapshot
+                                        \
+                                         \
+                                          \files:FILE_C
+                                          snapshotBranchB(branch:b1, tag:t2)
+    */
+    IncrementalAppendScan scan = newScan().toSnapshot(tagSnapshotMainBName);
+    Assertions.assertThat(scan.planFiles()).hasSize(2);
+
+    IncrementalAppendScan scan2 = newScan().toSnapshot(tagSnapshotBranchBName);
+    Assertions.assertThat(scan2.planFiles()).hasSize(3);
+  }
+
+  @Test
+  public void testToSnapshotWithNonExistingRef() {
+    Assertions.assertThatThrownBy(() -> newScan().toSnapshot("nonExistingRef"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot find ref: nonExistingRef");
+  }
+
+  @Test
+  public void testToSnapshotWithBranchShouldFail() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, snapshotId).commit();
+
+    Assertions.assertThatThrownBy(() -> newScan().toSnapshot(branchName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(String.format("Ref %s is not a tag", branchName));
+  }
+
   @Test
   public void testMultipleRootSnapshots() throws Exception {
     table.newFastAppend().appendFile(FILE_A).commit();

Reply via email to