DanielCarter-stack commented on issue #10574: URL: https://github.com/apache/seatunnel/issues/10574#issuecomment-4016018631
This data loss scenario is plausible given the checkpoint coordination logic. The core issue appears to be a race condition in the checkpoint lifecycle.

**Root cause:** When the checkpoint barrier arrives, the Kafka source calls `snapshotState()` and persists the offset (e.g., 20339) to checkpoint state via `CheckpointCoordinator.completePendingCheckpoint()` (lines 942-964). The sink's `prepareCommit()` executes after the source snapshot, so if `prepareCommit()` fails (schema mismatch, constraint violation), the checkpoint state, including the source offset, has already been acknowledged and persisted.

**Code evidence:**
- `CheckpointCoordinator.java`: persists the checkpoint once all tasks acknowledge
- `KafkaSourceReader.java`: saves offsets to `checkpointOffsetMap` in `snapshotState()` (lines 104-124)
- `JdbcSinkWriter.java`: the non-XA `prepareCommit()` flushes directly, with no rollback capability (lines 156-166)
- `KafkaPartitionSplitReader.java`: restores from the last acknowledged checkpoint offset via `seekToStartingOffsets()` (lines 263-268)

**Expected behavior:** Recovery should seek to offset 20000 (the last successful checkpoint), but it seeks to 20339 because that checkpoint was acknowledged despite the sink failure.

**Questions for clarification:**
1. Can you confirm `env.exactly_once: true` with `sink.is_exactly_once: false` in your config?
2. Do the logs show checkpoint 6 marked as completed/acknowledged before the sink failure?
3. Is the StarRocks sink using XA transactions? (`is_exactly_once: true` is not supported for StarRocks.)
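To make the suspected ordering concrete, here is a minimal, self-contained sketch of the race. All class and method names below are hypothetical simplifications for illustration, not the actual SeaTunnel implementation; the point is only the sequence: checkpoint persisted first, sink flush fails second, recovery reads the already-persisted offset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the suspected checkpoint race; not real SeaTunnel code.
public class CheckpointRaceSketch {

    // Stands in for the durable checkpoint store: checkpointId -> source offset.
    static final Map<Integer, Long> checkpointStore = new HashMap<>();

    // Source side: capture the current offset when the barrier arrives
    // (analogous to the reader's snapshotState()).
    static long snapshotSourceOffset(long currentOffset) {
        return currentOffset;
    }

    // Coordinator side: once all tasks acknowledge, the checkpoint (including
    // the source offset) is persisted -- before the outcome of a
    // non-transactional sink flush is known.
    static void completePendingCheckpoint(int checkpointId, long offset) {
        checkpointStore.put(checkpointId, offset);
    }

    // Sink side: a non-XA prepareCommit() that flushes directly and may fail,
    // with no way to roll back the already-persisted checkpoint.
    static void sinkPrepareCommit(boolean fail) {
        if (fail) {
            throw new RuntimeException("schema mismatch during flush");
        }
    }

    // Recovery: seek to the offset of the last *persisted* checkpoint.
    static long restoreOffset(int checkpointId) {
        return checkpointStore.get(checkpointId);
    }

    public static void main(String[] args) {
        // Checkpoint 5: both source snapshot and sink flush succeeded.
        completePendingCheckpoint(5, 20000L);

        // Checkpoint 6: barrier arrives, offset snapshotted and persisted...
        long offset = snapshotSourceOffset(20339L);
        completePendingCheckpoint(6, offset);
        try {
            // ...then the sink flush fails, but checkpoint 6 is already durable.
            sinkPrepareCommit(true);
        } catch (RuntimeException e) {
            System.out.println("sink failed: " + e.getMessage());
        }

        // Recovery reads 20339, not 20000: records in 20000..20338 were never
        // committed by the sink, so they are skipped after restart.
        System.out.println("restored offset = " + restoreOffset(6));
    }
}
```

Under this model, the fix direction would be to make checkpoint completion conditional on the sink's `prepareCommit()` succeeding (two-phase commit), rather than on task acknowledgement alone.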
