This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6e239bc4c2 Core, API: Support incremental scanning with branch (#5984)
6e239bc4c2 is described below
commit 6e239bc4c23abfca12ff56198bc0b21f0f2b3609
Author: Liwei Li <[email protected]>
AuthorDate: Wed Aug 23 15:20:41 2023 +0800
Core, API: Support incremental scanning with branch (#5984)
---
.../java/org/apache/iceberg/IncrementalScan.java | 61 ++++-
.../org/apache/iceberg/BaseIncrementalScan.java | 49 +++-
.../java/org/apache/iceberg/TableScanContext.java | 7 +
.../iceberg/TestBaseIncrementalAppendScan.java | 265 +++++++++++++++++++++
4 files changed, 378 insertions(+), 4 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/IncrementalScan.java b/api/src/main/java/org/apache/iceberg/IncrementalScan.java
index 1f7a8dff66..0be0849c6a 100644
--- a/api/src/main/java/org/apache/iceberg/IncrementalScan.java
+++ b/api/src/main/java/org/apache/iceberg/IncrementalScan.java
@@ -24,7 +24,7 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
/**
* Instructs this scan to look for changes starting from a particular
snapshot (inclusive).
*
- * <p>If the start snapshot is not configured, it is defaulted to the oldest
ancestor of the end
+ * <p>If the start snapshot is not configured, it defaults to the oldest
ancestor of the end
* snapshot (inclusive).
*
* @param fromSnapshotId the start snapshot ID (inclusive)
@@ -33,10 +33,25 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
*/
ThisT fromSnapshotInclusive(long fromSnapshotId);
+  /**
+   * Instructs this scan to look for changes starting from the snapshot of a tag (inclusive).
+   *
+   * <p>If the start snapshot is not configured, it defaults to the oldest ancestor of the end
+   * snapshot (inclusive).
+   *
+   * @param ref the name of a tag whose snapshot is the start snapshot (inclusive); branch names
+   *     are rejected
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the ref does not exist, is not a tag, or the start
+   *     snapshot is not an ancestor of the end snapshot
+   * @throws UnsupportedOperationException if this scan does not support starting from a ref
+   */
+  default ThisT fromSnapshotInclusive(String ref) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement fromSnapshotInclusive");
+  }
+
/**
* Instructs this scan to look for changes starting from a particular
snapshot (exclusive).
*
- * <p>If the start snapshot is not configured, it is defaulted to the oldest
ancestor of the end
+ * <p>If the start snapshot is not configured, it defaults to the oldest
ancestor of the end
* snapshot (inclusive).
*
* @param fromSnapshotId the start snapshot ID (exclusive)
@@ -45,14 +60,54 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
*/
ThisT fromSnapshotExclusive(long fromSnapshotId);
+  /**
+   * Instructs this scan to look for changes starting from the snapshot of a tag (exclusive).
+   *
+   * <p>If the start snapshot is not configured, it defaults to the oldest ancestor of the end
+   * snapshot (inclusive).
+   *
+   * @param ref the name of a tag whose snapshot is the start snapshot (exclusive); branch names
+   *     are rejected
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the ref does not exist, is not a tag, or the start
+   *     snapshot is not an ancestor of the end snapshot
+   * @throws UnsupportedOperationException if this scan does not support starting from a ref
+   */
+  default ThisT fromSnapshotExclusive(String ref) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement fromSnapshotExclusive");
+  }
+
/**
* Instructs this scan to look for changes up to a particular snapshot (inclusive).
*
- * <p>If the end snapshot is not configured, it is defaulted to the current table snapshot
+ * <p>If the end snapshot is not configured, it defaults to the current table snapshot
* (inclusive).
*
* @param toSnapshotId the end snapshot ID (inclusive)
* @return this for method chaining
*/
ThisT toSnapshot(long toSnapshotId);
+
+  /**
+   * Instructs this scan to look for changes up to the snapshot of a tag (inclusive).
+   *
+   * <p>If the end snapshot is not configured, it defaults to the current table snapshot, or to
+   * the head of the branch set via {@link #useBranch(String)} (inclusive).
+   *
+   * @param ref the name of a tag whose snapshot is the end snapshot (inclusive); branch names are
+   *     rejected, use {@link #useBranch(String)} to scan up to the head of a branch instead
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the ref does not exist or is not a tag
+   * @throws UnsupportedOperationException if this scan does not support ending at a ref
+   */
+  default ThisT toSnapshot(String ref) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement toSnapshot");
+  }
+
+  /**
+   * Instructs this scan to resolve its end snapshot against the given branch instead of the
+   * current table snapshot.
+   *
+   * <p>If the end snapshot is not configured, it defaults to the head snapshot of this branch. If
+   * it is configured, it must be the branch head or one of its ancestors; otherwise planning
+   * fails with an {@link IllegalArgumentException}.
+   *
+   * @param branch the branch name
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the ref does not exist or is not a branch
+   * @throws UnsupportedOperationException if this scan does not support branches
+   */
+  default ThisT useBranch(String branch) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement useBranch");
+  }
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
index 8bcb538164..1eef1bd540 100644
--- a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
@@ -34,6 +34,14 @@ abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTask
protected abstract CloseableIterable<T> doPlanFiles(
Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+  @Override
+  public ThisT fromSnapshotInclusive(String ref) {
+    // Only tags are accepted here; the tagged snapshot becomes the inclusive start.
+    SnapshotRef startTag = table().refs().get(ref);
+    Preconditions.checkArgument(startTag != null, "Cannot find ref: %s", ref);
+    Preconditions.checkArgument(startTag.isTag(), "Ref %s is not a tag", ref);
+
+    long startSnapshotId = startTag.snapshotId();
+    return fromSnapshotInclusive(startSnapshotId);
+  }
+
@Override
public ThisT fromSnapshotInclusive(long fromSnapshotId) {
Preconditions.checkArgument(
@@ -44,6 +52,14 @@ abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTask
return newRefinedScan(table(), schema(), newContext);
}
+  @Override
+  public ThisT fromSnapshotExclusive(String ref) {
+    // Only tags are accepted here; the tagged snapshot becomes the exclusive start.
+    SnapshotRef startTag = table().refs().get(ref);
+    Preconditions.checkArgument(startTag != null, "Cannot find ref: %s", ref);
+    Preconditions.checkArgument(startTag.isTag(), "Ref %s is not a tag", ref);
+
+    long startSnapshotId = startTag.snapshotId();
+    return fromSnapshotExclusive(startSnapshotId);
+  }
+
@Override
public ThisT fromSnapshotExclusive(long fromSnapshotId) {
// for exclusive behavior, table().snapshot(fromSnapshotId) check can't be
applied
@@ -60,6 +76,22 @@ abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTask
return newRefinedScan(table(), schema(), newContext);
}
+  @Override
+  public ThisT toSnapshot(String ref) {
+    // Resolve the tag to its snapshot ID and let the ID-based overload do the rest.
+    SnapshotRef endTag = table().refs().get(ref);
+    Preconditions.checkArgument(endTag != null, "Cannot find ref: %s", ref);
+    Preconditions.checkArgument(endTag.isTag(), "Ref %s is not a tag", ref);
+
+    long endSnapshotId = endTag.snapshotId();
+    return toSnapshot(endSnapshotId);
+  }
+
+  @Override
+  public ThisT useBranch(String branch) {
+    // Reject unknown refs and tags up front, when the scan is configured.
+    SnapshotRef branchRef = table().refs().get(branch);
+    Preconditions.checkArgument(branchRef != null, "Cannot find ref: %s", branch);
+    Preconditions.checkArgument(branchRef.isBranch(), "Ref %s is not a branch", branch);
+
+    // Only the branch name is recorded in the context; snapshots are resolved against it later.
+    return newRefinedScan(table(), schema(), context().useBranch(branch));
+  }
+
@Override
public CloseableIterable<T> planFiles() {
if (scanCurrentLineage() && table().currentSnapshot() == null) {
@@ -100,9 +132,24 @@ abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTask
private long toSnapshotIdInclusive() {
if (context().toSnapshotId() != null) {
+ if (context().branch() != null) {
+ Snapshot currentSnapshot = table().snapshot(context().branch());
+ Preconditions.checkArgument(
+ SnapshotUtil.isAncestorOf(
+ table(), currentSnapshot.snapshotId(),
context().toSnapshotId()),
+ "End snapshot is not a valid snapshot on the current branch: %s",
+ context().branch());
+ }
+
return context().toSnapshotId();
} else {
- Snapshot currentSnapshot = table().currentSnapshot();
+ Snapshot currentSnapshot;
+ if (context().branch() != null) {
+ currentSnapshot = table().snapshot(context().branch());
+ } else {
+ currentSnapshot = table().currentSnapshot();
+ }
+
Preconditions.checkArgument(
currentSnapshot != null, "End snapshot is not set and table has no
current snapshot");
return currentSnapshot.snapshotId();
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index 87a2f59f6c..b59446a717 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -96,6 +96,9 @@ abstract class TableScanContext {
return LoggingMetricsReporter.instance();
}
+  /**
+   * Returns the name of the branch set via {@link #useBranch(String)}, or null if none was set.
+   * When non-null, incremental scans resolve their end snapshot against this branch's head.
+   */
+  @Nullable
+  public abstract String branch();
+
TableScanContext useSnapshotId(Long scanSnapshotId) {
return
ImmutableTableScanContext.builder().from(this).snapshotId(scanSnapshotId).build();
}
@@ -172,6 +175,10 @@ abstract class TableScanContext {
.build();
}
+  /**
+   * Returns a copy of this context with {@link #branch()} set to the given branch name; every
+   * other attribute is copied from this context.
+   */
+  TableScanContext useBranch(String branch) {
+    return ImmutableTableScanContext.builder().from(this).branch(branch).build();
+  }
+
public static TableScanContext empty() {
return ImmutableTableScanContext.builder().build();
}
diff --git a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
index 5836555593..c40cc35d24 100644
--- a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
@@ -51,6 +51,170 @@ public class TestBaseIncrementalAppendScan
Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
}
+  @Test
+  public void fromSnapshotInclusiveWithNonExistingRef() {
+    // An unknown ref name is rejected as soon as the scan is configured.
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().fromSnapshotInclusive("nonExistingRef"))
+        .withMessage("Cannot find ref: nonExistingRef");
+  }
+
+  @Test
+  public void fromSnapshotInclusiveWithTag() {
+    String firstTag = "t1";
+    String secondTag = "t2";
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(firstTag, firstSnapshotId).commit();
+
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(secondTag, secondSnapshotId).commit();
+
+    table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+
+    /*
+     * main history (oldest to newest):
+     *   [FILE_A]          tag t1
+     *   [FILE_B, FILE_B]  tag t2
+     *   [FILE_C, FILE_C]  current snapshot
+     */
+
+    // From t1 (inclusive) to the current snapshot: 1 + 2 + 2 files.
+    IncrementalAppendScan fromFirstTag = newScan().fromSnapshotInclusive(firstTag);
+    Assertions.assertThat(fromFirstTag.planFiles()).hasSize(5);
+
+    // From t1 (inclusive) to t2 (inclusive): 1 + 2 files.
+    IncrementalAppendScan betweenTags =
+        newScan().fromSnapshotInclusive(firstTag).toSnapshot(secondTag);
+    Assertions.assertThat(betweenTags.planFiles()).hasSize(3);
+  }
+
+  @Test
+  public void fromSnapshotInclusiveWithBranchShouldFail() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long headId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, headId).commit();
+
+    // A branch name is rejected wherever a tag is expected, both as start and as end ref.
+    String expectedMessage = String.format("Ref %s is not a tag", branchName);
+
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().fromSnapshotInclusive(branchName))
+        .withMessage(expectedMessage);
+
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().fromSnapshotInclusive(headId).toSnapshot(branchName))
+        .withMessage(expectedMessage);
+  }
+
+  @Test
+  public void testUseBranch() {
+    String branch = "b1";
+    String startTag = "t1";
+    String mainTag = "t2";
+
+    // A [FILE_A] is the fork point: branch b1 and tag t1 both point at it.
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotA = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createBranch(branch, snapshotA).commit();
+    table.manageSnapshots().createTag(startTag, snapshotA).commit();
+
+    // Two more commits on main; the first one is tagged t2.
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    long mainSnapshotB = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(mainTag, mainSnapshotB).commit();
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+
+    // Two commits on b1, each adding a single FILE_C.
+    table.newFastAppend().appendFile(FILE_C).toBranch(branch).commit();
+    long branchSnapshotB = table.snapshot(branch).snapshotId();
+    table.newFastAppend().appendFile(FILE_C).toBranch(branch).commit();
+    long branchSnapshotC = table.snapshot(branch).snapshotId();
+
+    /*
+     * main: A [FILE_A] (t1) -> main B [FILE_B x2] (t2) -> current [FILE_B x2]
+     * b1:   A -> branch B [FILE_C] -> branch C [FILE_C] (head of b1)
+     */
+
+    // No branch: the end snapshot is main's current snapshot (1 + 2 + 2 files).
+    IncrementalAppendScan onMain = newScan().fromSnapshotInclusive(startTag);
+    Assertions.assertThat(onMain.planFiles()).hasSize(5);
+
+    // With b1 and no explicit end: the end snapshot is the head of b1 (1 + 1 + 1 files).
+    IncrementalAppendScan onBranch = newScan().fromSnapshotInclusive(startTag).useBranch(branch);
+    Assertions.assertThat(onBranch.planFiles()).hasSize(3);
+
+    // Explicit end snapshots on b1, starting from the oldest ancestor A.
+    IncrementalAppendScan upToBranchB = newScan().toSnapshot(branchSnapshotB).useBranch(branch);
+    Assertions.assertThat(upToBranchB.planFiles()).hasSize(2);
+
+    IncrementalAppendScan upToBranchC = newScan().toSnapshot(branchSnapshotC).useBranch(branch);
+    Assertions.assertThat(upToBranchC.planFiles()).hasSize(3);
+
+    // Exclusive start at t1 and end at branch B: only the first FILE_C remains.
+    IncrementalAppendScan afterTagToBranchB =
+        newScan().fromSnapshotExclusive(startTag).toSnapshot(branchSnapshotB).useBranch(branch);
+    Assertions.assertThat(afterTagToBranchB.planFiles()).hasSize(1);
+  }
+
+  @Test
+  public void testUseBranchWithTagShouldFail() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long headId = table.currentSnapshot().snapshotId();
+    String tagName = "t1";
+    table.manageSnapshots().createTag(tagName, headId).commit();
+
+    // A tag name is rejected where a branch is expected.
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().fromSnapshotInclusive(headId).useBranch(tagName))
+        .withMessage(String.format("Ref %s is not a branch", tagName));
+  }
+
+  @Test
+  public void testUseBranchWithInvalidSnapshotShouldFail() {
+    String branchName = "b1";
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotA = table.currentSnapshot().snapshotId();
+
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    long mainSnapshotB = table.currentSnapshot().snapshotId();
+
+    table.manageSnapshots().createBranch(branchName, snapshotA).commit();
+    table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+    long branchSnapshotB = table.snapshot(branchName).snapshotId();
+
+    /*
+     * main: A [FILE_A] -> main B [FILE_B x2]
+     * b1:   A -> branch B [FILE_C] (head of b1)
+     */
+
+    // main B is not an ancestor of b1's head, so it cannot be the end snapshot on b1 ...
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().toSnapshot(mainSnapshotB).useBranch(branchName).planFiles())
+        .withMessageContaining("End snapshot is not a valid snapshot on the current branch");
+
+    // ... nor the start snapshot of a scan that ends at b1's head.
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () -> newScan().fromSnapshotInclusive(mainSnapshotB).useBranch(branchName).planFiles())
+        .withMessageContaining(
+            String.format(
+                "Starting snapshot (inclusive) %s is not an ancestor of end snapshot %s",
+                mainSnapshotB, branchSnapshotB));
+  }
+
+  @Test
+  public void testUseBranchWithNonExistingRef() {
+    // An unknown branch name is rejected before any planning happens.
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().useBranch("nonExistingRef"))
+        .withMessage("Cannot find ref: nonExistingRef");
+  }
+
@Test
public void testFromSnapshotExclusive() {
table.newFastAppend().appendFile(FILE_A).commit();
@@ -88,6 +252,51 @@ public class TestBaseIncrementalAppendScan
Assert.assertEquals(1, Iterables.size(scanWithToSnapshot.planFiles()));
}
+  @Test
+  public void fromSnapshotExclusiveWithNonExistingRef() {
+    // An unknown ref name is rejected as soon as the scan is configured.
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().fromSnapshotExclusive("nonExistingRef"))
+        .withMessage("Cannot find ref: nonExistingRef");
+  }
+
+  @Test
+  public void testFromSnapshotExclusiveWithTag() {
+    String firstTag = "t1";
+    String secondTag = "t2";
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(firstTag, firstSnapshotId).commit();
+
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(secondTag, secondSnapshotId).commit();
+
+    table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+
+    /*
+     * main history (oldest to newest):
+     *   [FILE_A]          tag t1
+     *   [FILE_B, FILE_B]  tag t2
+     *   [FILE_C, FILE_C]  current snapshot
+     */
+
+    // After t1 (exclusive) up to the current snapshot: 2 + 2 files.
+    IncrementalAppendScan afterFirstTag = newScan().fromSnapshotExclusive(firstTag);
+    Assertions.assertThat(afterFirstTag.planFiles()).hasSize(4);
+
+    // After t1 (exclusive) up to t2 (inclusive): only the two FILE_B entries.
+    IncrementalAppendScan betweenTags =
+        newScan().fromSnapshotExclusive(firstTag).toSnapshot(secondTag);
+    Assertions.assertThat(betweenTags.planFiles()).hasSize(2);
+  }
+
+  @Test
+  public void fromSnapshotExclusiveWithBranchShouldFail() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long headId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, headId).commit();
+
+    // A branch is not a valid exclusive start ref; only tags are accepted.
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().fromSnapshotExclusive(branchName))
+        .withMessage(String.format("Ref %s is not a tag", branchName));
+  }
+
@Test
public void testToSnapshot() {
table.newFastAppend().appendFile(FILE_A).commit();
@@ -101,6 +310,62 @@ public class TestBaseIncrementalAppendScan
Assert.assertEquals(2, Iterables.size(scan.planFiles()));
}
+  @Test
+  public void testToSnapshotWithTag() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long snapshotBId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, snapshotBId).commit();
+
+    String tagSnapshotMainBName = "t1";
+    table.manageSnapshots().createTag(tagSnapshotMainBName, snapshotBId).commit();
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+
+    String tagSnapshotBranchBName = "t2";
+    table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+    long snapshotBranchBId = table.snapshot(branchName).snapshotId();
+    table.manageSnapshots().createTag(tagSnapshotBranchBName, snapshotBranchBId).commit();
+
+    /*
+     * main: A [FILE_A] -> main B [FILE_B] (t1) -> current [FILE_B x2]
+     * b1:   main B -> branch B [FILE_C] (head of b1, t2)
+     */
+
+    // A tag on main bounds the scan at that snapshot: FILE_A + FILE_B.
+    IncrementalAppendScan scan = newScan().toSnapshot(tagSnapshotMainBName);
+    Assertions.assertThat(scan.planFiles()).hasSize(2);
+
+    // A tag on a branch-only snapshot is a valid end ref even without useBranch:
+    // FILE_A + FILE_B + FILE_C.
+    IncrementalAppendScan scan2 = newScan().toSnapshot(tagSnapshotBranchBName);
+    Assertions.assertThat(scan2.planFiles()).hasSize(3);
+  }
+
+  @Test
+  public void testToSnapshotWithNonExistingRef() {
+    // An unknown end ref is rejected as soon as the scan is configured.
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().toSnapshot("nonExistingRef"))
+        .withMessage("Cannot find ref: nonExistingRef");
+  }
+
+  @Test
+  public void testToSnapshotWithBranchShouldFail() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long headId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, headId).commit();
+
+    // toSnapshot(String) accepts tags only, so a branch name is rejected.
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> newScan().toSnapshot(branchName))
+        .withMessage(String.format("Ref %s is not a tag", branchName));
+  }
+
@Test
public void testMultipleRootSnapshots() throws Exception {
table.newFastAppend().appendFile(FILE_A).commit();