This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6a594546b0 Spark 3.4: Only traverse ancestors of current snapshot when
building changelog scan (#10405)
6a594546b0 is described below
commit 6a594546b06df9fb75dd7e9713a8dc173e67c870
Author: Manu Zhang <[email protected]>
AuthorDate: Sun Jun 2 00:12:54 2024 +0800
Spark 3.4: Only traverse ancestors of current snapshot when building
changelog scan (#10405)
---
.../spark/extensions/TestChangelogTable.java | 63 ++++++++++++++++++++++
.../iceberg/spark/source/SparkScanBuilder.java | 13 ++++-
2 files changed, 75 insertions(+), 1 deletion(-)
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index ab22eee006..d82ed11455 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -295,6 +295,69 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
rows);
}
+ @Test
+ public void testQueryWithRollback() {
+ createTable();
+
+ sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snap1 = table.currentSnapshot();
+ long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis());
+
+ sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+ table.refresh();
+ Snapshot snap2 = table.currentSnapshot();
+ long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis());
+
+ sql(
+ "CALL %s.system.rollback_to_snapshot('%s', %d)",
+ catalogName, tableIdent, snap1.snapshotId());
+ table.refresh();
+ Assert.assertEquals("Snapshot should match after rollback", table.currentSnapshot(), snap1);
+
+ sql("INSERT OVERWRITE %s VALUES (-2, 'a')", tableName);
+ table.refresh();
+ Snapshot snap3 = table.currentSnapshot();
+ long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
+
+ assertEquals(
+ "Should have expected changed rows up to snapshot 3",
+ ImmutableList.of(
+ row(1, "a", "INSERT", 0, snap1.snapshotId()),
+ row(1, "a", "DELETE", 1, snap3.snapshotId()),
+ row(-2, "a", "INSERT", 1, snap3.snapshotId())),
+ changelogRecords(null, rightAfterSnap3));
+
+ assertEquals(
+ "Should have expected changed rows up to snapshot 2",
+ ImmutableList.of(row(1, "a", "INSERT", 0, snap1.snapshotId())),
+ changelogRecords(null, rightAfterSnap2));
+
+ assertEquals(
+ "Should have expected changed rows from snapshot 3 only since snapshot 2 is on a different branch.",
+ ImmutableList.of(
+ row(1, "a", "DELETE", 0, snap3.snapshotId()),
+ row(-2, "a", "INSERT", 0, snap3.snapshotId())),
+ changelogRecords(rightAfterSnap1, snap3.timestampMillis()));
+
+ assertEquals(
+ "Should have expected changed rows from snapshot 3",
+ ImmutableList.of(
+ row(1, "a", "DELETE", 0, snap3.snapshotId()),
+ row(-2, "a", "INSERT", 0, snap3.snapshotId())),
+ changelogRecords(rightAfterSnap2, null));
+
+ sql(
+ "CALL %s.system.set_current_snapshot('%s', %d)",
+ catalogName, tableIdent, snap2.snapshotId());
+ table.refresh();
+ Assert.assertEquals("Snapshot should match after reset", table.currentSnapshot(), snap2);
+ assertEquals(
+ "Should have expected changed rows from snapshot 2 only since snapshot 3 is on a different branch.",
+ ImmutableList.of(row(2, "b", "INSERT", 0, snap2.snapshotId())),
+ changelogRecords(rightAfterSnap1, null));
+ }
+
private void createTableWithDefaultRows() {
createTable();
insertDefaultRows();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 09c09c6caa..d6f34231ae 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -546,7 +546,7 @@ public class SparkScanBuilder
}
if (endTimestamp != null) {
- endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
+ endSnapshotId = getEndSnapshotId(endTimestamp);
if ((startSnapshotId == null && endSnapshotId == null)
|| (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
emptyScan = true;
@@ -589,6 +589,17 @@ public class SparkScanBuilder
}
}
+ private Long getEndSnapshotId(Long endTimestamp) {
+ Long endSnapshotId = null;
+ for (Snapshot snapshot : SnapshotUtil.currentAncestors(table)) {
+ if (snapshot.timestampMillis() <= endTimestamp) {
+ endSnapshotId = snapshot.snapshotId();
+ break;
+ }
+ }
+ return endSnapshotId;
+ }
+
public Scan buildMergeOnReadScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null && readConf.asOfTimestamp() == null && readConf.tag() == null,