DanielCarter-stack commented on issue #10574:
URL: https://github.com/apache/seatunnel/issues/10574#issuecomment-4016018631

   This data loss scenario is plausible based on the checkpoint coordination 
logic. The core issue appears to be a race condition in the checkpoint 
lifecycle:
   
   **Root cause:** When a checkpoint barrier arrives, the Kafka source calls 
`snapshotState()` and its offset (e.g., 20339) is persisted to checkpoint state 
via `CheckpointCoordinator.completePendingCheckpoint()` (lines 942-964). The 
sink's `prepareCommit()` executes after the source snapshot. If 
`prepareCommit()` then fails (schema mismatch, constraint violation), the 
checkpoint state, including the source offset, has already been acknowledged 
and persisted.
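
The ordering described above can be sketched as a small, self-contained Java model. This is only an illustration under stated assumptions: `CheckpointRaceSketch`, `CheckpointStore`, and `runCheckpoint` are hypothetical names and not SeaTunnel classes, and the real coordinator is far more involved. The sketch only shows why persisting the source offset before the sink's `prepareCommit()` result is known leaves the later offset in the checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the checkpoint ordering discussed in this
// comment. None of these types are part of SeaTunnel's real API.
class CheckpointRaceSketch {

    /** Stand-in for durable checkpoint storage; tracks only the source offset. */
    static final class CheckpointStore {
        private long committedSourceOffset;

        CheckpointStore(long initialOffset) {
            this.committedSourceOffset = initialOffset;
        }

        void persist(long offset) {
            this.committedSourceOffset = offset;
        }

        long restoreOffset() {
            return committedSourceOffset;
        }
    }

    /** Runs one checkpoint cycle in the suspected order and returns an event log. */
    static List<String> runCheckpoint(CheckpointStore store, long sourceOffset, boolean sinkFails) {
        List<String> events = new ArrayList<>();

        // 1. The barrier reaches the source, which snapshots its current offset.
        events.add("source snapshotState offset=" + sourceOffset);

        // 2. Assumption: the state is persisted before the sink outcome is known.
        store.persist(sourceOffset);
        events.add("checkpoint persisted offset=" + sourceOffset);

        // 3. The sink's prepareCommit runs afterwards; a failure here does not
        //    undo step 2 in this model.
        events.add(sinkFails ? "sink prepareCommit FAILED" : "sink prepareCommit ok");
        return events;
    }

    public static void main(String[] args) {
        CheckpointStore store = new CheckpointStore(20000L);
        runCheckpoint(store, 20339L, true).forEach(System.out::println);
        // Prints "restore offset = 20339" even though the sink failed.
        System.out.println("restore offset = " + store.restoreOffset());
    }
}
```

In this model the only way to keep offset 20000 after a sink failure would be to defer `persist` until the sink outcome is known; whether SeaTunnel can or should do that is exactly what the questions below try to establish.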
   
   **Code evidence:**
   - `CheckpointCoordinator.java`: Persists the checkpoint once all tasks 
acknowledge it
   - `KafkaSourceReader.java`: Saves the offset to `checkpointOffsetMap` in 
`snapshotState()` (lines 104-124)
   - `JdbcSinkWriter.java`: Non-XA `prepareCommit()` flushes directly, with no 
rollback capability (lines 156-166)
   - `KafkaPartitionSplitReader.java`: Restores from the last acknowledged 
checkpoint offset via `seekToStartingOffsets()` (lines 263-268)
   
   **Expected behavior:** Recovery should seek to offset 20000 (the last 
successful checkpoint), but it seeks to 20339 because that checkpoint was 
acknowledged despite the sink failure.
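
As a worked example of what this mismatch costs, the following sketch computes the size of the skipped range. It assumes that none of the records between the two offsets reached the sink, which the logs would need to confirm; `RecoveryGapSketch` and `skippedRecords` are hypothetical names used only for illustration.

```java
// Hypothetical helper: estimates how many records are skipped when recovery
// seeks to a later offset than the last checkpoint the sink actually committed.
class RecoveryGapSketch {

    /**
     * Returns the number of offsets in [lastDurableOffset, restoredOffset).
     * Assumption: every record in that range failed to reach the sink.
     */
    static long skippedRecords(long lastDurableOffset, long restoredOffset) {
        return Math.max(0L, restoredOffset - lastDurableOffset);
    }

    public static void main(String[] args) {
        // Offsets taken from the scenario in this issue.
        System.out.println(skippedRecords(20000L, 20339L)); // prints 339
    }
}
```

Under that assumption, offsets 20000 through 20338 would never be re-read after recovery.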
   
   **Questions for clarification:**
   1. Can you confirm `env.exactly_once: true` with `sink.is_exactly_once: 
false` in your config?
   2. Do logs show checkpoint 6 marked as completed/acknowledged before the 
sink failure?
   3. Is the StarRocks sink using XA transactions (`is_exactly_once: true` is 
not supported for StarRocks)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

