aokolnychyi commented on code in PR #15240:
URL: https://github.com/apache/iceberg/pull/15240#discussion_r2886898616
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -343,209 +336,61 @@ private org.apache.iceberg.Scan
buildIncrementalAppendScan(
return configureSplitPlanning(scan);
}
- @SuppressWarnings("CyclomaticComplexity")
- public Scan buildChangelogScan() {
- Preconditions.checkArgument(
- readConf().snapshotId() == null
- && readConf().asOfTimestamp() == null
- && readConf().branch() == null
- && readConf().tag() == null,
- "Cannot set neither %s, %s, %s and %s for changelogs",
- SparkReadOptions.SNAPSHOT_ID,
- SparkReadOptions.AS_OF_TIMESTAMP,
- SparkReadOptions.BRANCH,
- SparkReadOptions.TAG);
-
- Long startSnapshotId = readConf().startSnapshotId();
- Long endSnapshotId = readConf().endSnapshotId();
- Long startTimestamp = readConf().startTimestamp();
- Long endTimestamp = readConf().endTimestamp();
-
- Preconditions.checkArgument(
- !(startSnapshotId != null && startTimestamp != null),
- "Cannot set both %s and %s for changelogs",
- SparkReadOptions.START_SNAPSHOT_ID,
- SparkReadOptions.START_TIMESTAMP);
-
- Preconditions.checkArgument(
- !(endSnapshotId != null && endTimestamp != null),
- "Cannot set both %s and %s for changelogs",
- SparkReadOptions.END_SNAPSHOT_ID,
- SparkReadOptions.END_TIMESTAMP);
-
- if (startTimestamp != null && endTimestamp != null) {
- Preconditions.checkArgument(
- startTimestamp < endTimestamp,
- "Cannot set %s to be greater than %s for changelogs",
- SparkReadOptions.START_TIMESTAMP,
- SparkReadOptions.END_TIMESTAMP);
- }
-
- boolean emptyScan = false;
- if (startTimestamp != null) {
- if (table().currentSnapshot() == null
- || startTimestamp > table().currentSnapshot().timestampMillis()) {
- emptyScan = true;
- }
- startSnapshotId = getStartSnapshotId(startTimestamp);
- }
-
- if (endTimestamp != null) {
- endSnapshotId = getEndSnapshotId(endTimestamp);
- if ((startSnapshotId == null && endSnapshotId == null)
- || (startSnapshotId != null &&
startSnapshotId.equals(endSnapshotId))) {
- emptyScan = true;
- }
+ private BatchScan buildIcebergBatchScan(
+ Schema projection, boolean ignoreResiduals, boolean withStats) {
+ if (shouldPinSnapshot() && snapshot == null) {
+ return null;
}
- Schema projection = projectionWithMetadataColumns();
-
- IncrementalChangelogScan scan =
- table()
- .newIncrementalChangelogScan()
+ BatchScan scan =
+ newIcebergBatchScan()
.caseSensitive(caseSensitive())
.filter(filter())
.project(projection)
.metricsReporter(metricsReporter());
- if (startSnapshotId != null) {
- scan = scan.fromSnapshotExclusive(startSnapshotId);
+ if (shouldPinSnapshot() || timeTravel != null) {
+ scan = scan.useSnapshot(snapshot.snapshotId());
}
- if (endSnapshotId != null) {
- scan = scan.toSnapshot(endSnapshotId);
+ Preconditions.checkState(
+ Objects.equals(snapshot, scan.snapshot()),
+ "Failed to enforce scan consistency: resolved Spark table snapshot
(%s) vs scan snapshot (%s)",
+ snapshot,
+ scan.snapshot());
+
+ if (ignoreResiduals) {
+ scan = scan.ignoreResiduals();
}
- scan = configureSplitPlanning(scan);
+ if (withStats) {
+ scan = scan.includeColumnStats();
+ }
- return new SparkChangelogScan(
- spark(), table(), scan, readConf(), projection, filters(), emptyScan);
+ return configureSplitPlanning(scan);
}
- private Long getStartSnapshotId(Long startTimestamp) {
- Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table(),
startTimestamp);
-
- if (oldestSnapshotAfter == null) {
- return null;
- } else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
- return oldestSnapshotAfter.snapshotId();
+ private BatchScan newIcebergBatchScan() {
+ if (readConf().distributedPlanningEnabled()) {
+ return new SparkDistributedDataScan(spark(), table(), readConf());
} else {
- return oldestSnapshotAfter.parentId();
+ return table().newBatchScan();
}
}
- private Long getEndSnapshotId(Long endTimestamp) {
- Long endSnapshotId = null;
- for (Snapshot snapshot : SnapshotUtil.currentAncestors(table())) {
- if (snapshot.timestampMillis() <= endTimestamp) {
- endSnapshotId = snapshot.snapshotId();
- break;
- }
- }
- return endSnapshotId;
+ private boolean shouldPinSnapshot() {
+ return isMainTable() || isMetadataTableWithTimeTravel();
Review Comment:
It means the scan reads the main / original table (i.e. it is a data scan). I
would hold off on adding this to `Table`, as that is a very public API and we
would want more evidence that it is useful in many scenarios.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]