yugeeklab opened a new issue, #8205:
URL: https://github.com/apache/paimon/issues/8205

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   ### Paimon version
   
   master (1.5-SNAPSHOT), also present in release-1.4
   
   ### Compute Engine
   
   Spark 3.5, Structured Streaming (micro-batch), `scan.mode = latest-full`
   
   ### Minimal reproduce step
   
   1. Start a streaming query reading a primary-key table 
(`spark.readStream.table(...)` with `scan.mode=latest-full`) and let it commit 
a few batches.
   2. Kill the query and restart it with the same checkpoint.
   3. The first batch after restart is empty; the second batch re-reads the 
**entire snapshot** instead of resuming from the checkpointed offset, and the 
changelog between the checkpointed offset and the restart-time snapshot is 
skipped.
   
   Offset WAL across one observed restart (5-minute trigger, ~300k-row source):
   
   ```
   batch N    {"snapshotId":134483,"index":20,"scanSnapshot":false}   <- committed before stop
   batch N+1  {"snapshotId":134491,"index":29,"scanSnapshot":false}   <- 0 rows, 2s
   batch N+2  {"snapshotId":134511,"index":29,"scanSnapshot":false}   <- 1,055,509 input rows
   ```
   
   ### What doesn't meet your expectations?
   
   `PaimonMicroBatchStream#planInputPartitions` clamps the checkpointed start 
offset up to `initOffset` whenever it compares lower:
   
   ```scala
   if (startOffset0.compareTo(initOffset) < 0) { initOffset } else { startOffset0 }
   ```
   
   `initOffset` is a lazy val recomputed from the **current** table state on 
every restart; with `scan.mode=latest-full` it always points at the current 
snapshot with `scanSnapshot=true`. After any downtime the checkpointed offset 
compares lower, so a perfectly valid checkpoint is silently discarded:
   
   - the batch planned against the old WAL end offset becomes empty (all splits 
filtered by the `(start, end]` range check),
   - the next batch re-emits the whole snapshot as `+I` rows carrying their 
stored field values (e.g. old event-time columns), and
   - the changelog gap between the checkpointed offset and the restart-time 
snapshot is never read.
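   
   The discard can be reproduced in isolation with a small model. This is a 
sketch, not Paimon's code: `ModelOffset`, `ClampDemo`, and `clampedStart` are 
hypothetical names, the ordering (by `snapshotId`, then `index`) is an 
assumption about how offsets compare, and the restart-time `initOffset` values 
are illustrative.
   
   ```scala
   // Simplified stand-in for the source offset. Field names follow the WAL JSON;
   // the ordering (snapshotId first, then index) is an assumption.
   final case class ModelOffset(snapshotId: Long, index: Long, scanSnapshot: Boolean)
       extends Ordered[ModelOffset] {
     override def compare(that: ModelOffset): Int = {
       val bySnapshot = java.lang.Long.compare(snapshotId, that.snapshotId)
       if (bySnapshot != 0) bySnapshot else java.lang.Long.compare(index, that.index)
     }
   }

   object ClampDemo {
     // Same shape as the clamp quoted from planInputPartitions.
     def clampedStart(checkpointed: ModelOffset, initOffset: ModelOffset): ModelOffset =
       if (checkpointed.compareTo(initOffset) < 0) initOffset else checkpointed

     def main(args: Array[String]): Unit = {
       // Checkpointed offset taken from "batch N" in the WAL excerpt.
       val checkpointed = ModelOffset(134483L, 20L, scanSnapshot = false)
       // Hypothetical initOffset recomputed at restart (values are illustrative).
       val initAtRestart = ModelOffset(134491L, -1L, scanSnapshot = true)
       // The checkpoint compares lower, so the clamp returns initAtRestart.
       println(clampedStart(checkpointed, initAtRestart))
     }
   }
   ```
   
   Under these assumptions, any restart where the table has advanced past the 
checkpointed snapshot makes the clamp return `initOffset`, regardless of whether 
the checkpointed snapshot is still readable.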
   
   For stateful consumers (e.g. SCD2 history building), the re-emission applies 
historical rows a second time; in our production pipeline this corrupted 
downstream state.
   
   ### Anything else?
   
   Proposed fix (PR follows): fall back to `initOffset` only when the 
checkpointed snapshot has actually expired 
(`startOffset0.snapshotId < earliestSnapshotId()`); otherwise resume from the 
checkpointed offset as-is.
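   
   A minimal sketch of that rule, for discussion only and not the code of the 
PR: `SketchOffset`, `ResumeSketch`, and `resolveStart` are hypothetical names, 
and the earliest retained snapshot id is passed in as a plain parameter instead 
of being read from the table.
   
   ```scala
   // Simplified stand-in for the source offset (hypothetical type).
   final case class SketchOffset(snapshotId: Long, index: Long, scanSnapshot: Boolean)

   object ResumeSketch {
     // Fall back to initOffset only when the checkpointed snapshot is older than
     // the earliest snapshot still retained; otherwise keep the checkpoint.
     def resolveStart(
         checkpointed: SketchOffset,
         initOffset: SketchOffset,
         earliestSnapshotId: Long): SketchOffset =
       if (checkpointed.snapshotId < earliestSnapshotId) initOffset else checkpointed

     def main(args: Array[String]): Unit = {
       val checkpointed = SketchOffset(134483L, 20L, scanSnapshot = false)
       // Hypothetical restart-time initOffset (values are illustrative).
       val init = SketchOffset(134491L, -1L, scanSnapshot = true)
       // Snapshot 134483 still retained: resume from the checkpoint.
       println(resolveStart(checkpointed, init, earliestSnapshotId = 134400L))
       // Snapshot 134483 expired: fall back to initOffset.
       println(resolveStart(checkpointed, init, earliestSnapshotId = 134490L))
     }
   }
   ```
   
   With this shape, downtime alone no longer discards the checkpoint; only 
snapshot expiration does, which is the one case where resuming from the 
checkpointed offset is impossible.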
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

