yugeeklab opened a new issue, #8205: URL: https://github.com/apache/paimon/issues/8205
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Paimon version

master (1.5-SNAPSHOT), also present in release-1.4

### Compute Engine

Spark 3.5, Structured Streaming (micro-batch), `scan.mode = latest-full`

### Minimal reproduce step

1. Start a streaming query reading a primary-key table (`spark.readStream.table(...)` with `scan.mode=latest-full`), let it commit a few batches.
2. Kill the query and restart it with the same checkpoint.
3. The first batch after restart is empty; the second batch re-reads the **entire snapshot** instead of resuming from the checkpointed offset, and the changelog between the checkpointed offset and the restart-time snapshot is skipped.

Offset WAL across one observed restart (5-minute trigger, ~300k-row source):

```
batch N   {"snapshotId":134483,"index":20,"scanSnapshot":false} <- committed before stop
batch N+1 {"snapshotId":134491,"index":29,"scanSnapshot":false} <- 0 rows, 2s
batch N+2 {"snapshotId":134511,"index":29,"scanSnapshot":false} <- 1,055,509 input rows
```

### What doesn't meet your expectations?

`PaimonMicroBatchStream#planInputPartitions` clamps the checkpointed start offset up to `initOffset` whenever it compares lower:

```scala
if (startOffset0.compareTo(initOffset) < 0) {
  initOffset
} else {
  startOffset0
}
```

`initOffset` is a lazy val recomputed from the **current** table state on every restart; with `scan.mode=latest-full` it always points at the current snapshot with `scanSnapshot=true`. After any downtime the checkpointed offset compares lower, so a perfectly valid checkpoint is silently discarded:

- the batch planned against the old WAL end offset becomes empty (all splits filtered by the `(start, end]` range check),
- the next batch re-emits the whole snapshot as `+I` rows carrying their stored field values (e.g.
old event-time columns), and
- the changelog gap between the checkpointed offset and the restart-time snapshot is never read.

For stateful consumers (e.g. SCD2 history building) the re-emission applies historical rows a second time, which corrupted downstream state in our production pipeline.

### Anything else?

Proposed fix (PR follows): fall back to `initOffset` only when the checkpointed snapshot has actually expired (`startOffset0.snapshotId < earliestSnapshotId()`); otherwise resume from the checkpointed offset as-is.

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
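
To make the difference between the current clamp and the proposed check concrete, here is a minimal, self-contained sketch. It is not Paimon code: `Offset`, `ResumeSketch`, `clampedStart` and `proposedStart` are hypothetical names, the ordering (snapshot ID first, then index) is an assumption about how the real offset compares, and the `initOffset` and earliest-snapshot values are illustrative. Only the checkpointed offset is taken from the WAL quoted in this issue.

```scala
// Simplified stand-in for the stream offset; NOT Paimon's actual class.
final case class Offset(snapshotId: Long, index: Long, scanSnapshot: Boolean)

object ResumeSketch {
  // Assumed ordering: by snapshot ID, then by index within the snapshot.
  implicit val offsetOrdering: Ordering[Offset] =
    Ordering.by((o: Offset) => (o.snapshotId, o.index))

  // Behaviour described in the issue: a checkpoint that compares lower
  // than initOffset is replaced by initOffset.
  def clampedStart(checkpointed: Offset, initOffset: Offset): Offset =
    if (offsetOrdering.lt(checkpointed, initOffset)) initOffset else checkpointed

  // Proposed behaviour: fall back to initOffset only when the checkpointed
  // snapshot is older than the earliest snapshot still retained.
  def proposedStart(
      checkpointed: Offset,
      initOffset: Offset,
      earliestSnapshotId: Long): Offset =
    if (checkpointed.snapshotId < earliestSnapshotId) initOffset else checkpointed

  def main(args: Array[String]): Unit = {
    // Checkpointed offset from the WAL quoted in the issue.
    val checkpointed = Offset(134483L, 20L, scanSnapshot = false)
    // Illustrative restart-time initOffset (values are assumptions).
    val initOffset = Offset(134491L, -1L, scanSnapshot = true)

    // Current clamp: the valid checkpoint is discarded.
    println(clampedStart(checkpointed, initOffset))
    // Proposed check, snapshot still retained: resume from the checkpoint.
    println(proposedStart(checkpointed, initOffset, earliestSnapshotId = 134000L))
    // Proposed check, snapshot expired: fall back to initOffset.
    println(proposedStart(checkpointed, initOffset, earliestSnapshotId = 134490L))
  }
}
```

With these inputs the clamp returns `initOffset`, while the proposed check returns the checkpointed offset unless the earliest retained snapshot ID has moved past it.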
