liucao-dd commented on code in PR #16679:
URL: https://github.com/apache/iceberg/pull/16679#discussion_r3354342817


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java:
##########
@@ -31,14 +35,43 @@ class MicroBatchUtils {
   private MicroBatchUtils() {}
 
   static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) {
+    return determineStartingOffset(table, fromTimestamp, null);
+  }
+
+  static StreamingOffset determineStartingOffset(
+      Table table, long fromTimestamp, String fromSnapshot) {
+    ValidationException.check(
+        fromSnapshot == null || fromTimestamp == Long.MIN_VALUE,
+        "Cannot set both '%s' and '%s' options",
+        SparkReadOptions.STREAM_FROM_SNAPSHOT,
+        SparkReadOptions.STREAM_FROM_TIMESTAMP);
+
     if (table.currentSnapshot() == null) {
       return StreamingOffset.START_OFFSET;
     }
 
+    if (fromSnapshot != null) {
+      return startingOffsetFromSnapshotOption(table, fromSnapshot);
+    }
+
     if (fromTimestamp == Long.MIN_VALUE) {
-      // start from the oldest snapshot, since default value is MIN_VALUE
-      // avoids looping to find first snapshot
-      return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+      // read the initial snapshot in full, then continue with incremental changes
+      Snapshot current = table.currentSnapshot();
+
+      // Refuse row-level deletes rather than silently emit them
+      ValidationException.check(
+          current.deleteManifests(table.io()).isEmpty(),
+          "Cannot stream initial snapshot %d in full: snapshot has row-level deletes "
+              + "(V2 positional/equality delete files or V3 deletion vectors), which the "
+              + "Iceberg streaming source does not apply. Set '%s' to one of: '%s' "
+              + "(skip the backlog), '%s' (replay history snapshot-by-snapshot, with "
+              + "'streaming-skip-overwrite-snapshots'/'streaming-skip-delete-snapshots' as "
+              + "needed), or a specific snapshot id to start after.",
+          current.snapshotId(),
+          SparkReadOptions.STREAM_FROM_SNAPSHOT,
+          SparkReadOptions.STREAM_FROM_SNAPSHOT_LATEST,
+          SparkReadOptions.STREAM_FROM_SNAPSHOT_EARLIEST);
+      return new StreamingOffset(current.snapshotId(), 0L, true);

Review Comment:
   This changes the fresh-checkpoint default from replaying incremental history 
from the oldest ancestor to reading only the current table state. That is a 
breaking semantic change for existing streaming jobs/backfills that rely on the 
no-option behavior.
   
   One related compatibility concern is the row-level-delete failure mode. The 
streaming source does not apply delete manifests today, and the new default 
adds a startup guard that rejects the current snapshot when it has live 
row-level delete manifests. Jobs that previously started on the incremental 
path and relied on the existing delete/overwrite snapshot handling options 
would now fail at startup. This seems much safer as an explicit opt-in mode 
than as the new default.
   
   Can we keep the existing default and make initial-snapshot loading explicit 
behind the new option? If the project wants to flip the default, it seems worth 
doing with a documented migration/deprecation path rather than in the same PR 
that introduces the feature.
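
To make the suggestion concrete, here is a rough sketch of the dispatch I have in mind. It is a dependency-free model, not the actual patch: `StartingOffsetSketch`, `Snap`, `Offset`, and the `initial-snapshot` value are all hypothetical stand-ins, and the real option name and value would be decided in this PR. The point is only that the no-option path keeps returning the oldest ancestor, while the delete guard runs only for jobs that explicitly ask for a full initial read.

```java
import java.util.List;

// Hypothetical, dependency-free model of the proposal. None of these names are Iceberg APIs.
final class StartingOffsetSketch {

  // Assumed opt-in value; the real option name and value would be decided in the PR.
  static final String INITIAL_SNAPSHOT = "initial-snapshot";

  // Stand-in for a snapshot: its id and whether it has live row-level delete manifests.
  static final class Snap {
    final long id;
    final boolean hasRowLevelDeletes;

    Snap(long id, boolean hasRowLevelDeletes) {
      this.id = id;
      this.hasRowLevelDeletes = hasRowLevelDeletes;
    }
  }

  // Stand-in for StreamingOffset: starting snapshot id plus the scan-all-files flag.
  static final class Offset {
    final long snapshotId;
    final boolean scanAllFiles;

    Offset(long snapshotId, boolean scanAllFiles) {
      this.snapshotId = snapshotId;
      this.scanAllFiles = scanAllFiles;
    }
  }

  // history is ordered oldest to newest; mode is the assumed opt-in option, null when unset.
  static Offset determineStartingOffset(List<Snap> history, String mode) {
    if (history.isEmpty()) {
      return null; // empty table; the diff returns StreamingOffset.START_OFFSET here
    }
    if (mode == null) {
      // Existing default: replay incrementally from the oldest ancestor.
      return new Offset(history.get(0).id, false);
    }
    if (INITIAL_SNAPSHOT.equals(mode)) {
      Snap current = history.get(history.size() - 1);
      // The guard only applies to jobs that asked for a full read of the current snapshot.
      if (current.hasRowLevelDeletes) {
        throw new IllegalStateException(
            "Cannot stream initial snapshot " + current.id + " in full: row-level deletes");
      }
      return new Offset(current.id, true);
    }
    throw new IllegalArgumentException("Unknown mode: " + mode);
  }

  public static void main(String[] args) {
    List<Snap> history = List.of(new Snap(1L, false), new Snap(2L, true));

    // No option set: same starting point as before the change.
    Offset byDefault = determineStartingOffset(history, null);
    System.out.println(byDefault.snapshotId + " " + byDefault.scanAllFiles);

    // Opt-in mode: the guard fires because the current snapshot has deletes.
    try {
      determineStartingOffset(history, INITIAL_SNAPSHOT);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With this shape, a job that sets nothing behaves exactly as it does today, and only jobs that opt in can hit the row-level-delete failure.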



