alexprosak commented on code in PR #16679:
URL: https://github.com/apache/iceberg/pull/16679#discussion_r3358030996


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java:
##########
@@ -31,14 +35,43 @@ class MicroBatchUtils {
   private MicroBatchUtils() {}
 
   static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) {
+    return determineStartingOffset(table, fromTimestamp, null);
+  }
+
+  static StreamingOffset determineStartingOffset(
+      Table table, long fromTimestamp, String fromSnapshot) {
+    ValidationException.check(
+        fromSnapshot == null || fromTimestamp == Long.MIN_VALUE,
+        "Cannot set both '%s' and '%s' options",
+        SparkReadOptions.STREAM_FROM_SNAPSHOT,
+        SparkReadOptions.STREAM_FROM_TIMESTAMP);
+
     if (table.currentSnapshot() == null) {
       return StreamingOffset.START_OFFSET;
     }
 
+    if (fromSnapshot != null) {
+      return startingOffsetFromSnapshotOption(table, fromSnapshot);
+    }
+
     if (fromTimestamp == Long.MIN_VALUE) {
-      // start from the oldest snapshot, since default value is MIN_VALUE
-      // avoids looping to find first snapshot
-      return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+      // read the initial snapshot in full, then continue with incremental changes
+      Snapshot current = table.currentSnapshot();
+
+      // Refuse row-level deletes rather than silently emit them
+      ValidationException.check(
+          current.deleteManifests(table.io()).isEmpty(),
+          "Cannot stream initial snapshot %d in full: snapshot has row-level deletes "
+              + "(V2 positional/equality delete files or V3 deletion vectors), which the "
+              + "Iceberg streaming source does not apply. Set '%s' to one of: '%s' "
+              + "(skip the backlog), '%s' (replay history snapshot-by-snapshot, with "
+              + "'streaming-skip-overwrite-snapshots'/'streaming-skip-delete-snapshots' as "
+              + "needed), or a specific snapshot id to start after.",
+          current.snapshotId(),
+          SparkReadOptions.STREAM_FROM_SNAPSHOT,
+          SparkReadOptions.STREAM_FROM_SNAPSHOT_LATEST,
+          SparkReadOptions.STREAM_FROM_SNAPSHOT_EARLIEST);
+      return new StreamingOffset(current.snapshotId(), 0L, true);

Review Comment:
   Agreed, this is a breaking change, and I appreciate the careful read. Like you, I would have preferred this new behavior as the default from the get-go, since it matches Delta's default and is probably what most users actually want when they start a fresh stream.
   
   That said, if the maintainers agree, I'm open to making the initial snapshot load opt-in rather than the new default.
   
   One way to express this would be to add `initial-snapshot` as a third accepted value for `stream-from-snapshot`, alongside `latest` and `<snapshot-id>`. With no option specified, the stream would keep the existing default of starting from the oldest ancestor.
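   A minimal plain-Java sketch of how the proposed values could be resolved while leaving the no-option default untouched. The literals `latest` and `initial-snapshot` come from the proposal above; the class, enum, and method names (`StreamFromSnapshotSketch`, `StartMode`, `StartSpec`, `parse`) are hypothetical. The sketch deliberately avoids Iceberg and Spark types and ignores `stream-from-timestamp` and table state, so it only shows the option-to-mode mapping.

```java
import java.util.Locale;

// Hypothetical sketch, not Iceberg code: all names here are illustrative.
final class StreamFromSnapshotSketch {

  // Where a new stream would begin under the proposal.
  enum StartMode {
    OLDEST_ANCESTOR,  // no option set: unchanged default, replay from the oldest ancestor
    LATEST,           // skip the backlog and read only commits after the current snapshot
    INITIAL_SNAPSHOT, // read the current snapshot in full, then continue incrementally
    AFTER_SNAPSHOT_ID // start after an explicitly given snapshot id
  }

  static final class StartSpec {
    final StartMode mode;
    final Long snapshotId; // non-null only for AFTER_SNAPSHOT_ID

    StartSpec(StartMode mode, Long snapshotId) {
      this.mode = mode;
      this.snapshotId = snapshotId;
    }
  }

  private StreamFromSnapshotSketch() {}

  // Resolves a raw 'stream-from-snapshot' value; null means the option was not set.
  static StartSpec parse(String fromSnapshot) {
    if (fromSnapshot == null) {
      return new StartSpec(StartMode.OLDEST_ANCESTOR, null);
    }
    String value = fromSnapshot.trim();
    switch (value.toLowerCase(Locale.ROOT)) {
      case "latest":
        return new StartSpec(StartMode.LATEST, null);
      case "initial-snapshot":
        return new StartSpec(StartMode.INITIAL_SNAPSHOT, null);
      default:
        // anything else must be a numeric snapshot id
        try {
          return new StartSpec(StartMode.AFTER_SNAPSHOT_ID, Long.parseLong(value));
        } catch (NumberFormatException e) {
          throw new IllegalArgumentException(
              "Invalid 'stream-from-snapshot' value: " + fromSnapshot, e);
        }
    }
  }
}
```

   Mapping the unset case to `OLDEST_ANCESTOR` is what keeps this non-breaking. Under this scheme, the row-level-delete check from the diff would presumably only need to run in `INITIAL_SNAPSHOT` mode.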



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
