liucao-dd commented on code in PR #16679:
URL: https://github.com/apache/iceberg/pull/16679#discussion_r3354342817


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java:
##########
@@ -31,14 +35,43 @@ class MicroBatchUtils {
   private MicroBatchUtils() {}
 
   static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) {
+    return determineStartingOffset(table, fromTimestamp, null);
+  }
+
+  static StreamingOffset determineStartingOffset(
+      Table table, long fromTimestamp, String fromSnapshot) {
+    ValidationException.check(
+        fromSnapshot == null || fromTimestamp == Long.MIN_VALUE,
+        "Cannot set both '%s' and '%s' options",
+        SparkReadOptions.STREAM_FROM_SNAPSHOT,
+        SparkReadOptions.STREAM_FROM_TIMESTAMP);
+
     if (table.currentSnapshot() == null) {
       return StreamingOffset.START_OFFSET;
     }
 
+    if (fromSnapshot != null) {
+      return startingOffsetFromSnapshotOption(table, fromSnapshot);
+    }
+
     if (fromTimestamp == Long.MIN_VALUE) {
-      // start from the oldest snapshot, since default value is MIN_VALUE
-      // avoids looping to find first snapshot
-      return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+      // read the initial snapshot in full, then continue with incremental changes
+      Snapshot current = table.currentSnapshot();
+
+      // Refuse row-level deletes rather than silently emit them
+      ValidationException.check(
+          current.deleteManifests(table.io()).isEmpty(),
+          "Cannot stream initial snapshot %d in full: snapshot has row-level deletes "
+              + "(V2 positional/equality delete files or V3 deletion vectors), which the "
+              + "Iceberg streaming source does not apply. Set '%s' to one of: '%s' "
+              + "(skip the backlog), '%s' (replay history snapshot-by-snapshot, with "
+              + "'streaming-skip-overwrite-snapshots'/'streaming-skip-delete-snapshots' as "
+              + "needed), or a specific snapshot id to start after.",
+          current.snapshotId(),
+          SparkReadOptions.STREAM_FROM_SNAPSHOT,
+          SparkReadOptions.STREAM_FROM_SNAPSHOT_LATEST,
+          SparkReadOptions.STREAM_FROM_SNAPSHOT_EARLIEST);
+      return new StreamingOffset(current.snapshotId(), 0L, true);

Review Comment:
   This changes the fresh-checkpoint default from replaying incremental history 
from the oldest ancestor to reading only the current table state. That is a 
breaking semantic change for existing streaming jobs/backfills that rely on the 
no-option behavior, and it can also turn previously valid streams into startup 
failures when the current snapshot has live row-level delete manifests.
   
   Can we keep the existing default and make initial-snapshot loading explicit 
behind the new option? If the project wants to flip the default, it seems worth 
doing with a documented migration/deprecation path rather than in the same PR 
that introduces the feature.
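
A minimal sketch of the shape suggested above, assuming a simplified model rather than the real Iceberg classes. `StartingOffsetSketch`, its `Offset` class, and the `"initial"` option value are hypothetical names used only for illustration; the point is that the no-option path keeps replaying from the oldest ancestor, and only the explicit opt-in path can fail on row-level deletes.

```java
import java.util.Locale;

/** Simplified model of starting-offset resolution; not the Iceberg implementation. */
final class StartingOffsetSketch {
  /** Stand-in for StreamingOffset: snapshot id, file position, scan-all-files flag. */
  static final class Offset {
    final long snapshotId;
    final long position;
    final boolean scanAllFiles;

    Offset(long snapshotId, long position, boolean scanAllFiles) {
      this.snapshotId = snapshotId;
      this.position = position;
      this.scanAllFiles = scanAllFiles;
    }
  }

  // Hypothetical option value that opts in to a full read of the current snapshot.
  static final String FULL_CURRENT = "initial";

  static Offset resolve(
      String fromSnapshot, long oldestAncestorId, long currentId, boolean currentHasDeletes) {
    if (fromSnapshot == null) {
      // No option set: keep the pre-PR default and replay from the oldest ancestor.
      return new Offset(oldestAncestorId, 0L, false);
    }

    if (FULL_CURRENT.equals(fromSnapshot.toLowerCase(Locale.ROOT))) {
      // Explicit opt-in: only this path can fail on row-level deletes.
      if (currentHasDeletes) {
        throw new IllegalStateException(
            "Cannot stream snapshot " + currentId + " in full: snapshot has row-level deletes");
      }
      return new Offset(currentId, 0L, true);
    }

    // Other values (snapshot id, latest, earliest) are left out of this sketch.
    throw new IllegalArgumentException("Value not covered by this sketch: " + fromSnapshot);
  }
}
```

With this shape, a job that sets no option starts exactly where it did before the PR, even when the current snapshot carries delete manifests.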



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java:
##########
@@ -58,6 +91,57 @@ static StreamingOffset determineStartingOffset(Table table, long fromTimestamp)
     }
   }
 
+  private static StreamingOffset startingOffsetFromSnapshotOption(
+      Table table, String fromSnapshot) {
+    String normalized = fromSnapshot.toLowerCase(Locale.ROOT);
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    if (SparkReadOptions.STREAM_FROM_SNAPSHOT_LATEST.equals(normalized)) {
+      // start from current snapshot, skipping backlog
+      return new StreamingOffset(
+          currentSnapshot.snapshotId(), addedFilesCount(table, currentSnapshot), false);
+    }
+
+    if (SparkReadOptions.STREAM_FROM_SNAPSHOT_EARLIEST.equals(normalized)) {
+      // start from oldest snapshot
+      return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0L, false);
+    }
+
+    long fromSnapshotId;
+    try {
+      fromSnapshotId = Long.parseLong(fromSnapshot);
+    } catch (NumberFormatException e) {
+      throw new ValidationException(
+          "Invalid value for '%s': %s. Expected a snapshot id, '%s', or '%s'.",
+          SparkReadOptions.STREAM_FROM_SNAPSHOT,
+          fromSnapshot,
+          SparkReadOptions.STREAM_FROM_SNAPSHOT_LATEST,
+          SparkReadOptions.STREAM_FROM_SNAPSHOT_EARLIEST);
+    }
+
+    ValidationException.check(
+        table.snapshot(fromSnapshotId) != null,
+        "Cannot find snapshot for '%s': %s",
+        SparkReadOptions.STREAM_FROM_SNAPSHOT,
+        fromSnapshotId);
+    ValidationException.check(
+        SnapshotUtil.isAncestorOf(table, currentSnapshot.snapshotId(), fromSnapshotId),
+        "Snapshot %s is not an ancestor of the current snapshot",
+        fromSnapshotId);
+
+    if (fromSnapshotId == currentSnapshot.snapshotId()) {
+      // The requested snapshot is the current snapshot, nothing after it yet
+      return StreamingOffset.START_OFFSET;

Review Comment:
   I think this breaks `stream-from-snapshot=<current snapshot id>`. This 
persists `START_OFFSET` (`snapshotId=-1`, `position=-1`) even though the 
current snapshot is a valid exclusive starting point. The sync planner can 
later resolve this through `table.snapshot(-1)` and fail, while async paths may 
fall back into default starting-offset behavior.
   
   This should probably return an offset at the end of the current snapshot, 
similar to `latest`, so the stream has no backlog and waits for future 
snapshots.
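
Roughly what I have in mind, as a self-contained sketch rather than a patch. `ExclusiveStartSketch` and its `Offset` class are hypothetical stand-ins for `MicroBatchUtils` and `StreamingOffset`, and `addedFilesCount` is passed in as a plain number instead of being computed from the table.

```java
/** Simplified model of the "requested snapshot is the current snapshot" branch. */
final class ExclusiveStartSketch {
  /** Stand-in for StreamingOffset: snapshot id, file position, scan-all-files flag. */
  static final class Offset {
    final long snapshotId;
    final long position;
    final boolean scanAllFiles;

    Offset(long snapshotId, long position, boolean scanAllFiles) {
      this.snapshotId = snapshotId;
      this.position = position;
      this.scanAllFiles = scanAllFiles;
    }
  }

  // Sentinel with the values described above: snapshotId=-1, position=-1.
  static final Offset START_OFFSET = new Offset(-1L, -1L, false);

  /**
   * Offset positioned after every file added by the current snapshot, mirroring
   * what the `latest` branch returns, so the persisted offset names a real snapshot.
   */
  static Offset afterCurrentSnapshot(long currentSnapshotId, long addedFilesCount) {
    return new Offset(currentSnapshotId, addedFilesCount, false);
  }
}
```

The persisted offset then carries a snapshot id that exists in the table, so later planning never has to look up snapshot `-1`.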



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java:
##########
@@ -31,14 +35,43 @@ class MicroBatchUtils {
   private MicroBatchUtils() {}
 
   static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) {
+    return determineStartingOffset(table, fromTimestamp, null);

Review Comment:
   This overload now drops the new `stream-from-snapshot` option by always 
passing `null`. There is still an async planner path 
(`BaseSparkMicroBatchPlanner.nextValidSnapshot(null)`) that calls the two-arg 
overload, so async planning can bypass the new option's validation and 
semantics.
   
   Can we plumb `readConf.streamFromSnapshot()` through that path instead of 
relying on this overload? A test with async planning enabled and an explicit 
`stream-from-snapshot` value would help catch this.
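
A rough sketch of the plumbing, assuming a toy model in which the starting point is represented as a string. `PlannerPlumbingSketch`, its nested `ReadConf` interface, and the `syncStart`/`asyncStart` methods are hypothetical stand-ins, not the planner classes in this PR. The idea is that both planner paths read the same two options and call the same three-argument resolution, so neither can skip the mutual-exclusion check.

```java
/** Toy model: both planner paths forward the same read options to one resolver. */
final class PlannerPlumbingSketch {
  /** Stand-in for the read configuration; only the two relevant options. */
  interface ReadConf {
    long streamFromTimestamp();

    String streamFromSnapshot();
  }

  /** Single entry point that validates the options and describes the starting point. */
  static String determineStartingOffset(long fromTimestamp, String fromSnapshot) {
    if (fromSnapshot != null && fromTimestamp != Long.MIN_VALUE) {
      throw new IllegalArgumentException(
          "Cannot set both 'stream-from-snapshot' and 'stream-from-timestamp' options");
    }
    return fromSnapshot != null ? "snapshot:" + fromSnapshot : "timestamp:" + fromTimestamp;
  }

  /** Sync path: passes both options through. */
  static String syncStart(ReadConf conf) {
    return determineStartingOffset(conf.streamFromTimestamp(), conf.streamFromSnapshot());
  }

  /** Async path: passes the same two options instead of dropping the snapshot option. */
  static String asyncStart(ReadConf conf) {
    return determineStartingOffset(conf.streamFromTimestamp(), conf.streamFromSnapshot());
  }
}
```

A test along these lines would assert that the sync and async paths resolve to the same starting point for an explicit `stream-from-snapshot` value, and that both reject a configuration that also sets `stream-from-timestamp`.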



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -80,6 +80,15 @@ private SparkReadOptions() {}
   // Timestamp in milliseconds; start a stream from the snapshot that occurs after this timestamp
   public static final String STREAM_FROM_TIMESTAMP = "stream-from-timestamp";
 
+  // Starting point for a streaming read. Accepts a snapshot id, "latest", or "earliest"
+  public static final String STREAM_FROM_SNAPSHOT = "stream-from-snapshot";

Review Comment:
   TODO reminder for later: before this leaves draft and is merged, please 
update the Spark structured streaming/read-options docs and either port this 
option to the other active Spark source trees (`v4.0`/`v4.1`) or clearly 
document why this PR is intentionally scoped to Spark 3.5 only.


