DanielCarter-stack commented on issue #10571: https://github.com/apache/seatunnel/issues/10571#issuecomment-4011127496
Thanks for reporting this. Based on an analysis of the codebase, this appears to be a real issue with offset recovery after a Flink restart.

**Root cause (preliminary)**: When Flink restarts from a checkpoint, the saved LSN offset may not be passed correctly to Debezium's `SqlServerOffsetContext`. The `loadStartingOffsetState()` method in `SqlServerSourceFetchTaskContext.java:268-278` reads the offset from `split.asIncrementalSplit().getStartupOffset()`, which is the initial offset configured for `startup.mode = "latest"`, rather than the offset restored from the checkpoint. As a result, the CDC connector restarts from a fixed position while the `shouldIncreaseFromLsn` flag of `streamingExecutionContext` remains `true`, and the resulting LSN increments skip subsequent records.

**To verify and diagnose**:

1. Check the Flink logs for "Last position recorded in offsets" to confirm the LSN value after the restart.
2. Verify the data actually written to Paimon: is it exactly one record after each restart?
3. Confirm whether this occurs **only** after a Flink restart or savepoint restore, or also on the first run.
4. Share your Flink checkpoint storage location (HDFS or local filesystem).

**Temporary workaround**: Consider changing `startup.mode` from `latest` to `initial` to test whether the issue is specific to `latest`-mode initialization.
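To make the suspected mechanism easier to reason about, here is a minimal Java sketch under stated assumptions. It is a hypothetical model, not SeaTunnel or Debezium code: `OffsetRecoverySketch`, `SplitState`, `suspectedStartingLsn`, `restoredStartingLsn`, and `readFrom` are invented names, LSNs are plain `long` values, and the `increaseFromLsn` parameter only loosely mirrors `shouldIncreaseFromLsn` by treating the start position as exclusive. It contrasts ignoring the checkpoint with preferring it, and shows that an exclusive start from an LSN that was never processed drops that record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical model of offset selection on restart.
 * Not SeaTunnel or Debezium classes: LSNs are plain longs and the
 * change table is a sorted list of LSNs.
 */
public class OffsetRecoverySketch {

    /** Hypothetical split state: configured startup offset plus an optional checkpointed offset. */
    record SplitState(long startupLsn, Optional<Long> checkpointLsn) {}

    /** Suspected behaviour: always resume from the configured startup offset. */
    static long suspectedStartingLsn(SplitState split) {
        return split.startupLsn();
    }

    /** Intended behaviour: prefer the checkpoint-restored offset whenever one exists. */
    static long restoredStartingLsn(SplitState split) {
        return split.checkpointLsn().orElse(split.startupLsn());
    }

    /**
     * Returns the LSNs that would be emitted when reading from fromLsn.
     * With increaseFromLsn set, the start is exclusive (the record at fromLsn
     * is assumed to be already processed); otherwise the start is inclusive.
     */
    static List<Long> readFrom(List<Long> changeTable, long fromLsn, boolean increaseFromLsn) {
        long start = increaseFromLsn ? fromLsn + 1 : fromLsn;
        List<Long> emitted = new ArrayList<>();
        for (long lsn : changeTable) {
            if (lsn >= start) {
                emitted.add(lsn);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Long> changeTable = List.of(100L, 101L, 102L, 103L);

        // Restart case: the checkpoint records LSN 101 as the last processed position.
        SplitState restored = new SplitState(100L, Optional.of(101L));
        System.out.println(suspectedStartingLsn(restored)); // 100: checkpoint ignored
        System.out.println(readFrom(changeTable, restoredStartingLsn(restored), true)); // [102, 103]

        // Fresh start at an LSN that was never processed: an exclusive start skips it.
        SplitState fresh = new SplitState(100L, Optional.empty());
        System.out.println(readFrom(changeTable, restoredStartingLsn(fresh), true));  // [101, 102, 103]
        System.out.println(readFrom(changeTable, restoredStartingLsn(fresh), false)); // [100, 101, 102, 103]
    }
}
```

If the root cause is confirmed, a fix would plausibly take the shape of `restoredStartingLsn`: use the checkpoint-restored offset when one exists, and fall back to the `startup.mode` offset only on a fresh start, keeping the exclusive-start flag consistent with whichever position was chosen.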
