liucao-dd commented on code in PR #16679:
URL: https://github.com/apache/iceberg/pull/16679#discussion_r3354342817
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java:
##########
@@ -31,14 +35,43 @@ class MicroBatchUtils {
private MicroBatchUtils() {}
static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) {
+ return determineStartingOffset(table, fromTimestamp, null);
+ }
+
+ static StreamingOffset determineStartingOffset(
+ Table table, long fromTimestamp, String fromSnapshot) {
+ ValidationException.check(
+ fromSnapshot == null || fromTimestamp == Long.MIN_VALUE,
+ "Cannot set both '%s' and '%s' options",
+ SparkReadOptions.STREAM_FROM_SNAPSHOT,
+ SparkReadOptions.STREAM_FROM_TIMESTAMP);
+
if (table.currentSnapshot() == null) {
return StreamingOffset.START_OFFSET;
}
+ if (fromSnapshot != null) {
+ return startingOffsetFromSnapshotOption(table, fromSnapshot);
+ }
+
if (fromTimestamp == Long.MIN_VALUE) {
- // start from the oldest snapshot, since default value is MIN_VALUE
- // avoids looping to find first snapshot
- return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+ // read the initial snapshot in full, then continue with incremental changes
+ Snapshot current = table.currentSnapshot();
+
+ // Refuse row-level deletes rather than silently emit them
+ ValidationException.check(
+ current.deleteManifests(table.io()).isEmpty(),
+ "Cannot stream initial snapshot %d in full: snapshot has row-level deletes "
+ + "(V2 positional/equality delete files or V3 deletion vectors), which the "
+ + "Iceberg streaming source does not apply. Set '%s' to one of: '%s' "
+ + "(skip the backlog), '%s' (replay history snapshot-by-snapshot, with "
+ + "'streaming-skip-overwrite-snapshots'/'streaming-skip-delete-snapshots' as "
+ + "needed), or a specific snapshot id to start after.",
+ current.snapshotId(),
+ SparkReadOptions.STREAM_FROM_SNAPSHOT,
+ SparkReadOptions.STREAM_FROM_SNAPSHOT_LATEST,
+ SparkReadOptions.STREAM_FROM_SNAPSHOT_EARLIEST);
+ return new StreamingOffset(current.snapshotId(), 0L, true);
Review Comment:
This changes the fresh-checkpoint default from replaying incremental history
from the oldest ancestor to reading the current snapshot in full and then
continuing incrementally. That is a breaking semantic change for existing
streaming jobs/backfills that rely on the no-option behavior.
A concrete compatibility failure mode is a job that currently uses the
incremental path with `streaming-skip-delete-snapshots=true` and/or
`streaming-skip-overwrite-snapshots=true` to tolerate delete/overwrite
snapshots. With this new default, a fresh checkpoint on a table whose current
live state still has row-level delete manifests fails during initial offset
creation at the `current.deleteManifests(table.io()).isEmpty()` guard. That
happens before the planner reaches `shouldProcess`, so the existing skip
options never get a chance to apply. The streaming source still does not apply
delete manifests either way; this is about changing when and how an existing
configured stream fails.
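
To make the ordering concrete, here is a minimal sketch of the two stages. It is a simplified model, not the actual Iceberg classes: `StartupOrderSketch`, `SnapshotInfo`, and `initialOffset` are illustrative names, and `shouldProcess` only approximates the planner-side check mentioned above.

```java
// Simplified model of the two stages described above; not Iceberg's real classes.
final class StartupOrderSketch {

  // Hypothetical snapshot summary: id, operation name, and whether delete manifests exist.
  static final class SnapshotInfo {
    final long id;
    final String operation;
    final boolean hasDeleteManifests;

    SnapshotInfo(long id, String operation, boolean hasDeleteManifests) {
      this.id = id;
      this.operation = operation;
      this.hasDeleteManifests = hasDeleteManifests;
    }
  }

  // Stage 1: initial offset creation under the new no-option default.
  // The guard runs here, before any snapshot is handed to the planner.
  static long initialOffset(SnapshotInfo current) {
    if (current.hasDeleteManifests) {
      throw new IllegalStateException(
          "Cannot stream initial snapshot " + current.id + " in full: row-level deletes");
    }
    return current.id;
  }

  // Stage 2: planner-side filtering, where the skip options are consulted.
  // Returns false when the snapshot is skipped, true when it should be read.
  static boolean shouldProcess(SnapshotInfo snapshot, boolean skipDelete, boolean skipOverwrite) {
    if ("delete".equals(snapshot.operation)) {
      if (!skipDelete) {
        throw new IllegalStateException("Cannot process delete snapshot " + snapshot.id);
      }
      return false;
    }
    if ("overwrite".equals(snapshot.operation)) {
      if (!skipOverwrite) {
        throw new IllegalStateException("Cannot process overwrite snapshot " + snapshot.id);
      }
      return false;
    }
    return true; // appends are always read
  }
}
```

In this model, an overwrite snapshot that carries delete manifests is skipped cleanly by `shouldProcess(snapshot, true, true)`, yet `initialOffset(snapshot)` throws first, so the skip flags are never consulted on a fresh checkpoint.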
Can we keep the existing default and make initial-snapshot loading explicit
behind the new option? If the project wants to flip the default, it seems worth
doing with a documented migration/deprecation path rather than in the same PR
that introduces the feature.
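
One possible shape for the explicit opt-in is sketched below. The `"latest"` and `"earliest"` values come from the diff; the `"initial-full"` value, the `Mode` enum, and `resolve` are hypothetical names used only for illustration, and the mapping of `"earliest"` onto the existing default is an assumption.

```java
// Hypothetical resolution of the starting mode; names are illustrative, not the PR's API.
final class StartingPointSketch {

  enum Mode {
    INCREMENTAL_FROM_OLDEST, // existing no-option behavior: replay from the oldest ancestor
    SKIP_BACKLOG,            // "latest": start after the current snapshot
    FULL_CURRENT_SNAPSHOT,   // explicit opt-in: read the current snapshot in full first
    AFTER_SNAPSHOT_ID        // a specific snapshot id to start after
  }

  // Placeholder option value for the opt-in; the real name would be decided in the PR.
  static final String HYPOTHETICAL_FULL_VALUE = "initial-full";

  static Mode resolve(String fromSnapshot) {
    if (fromSnapshot == null) {
      return Mode.INCREMENTAL_FROM_OLDEST; // default stays unchanged for existing jobs
    }
    switch (fromSnapshot) {
      case "earliest":
        return Mode.INCREMENTAL_FROM_OLDEST;
      case "latest":
        return Mode.SKIP_BACKLOG;
      case HYPOTHETICAL_FULL_VALUE:
        return Mode.FULL_CURRENT_SNAPSHOT;
      default:
        // Anything else must parse as a snapshot id; NumberFormatException signals bad input.
        Long.parseLong(fromSnapshot);
        return Mode.AFTER_SNAPSHOT_ID;
    }
  }
}
```

With this arrangement the row-level-delete guard would only need to run in the `FULL_CURRENT_SNAPSHOT` branch, so jobs that set no option would keep reaching the existing skip-option handling.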
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -80,6 +80,15 @@ private SparkReadOptions() {}
// Timestamp in milliseconds; start a stream from the snapshot that occurs after this timestamp
public static final String STREAM_FROM_TIMESTAMP = "stream-from-timestamp";
+ // Starting point for a streaming read. Accepts a snapshot id, "latest", or "earliest"
+ public static final String STREAM_FROM_SNAPSHOT = "stream-from-snapshot";
Review Comment:
TODO reminder for later: before this leaves draft and is merged, please
update the Spark structured streaming/read-options docs and either port this
option to the other active Spark source trees (`v4.0`/`v4.1`) or clearly
document why this PR is intentionally scoped to Spark 3.5 only.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.