DanielCarter-stack commented on issue #10571:
URL: https://github.com/apache/seatunnel/issues/10571#issuecomment-4011127496

   Thanks for reporting this. Based on an analysis of the codebase, this appears to 
be a genuine issue with offset recovery after a Flink restart.
   
   **Root Cause (preliminary)**:
   When Flink restarts from a checkpoint, the LSN offset saved in the checkpoint 
may not be correctly passed to Debezium's `SqlServerOffsetContext`. The 
`loadStartingOffsetState()` method in 
`SqlServerSourceFetchTaskContext.java:268-278` reads the offset from 
`split.asIncrementalSplit().getStartupOffset()` (the initial offset configured 
via `startup.mode = "latest"`) rather than from the checkpoint-restored offset. 
As a result, the CDC connector restarts from a fixed position while the 
`shouldIncreaseFromLsn` flag in `streamingExecutionContext` remains `true`, so 
the starting LSN is incremented past that position and subsequent records are 
skipped.
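To make the suspected failure mode concrete, here is a minimal Java sketch. It is not SeaTunnel or Debezium code, and every name in it (`LsnResumeSketch`, `chooseStartingLsn`, `readFrom`) is hypothetical. It models LSNs as plain `long`s (real SQL Server LSNs are 10-byte values, and Debezium tracks more state than a single number). It also assumes that the `latest` startup offset resolves to the newest LSN at the moment the task restarts. The sketch contrasts resuming from the checkpoint-restored LSN with resuming from the startup offset while the increment flag is still set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical model of CDC resume logic; LSNs simplified to longs. */
public class LsnResumeSketch {

    /** Expected precedence: a checkpoint-restored LSN wins over the startup.mode offset. */
    static long chooseStartingLsn(Optional<Long> restoredLsn, long startupOffsetLsn) {
        // Fall back to the startup offset only when nothing was restored (fresh start).
        return restoredLsn.orElse(startupOffsetLsn);
    }

    /** Returns the LSNs a reader would emit from the change log. */
    static List<Long> readFrom(List<Long> changeLog, long startLsn, boolean shouldIncreaseFromLsn) {
        // With the flag set, startLsn is treated as already processed and skipped.
        long firstLsn = shouldIncreaseFromLsn ? startLsn + 1 : startLsn;
        List<Long> emitted = new ArrayList<>();
        for (long lsn : changeLog) {
            if (lsn >= firstLsn) {
                emitted.add(lsn);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Change log holds LSNs 10..14; the last checkpoint recorded LSN 12 as processed.
        List<Long> changeLog = List.of(10L, 11L, 12L, 13L, 14L);
        long restoredLsn = 12L;
        // Assumption: "latest" resolves to the newest LSN (14) when the task restarts.
        long latestAtRestart = 14L;

        long expectedStart = chooseStartingLsn(Optional.of(restoredLsn), latestAtRestart);
        System.out.println("resume from checkpoint: "
                + readFrom(changeLog, expectedStart, true));     // [13, 14]
        System.out.println("resume from startup offset: "
                + readFrom(changeLog, latestAtRestart, true));   // []
    }
}
```

Under these assumptions, resuming from the restored offset emits LSNs 13 and 14. Resuming from the startup offset with `shouldIncreaseFromLsn` still `true` emits nothing, which matches the "records skipped after restart" symptom.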
   
   **To verify and diagnose**:
   1. Check the Flink logs for "Last position recorded in offsets" to confirm the 
LSN value after the restart.
   2. Check the data actually written to Paimon: is exactly one record written 
after each restart?
   3. Confirm whether this occurs **only** after a Flink restart/savepoint 
restore, or also on the first run.
   4. Share your Flink checkpoint storage location (HDFS or local filesystem).
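For step 1, a plain `grep` over the TaskManager log is usually enough. If you prefer to script it, a minimal Java sketch follows. The class and method names are hypothetical, the log path depends on your deployment, and the sketch assumes the message text appears verbatim in the log. Comparing the LSN printed after a restart with the one printed before it shows whether the restored offset was used.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists log lines that mention the recorded offset, with line numbers. */
public class OffsetLogScan {

    // Message text quoted in step 1; assumed to appear verbatim in the log.
    static final String MARKER = "Last position recorded in offsets";

    static List<String> findOffsetLines(List<String> lines) {
        List<String> hits = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            if (lines.get(i).contains(MARKER)) {
                // 1-based line number (like `grep -n`) to line hits up with restart messages.
                hits.add((i + 1) + ": " + lines.get(i));
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java OffsetLogScan <path-to-log-file>");
            System.exit(1);
        }
        // readAllLines decodes the file as UTF-8 and throws on invalid bytes.
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        findOffsetLines(lines).forEach(System.out::println);
    }
}
```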
   
   **Temporary workaround**:
   Consider changing `startup.mode` from `latest` to `initial` to check whether 
the issue is specific to initialization in `latest` mode.

