This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a7608281c0 Spark3.2, Spark3.3: Return empty changed rows when there
are no snapshots between start time and end time (#8391)
a7608281c0 is described below
commit a7608281c032d88fdacb91245311a874f22fa78d
Author: Manu Zhang <[email protected]>
AuthorDate: Fri Aug 25 22:47:07 2023 +0800
Spark3.2, Spark3.3: Return empty changed rows when there are no snapshots
between start time and end time (#8391)
Back-port of https://github.com/apache/iceberg/pull/8133 to `spark/v3.3`
and `spark/v3.2`
---
.../spark/extensions/TestChangelogTable.java | 26 +++++++++++++++++-----
.../iceberg/spark/source/SparkChangelogScan.java | 6 ++++-
.../iceberg/spark/source/SparkScanBuilder.java | 23 ++++++++++++-------
.../spark/extensions/TestChangelogTable.java | 26 +++++++++++++++++-----
.../iceberg/spark/source/SparkChangelogScan.java | 6 ++++-
.../iceberg/spark/source/SparkScanBuilder.java | 23 ++++++++++++-------
6 files changed, 82 insertions(+), 28 deletions(-)
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index 603775eb11..cc81b4b3d3 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -136,6 +136,7 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
table.refresh();
Snapshot snap3 = table.currentSnapshot();
+ long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
assertEquals(
"Should have expected changed rows only from snapshot 3",
@@ -166,6 +167,26 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
row(2, "b", "DELETE", 1, snap3.snapshotId()),
row(-2, "b", "INSERT", 1, snap3.snapshotId())),
changelogRecords(rightAfterSnap1, null));
+
+ assertEquals(
+ "Should have empty changed rows if end time is before the first snapshot",
+ ImmutableList.of(),
+ changelogRecords(null, snap1.timestampMillis() - 1));
+
+ assertEquals(
+ "Should have empty changed rows if start time is after the current snapshot",
+ ImmutableList.of(),
+ changelogRecords(rightAfterSnap3, null));
+
+ assertEquals(
+ "Should have empty changed rows if end time is before the first snapshot",
+ ImmutableList.of(),
+ changelogRecords(null, snap1.timestampMillis() - 1));
+
+ assertEquals(
+ "Should have empty changed rows if there are no snapshots between start time and end time",
+ ImmutableList.of(),
+ changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
}
@Test
@@ -185,11 +206,6 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
"Should fail if start time is after end time",
IllegalArgumentException.class,
() -> changelogRecords(snap3.timestampMillis(), snap2.timestampMillis()));
-
- assertThrows(
- "Should fail if start time is after the current snapshot",
- IllegalArgumentException.class,
- () -> changelogRecords(rightAfterSnap3, null));
}
@Test
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 3f7927c6d6..e68bc8aee2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -66,7 +66,8 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
IncrementalChangelogScan scan,
SparkReadConf readConf,
Schema expectedSchema,
- List<Expression> filters) {
+ List<Expression> filters,
+ boolean emptyScan) {
SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
@@ -79,6 +80,9 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+ if (emptyScan) {
+ this.taskGroups = Collections.emptyList();
+ }
}
@Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index c1144d944a..7f628483b2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -249,6 +249,7 @@ public class SparkScanBuilder
return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
}
+ @SuppressWarnings("CyclomaticComplexity")
public Scan buildChangelogScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
@@ -281,12 +282,20 @@ public class SparkScanBuilder
SparkReadOptions.END_TIMESTAMP);
}
+ boolean emptyScan = false;
if (startTimestamp != null) {
startSnapshotId = getStartSnapshotId(startTimestamp);
+ if (startSnapshotId == null && endTimestamp == null) {
+ emptyScan = true;
+ }
}
if (endTimestamp != null) {
- endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+ endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
+ if ((startSnapshotId == null && endSnapshotId == null)
+ || (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
+ emptyScan = true;
+ }
}
Schema expectedSchema = schemaWithMetadataColumns();
@@ -308,18 +317,16 @@ public class SparkScanBuilder
scan = configureSplitPlanning(scan);
- return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+ return new SparkChangelogScan(
+ spark, table, scan, readConf, expectedSchema, filterExpressions, emptyScan);
}
private Long getStartSnapshotId(Long startTimestamp) {
Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
- Preconditions.checkArgument(
- oldestSnapshotAfter != null,
- "Cannot find a snapshot older than %s for table %s",
- startTimestamp,
- table.name());
- if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
+ if (oldestSnapshotAfter == null) {
+ return null;
+ } else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
return oldestSnapshotAfter.snapshotId();
} else {
return oldestSnapshotAfter.parentId();
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index 603775eb11..cc81b4b3d3 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -136,6 +136,7 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
table.refresh();
Snapshot snap3 = table.currentSnapshot();
+ long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
assertEquals(
"Should have expected changed rows only from snapshot 3",
@@ -166,6 +167,26 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
row(2, "b", "DELETE", 1, snap3.snapshotId()),
row(-2, "b", "INSERT", 1, snap3.snapshotId())),
changelogRecords(rightAfterSnap1, null));
+
+ assertEquals(
+ "Should have empty changed rows if end time is before the first snapshot",
+ ImmutableList.of(),
+ changelogRecords(null, snap1.timestampMillis() - 1));
+
+ assertEquals(
+ "Should have empty changed rows if start time is after the current snapshot",
+ ImmutableList.of(),
+ changelogRecords(rightAfterSnap3, null));
+
+ assertEquals(
+ "Should have empty changed rows if end time is before the first snapshot",
+ ImmutableList.of(),
+ changelogRecords(null, snap1.timestampMillis() - 1));
+
+ assertEquals(
+ "Should have empty changed rows if there are no snapshots between start time and end time",
+ ImmutableList.of(),
+ changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
}
@Test
@@ -185,11 +206,6 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
"Should fail if start time is after end time",
IllegalArgumentException.class,
() -> changelogRecords(snap3.timestampMillis(), snap2.timestampMillis()));
-
- assertThrows(
- "Should fail if start time is after the current snapshot",
- IllegalArgumentException.class,
- () -> changelogRecords(rightAfterSnap3, null));
}
@Test
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 54fdd186d4..0ce8d5c29e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -69,7 +69,8 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
IncrementalChangelogScan scan,
SparkReadConf readConf,
Schema expectedSchema,
- List<Expression> filters) {
+ List<Expression> filters,
+ boolean emptyScan) {
SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
@@ -82,6 +83,9 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+ if (emptyScan) {
+ this.taskGroups = Collections.emptyList();
+ }
}
@Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 2653b9eab1..c24bcaad58 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -480,6 +480,7 @@ public class SparkScanBuilder
return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
}
+ @SuppressWarnings("CyclomaticComplexity")
public Scan buildChangelogScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null
@@ -517,12 +518,20 @@ public class SparkScanBuilder
SparkReadOptions.END_TIMESTAMP);
}
+ boolean emptyScan = false;
if (startTimestamp != null) {
startSnapshotId = getStartSnapshotId(startTimestamp);
+ if (startSnapshotId == null && endTimestamp == null) {
+ emptyScan = true;
+ }
}
if (endTimestamp != null) {
- endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+ endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
+ if ((startSnapshotId == null && endSnapshotId == null)
+ || (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
+ emptyScan = true;
+ }
}
Schema expectedSchema = schemaWithMetadataColumns();
@@ -544,18 +553,16 @@ public class SparkScanBuilder
scan = configureSplitPlanning(scan);
- return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+ return new SparkChangelogScan(
+ spark, table, scan, readConf, expectedSchema, filterExpressions, emptyScan);
}
private Long getStartSnapshotId(Long startTimestamp) {
Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
- Preconditions.checkArgument(
- oldestSnapshotAfter != null,
- "Cannot find a snapshot older than %s for table %s",
- startTimestamp,
- table.name());
- if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
+ if (oldestSnapshotAfter == null) {
+ return null;
+ } else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
return oldestSnapshotAfter.snapshotId();
} else {
return oldestSnapshotAfter.parentId();