manuzhang commented on code in PR #15240:
URL: https://github.com/apache/iceberg/pull/15240#discussion_r2871180542


##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -343,209 +336,61 @@ private org.apache.iceberg.Scan 
buildIncrementalAppendScan(
     return configureSplitPlanning(scan);
   }
 
-  @SuppressWarnings("CyclomaticComplexity")
-  public Scan buildChangelogScan() {
-    Preconditions.checkArgument(
-        readConf().snapshotId() == null
-            && readConf().asOfTimestamp() == null
-            && readConf().branch() == null
-            && readConf().tag() == null,
-        "Cannot set neither %s, %s, %s and %s for changelogs",
-        SparkReadOptions.SNAPSHOT_ID,
-        SparkReadOptions.AS_OF_TIMESTAMP,
-        SparkReadOptions.BRANCH,
-        SparkReadOptions.TAG);
-
-    Long startSnapshotId = readConf().startSnapshotId();
-    Long endSnapshotId = readConf().endSnapshotId();
-    Long startTimestamp = readConf().startTimestamp();
-    Long endTimestamp = readConf().endTimestamp();
-
-    Preconditions.checkArgument(
-        !(startSnapshotId != null && startTimestamp != null),
-        "Cannot set both %s and %s for changelogs",
-        SparkReadOptions.START_SNAPSHOT_ID,
-        SparkReadOptions.START_TIMESTAMP);
-
-    Preconditions.checkArgument(
-        !(endSnapshotId != null && endTimestamp != null),
-        "Cannot set both %s and %s for changelogs",
-        SparkReadOptions.END_SNAPSHOT_ID,
-        SparkReadOptions.END_TIMESTAMP);
-
-    if (startTimestamp != null && endTimestamp != null) {
-      Preconditions.checkArgument(
-          startTimestamp < endTimestamp,
-          "Cannot set %s to be greater than %s for changelogs",
-          SparkReadOptions.START_TIMESTAMP,
-          SparkReadOptions.END_TIMESTAMP);
-    }
-
-    boolean emptyScan = false;
-    if (startTimestamp != null) {
-      if (table().currentSnapshot() == null
-          || startTimestamp > table().currentSnapshot().timestampMillis()) {
-        emptyScan = true;
-      }
-      startSnapshotId = getStartSnapshotId(startTimestamp);
-    }
-
-    if (endTimestamp != null) {
-      endSnapshotId = getEndSnapshotId(endTimestamp);
-      if ((startSnapshotId == null && endSnapshotId == null)
-          || (startSnapshotId != null && 
startSnapshotId.equals(endSnapshotId))) {
-        emptyScan = true;
-      }
+  private BatchScan buildIcebergBatchScan(
+      Schema projection, boolean ignoreResiduals, boolean withStats) {
+    if (shouldPinSnapshot() && snapshot == null) {
+      return null;
     }
 
-    Schema projection = projectionWithMetadataColumns();
-
-    IncrementalChangelogScan scan =
-        table()
-            .newIncrementalChangelogScan()
+    BatchScan scan =
+        newIcebergBatchScan()
             .caseSensitive(caseSensitive())
             .filter(filter())
             .project(projection)
             .metricsReporter(metricsReporter());
 
-    if (startSnapshotId != null) {
-      scan = scan.fromSnapshotExclusive(startSnapshotId);
+    if (shouldPinSnapshot() || timeTravel != null) {
+      scan = scan.useSnapshot(snapshot.snapshotId());
     }
 
-    if (endSnapshotId != null) {
-      scan = scan.toSnapshot(endSnapshotId);
+    Preconditions.checkState(
+        Objects.equals(snapshot, scan.snapshot()),
+        "Failed to enforce scan consistency: resolved Spark table snapshot 
(%s) vs scan snapshot (%s)",
+        snapshot,
+        scan.snapshot());
+
+    if (ignoreResiduals) {
+      scan = scan.ignoreResiduals();
     }
 
-    scan = configureSplitPlanning(scan);
+    if (withStats) {
+      scan = scan.includeColumnStats();
+    }
 
-    return new SparkChangelogScan(
-        spark(), table(), scan, readConf(), projection, filters(), emptyScan);
+    return configureSplitPlanning(scan);
   }
 
-  private Long getStartSnapshotId(Long startTimestamp) {
-    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table(), 
startTimestamp);
-
-    if (oldestSnapshotAfter == null) {
-      return null;
-    } else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
-      return oldestSnapshotAfter.snapshotId();
+  private BatchScan newIcebergBatchScan() {
+    if (readConf().distributedPlanningEnabled()) {
+      return new SparkDistributedDataScan(spark(), table(), readConf());
     } else {
-      return oldestSnapshotAfter.parentId();
+      return table().newBatchScan();
     }
   }
 
-  private Long getEndSnapshotId(Long endTimestamp) {
-    Long endSnapshotId = null;
-    for (Snapshot snapshot : SnapshotUtil.currentAncestors(table())) {
-      if (snapshot.timestampMillis() <= endTimestamp) {
-        endSnapshotId = snapshot.snapshotId();
-        break;
-      }
-    }
-    return endSnapshotId;
+  private boolean shouldPinSnapshot() {
+    return isMainTable() || isMetadataTableWithTimeTravel();

Review Comment:
   Not sure that I get the meaning of `mainTable` here. Would it be possible to 
add `supportsTimeTravel` to the `Table` interface (returning `false` by 
default), such that we don't need to check the table type?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to