This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a594546b0 Spark 3.4: Only traverse ancestors of current snapshot when building changelog scan (#10405)
6a594546b0 is described below

commit 6a594546b06df9fb75dd7e9713a8dc173e67c870
Author: Manu Zhang <[email protected]>
AuthorDate: Sun Jun 2 00:12:54 2024 +0800

    Spark 3.4: Only traverse ancestors of current snapshot when building changelog scan (#10405)
---
 .../spark/extensions/TestChangelogTable.java       | 63 ++++++++++++++++++++++
 .../iceberg/spark/source/SparkScanBuilder.java     | 13 ++++-
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index ab22eee006..d82ed11455 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -295,6 +295,69 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
         rows);
   }
 
+  @Test
+  public void testQueryWithRollback() {
+    createTable();
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+    long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis());
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+    long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis());
+
+    sql(
+        "CALL %s.system.rollback_to_snapshot('%s', %d)",
+        catalogName, tableIdent, snap1.snapshotId());
+    table.refresh();
+    Assert.assertEquals("Snapshot should match after rollback", table.currentSnapshot(), snap1);
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'a')", tableName);
+    table.refresh();
+    Snapshot snap3 = table.currentSnapshot();
+    long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
+
+    assertEquals(
+        "Should have expected changed rows up to snapshot 3",
+        ImmutableList.of(
+            row(1, "a", "INSERT", 0, snap1.snapshotId()),
+            row(1, "a", "DELETE", 1, snap3.snapshotId()),
+            row(-2, "a", "INSERT", 1, snap3.snapshotId())),
+        changelogRecords(null, rightAfterSnap3));
+
+    assertEquals(
+        "Should have expected changed rows up to snapshot 2",
+        ImmutableList.of(row(1, "a", "INSERT", 0, snap1.snapshotId())),
+        changelogRecords(null, rightAfterSnap2));
+
+    assertEquals(
+        "Should have expected changed rows from snapshot 3 only since snapshot 2 is on a different branch.",
+        ImmutableList.of(
+            row(1, "a", "DELETE", 0, snap3.snapshotId()),
+            row(-2, "a", "INSERT", 0, snap3.snapshotId())),
+        changelogRecords(rightAfterSnap1, snap3.timestampMillis()));
+
+    assertEquals(
+        "Should have expected changed rows from snapshot 3",
+        ImmutableList.of(
+            row(1, "a", "DELETE", 0, snap3.snapshotId()),
+            row(-2, "a", "INSERT", 0, snap3.snapshotId())),
+        changelogRecords(rightAfterSnap2, null));
+
+    sql(
+        "CALL %s.system.set_current_snapshot('%s', %d)",
+        catalogName, tableIdent, snap2.snapshotId());
+    table.refresh();
+    Assert.assertEquals("Snapshot should match after reset", table.currentSnapshot(), snap2);
+    assertEquals(
+        "Should have expected changed rows from snapshot 2 only since snapshot 3 is on a different branch.",
+        ImmutableList.of(row(2, "b", "INSERT", 0, snap2.snapshotId())),
+        changelogRecords(rightAfterSnap1, null));
+  }
+
   private void createTableWithDefaultRows() {
     createTable();
     insertDefaultRows();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 09c09c6caa..d6f34231ae 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -546,7 +546,7 @@ public class SparkScanBuilder
     }
 
     if (endTimestamp != null) {
-      endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
+      endSnapshotId = getEndSnapshotId(endTimestamp);
       if ((startSnapshotId == null && endSnapshotId == null)
           || (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
         emptyScan = true;
@@ -589,6 +589,17 @@ public class SparkScanBuilder
     }
   }
 
+  private Long getEndSnapshotId(Long endTimestamp) {
+    Long endSnapshotId = null;
+    for (Snapshot snapshot : SnapshotUtil.currentAncestors(table)) {
+      if (snapshot.timestampMillis() <= endTimestamp) {
+        endSnapshotId = snapshot.snapshotId();
+        break;
+      }
+    }
+    return endSnapshotId;
+  }
+
   public Scan buildMergeOnReadScan() {
     Preconditions.checkArgument(
         readConf.snapshotId() == null && readConf.asOfTimestamp() == null && readConf.tag() == null,

Reply via email to