This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 732e305413 Spark 3.4: Return empty changed rows when there are no snapshots between start time and end time (#8133)
732e305413 is described below
commit 732e30541348f977e2c7783b1ba2ab4117721efe
Author: Manu Zhang <[email protected]>
AuthorDate: Wed Aug 16 13:51:09 2023 +0800
Spark 3.4: Return empty changed rows when there are no snapshots between start time and end time (#8133)
---
.../java/org/apache/iceberg/util/SnapshotUtil.java | 15 +++++++++----
.../spark/extensions/TestChangelogTable.java | 26 +++++++++++++++++-----
.../iceberg/spark/source/SparkChangelogScan.java | 6 ++++-
.../iceberg/spark/source/SparkScanBuilder.java | 23 ++++++++++++-------
4 files changed, 52 insertions(+), 18 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index 2f9590dfcd..75d4493691 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -337,6 +337,17 @@ public class SnapshotUtil {
* timestamp
*/
public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
+ Long snapshotId = nullableSnapshotIdAsOfTime(table, timestampMillis);
+
+ Preconditions.checkArgument(
+ snapshotId != null,
+ "Cannot find a snapshot older than %s",
+ DateTimeUtil.formatTimestampMillis(timestampMillis));
+
+ return snapshotId;
+ }
+
+ public static Long nullableSnapshotIdAsOfTime(Table table, long
timestampMillis) {
Long snapshotId = null;
for (HistoryEntry logEntry : table.history()) {
if (logEntry.timestampMillis() <= timestampMillis) {
@@ -344,10 +355,6 @@ public class SnapshotUtil {
}
}
- Preconditions.checkArgument(
- snapshotId != null,
- "Cannot find a snapshot older than %s",
- DateTimeUtil.formatTimestampMillis(timestampMillis));
return snapshotId;
}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index 603775eb11..cc81b4b3d3 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -136,6 +136,7 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
table.refresh();
Snapshot snap3 = table.currentSnapshot();
+ long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
assertEquals(
"Should have expected changed rows only from snapshot 3",
@@ -166,6 +167,26 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
row(2, "b", "DELETE", 1, snap3.snapshotId()),
row(-2, "b", "INSERT", 1, snap3.snapshotId())),
changelogRecords(rightAfterSnap1, null));
+
+ assertEquals(
+ "Should have empty changed rows if end time is before the first
snapshot",
+ ImmutableList.of(),
+ changelogRecords(null, snap1.timestampMillis() - 1));
+
+ assertEquals(
+ "Should have empty changed rows if start time is after the current
snapshot",
+ ImmutableList.of(),
+ changelogRecords(rightAfterSnap3, null));
+
+ assertEquals(
+ "Should have empty changed rows if end time is before the first
snapshot",
+ ImmutableList.of(),
+ changelogRecords(null, snap1.timestampMillis() - 1));
+
+ assertEquals(
+ "Should have empty changed rows if there are no snapshots between
start time and end time",
+ ImmutableList.of(),
+ changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
}
@Test
@@ -185,11 +206,6 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
"Should fail if start time is after end time",
IllegalArgumentException.class,
() -> changelogRecords(snap3.timestampMillis(),
snap2.timestampMillis()));
-
- assertThrows(
- "Should fail if start time is after the current snapshot",
- IllegalArgumentException.class,
- () -> changelogRecords(rightAfterSnap3, null));
}
@Test
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index d919157548..7cde3e1fbe 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -66,7 +66,8 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
IncrementalChangelogScan scan,
SparkReadConf readConf,
Schema expectedSchema,
- List<Expression> filters) {
+ List<Expression> filters,
+ boolean emptyScan) {
SparkSchemaUtil.validateMetadataColumnReferences(table.schema(),
expectedSchema);
@@ -78,6 +79,9 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
this.filters = filters != null ? filters : Collections.emptyList();
this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
+ if (emptyScan) {
+ this.taskGroups = Collections.emptyList();
+ }
}
@Override
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 3ed07f92d6..3a430cd86f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -498,6 +498,7 @@ public class SparkScanBuilder
metricsReporter::scanReport);
}
+ @SuppressWarnings("CyclomaticComplexity")
public Scan buildChangelogScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null
@@ -535,12 +536,20 @@ public class SparkScanBuilder
SparkReadOptions.END_TIMESTAMP);
}
+ boolean emptyScan = false;
if (startTimestamp != null) {
startSnapshotId = getStartSnapshotId(startTimestamp);
+ if (startSnapshotId == null && endTimestamp == null) {
+ emptyScan = true;
+ }
}
if (endTimestamp != null) {
- endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+ endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table,
endTimestamp);
+ if ((startSnapshotId == null && endSnapshotId == null)
+ || (startSnapshotId != null &&
startSnapshotId.equals(endSnapshotId))) {
+ emptyScan = true;
+ }
}
Schema expectedSchema = schemaWithMetadataColumns();
@@ -562,18 +571,16 @@ public class SparkScanBuilder
scan = configureSplitPlanning(scan);
- return new SparkChangelogScan(spark, table, scan, readConf,
expectedSchema, filterExpressions);
+ return new SparkChangelogScan(
+ spark, table, scan, readConf, expectedSchema, filterExpressions,
emptyScan);
}
private Long getStartSnapshotId(Long startTimestamp) {
Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table,
startTimestamp);
- Preconditions.checkArgument(
- oldestSnapshotAfter != null,
- "Cannot find a snapshot older than %s for table %s",
- startTimestamp,
- table.name());
- if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
+ if (oldestSnapshotAfter == null) {
+ return null;
+ } else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
return oldestSnapshotAfter.snapshotId();
} else {
return oldestSnapshotAfter.parentId();