This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a7608281c0 Spark3.2, Spark3.3: Return empty changed rows when there 
are no snapshots between start time and end time (#8391)
a7608281c0 is described below

commit a7608281c032d88fdacb91245311a874f22fa78d
Author: Manu Zhang <[email protected]>
AuthorDate: Fri Aug 25 22:47:07 2023 +0800

    Spark3.2, Spark3.3: Return empty changed rows when there are no snapshots 
between start time and end time (#8391)
    
    Back-port of https://github.com/apache/iceberg/pull/8133 to `spark/v3.3` 
and `spark/v3.2`
---
 .../spark/extensions/TestChangelogTable.java       | 26 +++++++++++++++++-----
 .../iceberg/spark/source/SparkChangelogScan.java   |  6 ++++-
 .../iceberg/spark/source/SparkScanBuilder.java     | 23 ++++++++++++-------
 .../spark/extensions/TestChangelogTable.java       | 26 +++++++++++++++++-----
 .../iceberg/spark/source/SparkChangelogScan.java   |  6 ++++-
 .../iceberg/spark/source/SparkScanBuilder.java     | 23 ++++++++++++-------
 6 files changed, 82 insertions(+), 28 deletions(-)

diff --git 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index 603775eb11..cc81b4b3d3 100644
--- 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -136,6 +136,7 @@ public class TestChangelogTable extends 
SparkExtensionsTestBase {
     sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
     table.refresh();
     Snapshot snap3 = table.currentSnapshot();
+    long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
 
     assertEquals(
         "Should have expected changed rows only from snapshot 3",
@@ -166,6 +167,26 @@ public class TestChangelogTable extends 
SparkExtensionsTestBase {
             row(2, "b", "DELETE", 1, snap3.snapshotId()),
             row(-2, "b", "INSERT", 1, snap3.snapshotId())),
         changelogRecords(rightAfterSnap1, null));
+
+    assertEquals(
+        "Should have empty changed rows if end time is before the first 
snapshot",
+        ImmutableList.of(),
+        changelogRecords(null, snap1.timestampMillis() - 1));
+
+    assertEquals(
+        "Should have empty changed rows if start time is after the current 
snapshot",
+        ImmutableList.of(),
+        changelogRecords(rightAfterSnap3, null));
+
+    assertEquals(
+        "Should have empty changed rows if end time is before the first 
snapshot",
+        ImmutableList.of(),
+        changelogRecords(null, snap1.timestampMillis() - 1));
+
+    assertEquals(
+        "Should have empty changed rows if there are no snapshots between 
start time and end time",
+        ImmutableList.of(),
+        changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
   }
 
   @Test
@@ -185,11 +206,6 @@ public class TestChangelogTable extends 
SparkExtensionsTestBase {
         "Should fail if start time is after end time",
         IllegalArgumentException.class,
         () -> changelogRecords(snap3.timestampMillis(), 
snap2.timestampMillis()));
-
-    assertThrows(
-        "Should fail if start time is after the current snapshot",
-        IllegalArgumentException.class,
-        () -> changelogRecords(rightAfterSnap3, null));
   }
 
   @Test
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 3f7927c6d6..e68bc8aee2 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -66,7 +66,8 @@ class SparkChangelogScan implements Scan, 
SupportsReportStatistics {
       IncrementalChangelogScan scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
+      List<Expression> filters,
+      boolean emptyScan) {
 
     SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), 
expectedSchema);
 
@@ -79,6 +80,9 @@ class SparkChangelogScan implements Scan, 
SupportsReportStatistics {
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
     this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+    if (emptyScan) {
+      this.taskGroups = Collections.emptyList();
+    }
   }
 
   @Override
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index c1144d944a..7f628483b2 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -249,6 +249,7 @@ public class SparkScanBuilder
     return new SparkBatchQueryScan(spark, table, scan, readConf, 
expectedSchema, filterExpressions);
   }
 
+  @SuppressWarnings("CyclomaticComplexity")
   public Scan buildChangelogScan() {
     Preconditions.checkArgument(
         readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
@@ -281,12 +282,20 @@ public class SparkScanBuilder
           SparkReadOptions.END_TIMESTAMP);
     }
 
+    boolean emptyScan = false;
     if (startTimestamp != null) {
       startSnapshotId = getStartSnapshotId(startTimestamp);
+      if (startSnapshotId == null && endTimestamp == null) {
+        emptyScan = true;
+      }
     }
 
     if (endTimestamp != null) {
-      endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+      endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, 
endTimestamp);
+      if ((startSnapshotId == null && endSnapshotId == null)
+          || (startSnapshotId != null && 
startSnapshotId.equals(endSnapshotId))) {
+        emptyScan = true;
+      }
     }
 
     Schema expectedSchema = schemaWithMetadataColumns();
@@ -308,18 +317,16 @@ public class SparkScanBuilder
 
     scan = configureSplitPlanning(scan);
 
-    return new SparkChangelogScan(spark, table, scan, readConf, 
expectedSchema, filterExpressions);
+    return new SparkChangelogScan(
+        spark, table, scan, readConf, expectedSchema, filterExpressions, 
emptyScan);
   }
 
   private Long getStartSnapshotId(Long startTimestamp) {
     Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, 
startTimestamp);
-    Preconditions.checkArgument(
-        oldestSnapshotAfter != null,
-        "Cannot find a snapshot older than %s for table %s",
-        startTimestamp,
-        table.name());
 
-    if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
+    if (oldestSnapshotAfter == null) {
+      return null;
+    } else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
       return oldestSnapshotAfter.snapshotId();
     } else {
       return oldestSnapshotAfter.parentId();
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index 603775eb11..cc81b4b3d3 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -136,6 +136,7 @@ public class TestChangelogTable extends 
SparkExtensionsTestBase {
     sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
     table.refresh();
     Snapshot snap3 = table.currentSnapshot();
+    long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
 
     assertEquals(
         "Should have expected changed rows only from snapshot 3",
@@ -166,6 +167,26 @@ public class TestChangelogTable extends 
SparkExtensionsTestBase {
             row(2, "b", "DELETE", 1, snap3.snapshotId()),
             row(-2, "b", "INSERT", 1, snap3.snapshotId())),
         changelogRecords(rightAfterSnap1, null));
+
+    assertEquals(
+        "Should have empty changed rows if end time is before the first 
snapshot",
+        ImmutableList.of(),
+        changelogRecords(null, snap1.timestampMillis() - 1));
+
+    assertEquals(
+        "Should have empty changed rows if start time is after the current 
snapshot",
+        ImmutableList.of(),
+        changelogRecords(rightAfterSnap3, null));
+
+    assertEquals(
+        "Should have empty changed rows if end time is before the first 
snapshot",
+        ImmutableList.of(),
+        changelogRecords(null, snap1.timestampMillis() - 1));
+
+    assertEquals(
+        "Should have empty changed rows if there are no snapshots between 
start time and end time",
+        ImmutableList.of(),
+        changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
   }
 
   @Test
@@ -185,11 +206,6 @@ public class TestChangelogTable extends 
SparkExtensionsTestBase {
         "Should fail if start time is after end time",
         IllegalArgumentException.class,
         () -> changelogRecords(snap3.timestampMillis(), 
snap2.timestampMillis()));
-
-    assertThrows(
-        "Should fail if start time is after the current snapshot",
-        IllegalArgumentException.class,
-        () -> changelogRecords(rightAfterSnap3, null));
   }
 
   @Test
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 54fdd186d4..0ce8d5c29e 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -69,7 +69,8 @@ class SparkChangelogScan implements Scan, 
SupportsReportStatistics {
       IncrementalChangelogScan scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
+      List<Expression> filters,
+      boolean emptyScan) {
 
     SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), 
expectedSchema);
 
@@ -82,6 +83,9 @@ class SparkChangelogScan implements Scan, 
SupportsReportStatistics {
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
     this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+    if (emptyScan) {
+      this.taskGroups = Collections.emptyList();
+    }
   }
 
   @Override
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 2653b9eab1..c24bcaad58 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -480,6 +480,7 @@ public class SparkScanBuilder
     return new SparkBatchQueryScan(spark, table, scan, readConf, 
expectedSchema, filterExpressions);
   }
 
+  @SuppressWarnings("CyclomaticComplexity")
   public Scan buildChangelogScan() {
     Preconditions.checkArgument(
         readConf.snapshotId() == null
@@ -517,12 +518,20 @@ public class SparkScanBuilder
           SparkReadOptions.END_TIMESTAMP);
     }
 
+    boolean emptyScan = false;
     if (startTimestamp != null) {
       startSnapshotId = getStartSnapshotId(startTimestamp);
+      if (startSnapshotId == null && endTimestamp == null) {
+        emptyScan = true;
+      }
     }
 
     if (endTimestamp != null) {
-      endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+      endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, 
endTimestamp);
+      if ((startSnapshotId == null && endSnapshotId == null)
+          || (startSnapshotId != null && 
startSnapshotId.equals(endSnapshotId))) {
+        emptyScan = true;
+      }
     }
 
     Schema expectedSchema = schemaWithMetadataColumns();
@@ -544,18 +553,16 @@ public class SparkScanBuilder
 
     scan = configureSplitPlanning(scan);
 
-    return new SparkChangelogScan(spark, table, scan, readConf, 
expectedSchema, filterExpressions);
+    return new SparkChangelogScan(
+        spark, table, scan, readConf, expectedSchema, filterExpressions, 
emptyScan);
   }
 
   private Long getStartSnapshotId(Long startTimestamp) {
     Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, 
startTimestamp);
-    Preconditions.checkArgument(
-        oldestSnapshotAfter != null,
-        "Cannot find a snapshot older than %s for table %s",
-        startTimestamp,
-        table.name());
 
-    if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
+    if (oldestSnapshotAfter == null) {
+      return null;
+    } else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
       return oldestSnapshotAfter.snapshotId();
     } else {
       return oldestSnapshotAfter.parentId();

Reply via email to the mailing list.