[ 
https://issues.apache.org/jira/browse/FLINK-39645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39645:
-----------------------------------
    Labels: pull-request-available  (was: )

> HybridSourceReader.snapshotState() loses recovered splits when currentReader 
> is null
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-39645
>                 URL: https://issues.apache.org/jira/browse/FLINK-39645
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>            Reporter: Chen Zhang
>            Priority: Minor
>              Labels: pull-request-available
>
> h3. Summary
> HybridSourceReader.snapshotState() can return an empty split list during 
> recovery, permanently losing splits stored in the restoredSplits field. This 
> leads to silent data loss under repeated failover scenarios.
> h3. Reproduction Scenario
> 1. A Flink job using HybridSource takes a checkpoint successfully.
> 2. The job fails and restores from the checkpoint.
> 3. During recovery, addSplits() is called. Since currentSourceIndex == -1, 
> splits are buffered into restoredSplits (not forwarded to any reader).
> 4. start() sends SourceReaderFinishedEvent to the coordinator, requesting a 
> SwitchSourceEvent to activate the appropriate source reader.
> 5. Before the SwitchSourceEvent arrives, a checkpoint is triggered.
> 6. snapshotState() finds currentReader == null and returns 
> Collections.emptyList(), ignoring restoredSplits entirely.
> 7. If the job fails again and restores from this new checkpoint, the buffered 
> splits are gone forever.
> h3. Root Cause
> In HybridSourceReader.java, lines 109-114:
> {code:java}
> public List<HybridSourceSplit> snapshotState(long checkpointId) {
>     List<? extends SourceSplit> state =
>             currentReader != null
>                     ? currentReader.snapshotState(checkpointId)
>                     : Collections.emptyList();
>     return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
> }
> {code}
> When currentReader is null (which is the normal state between recovery and 
> source switch), the method snapshots an empty list. The restoredSplits field 
> – which holds splits recovered from the previous checkpoint but not yet 
> assigned to a reader – is completely excluded from the snapshot.
> h3. Impact
>  - Silent data loss: splits are dropped without any error or warning
>  - Most likely to surface in unstable environments with frequent restarts, 
> where the window between recovery and source switching is hit by consecutive 
> failures
>  - Affects all HybridSource users
> h3. Suggested Fix
> Throw an exception when snapshotState() is called while currentReader is 
> null, so that the checkpoint fails instead of silently persisting an empty 
> split list.
> {code:java}
> @Override
> public List<HybridSourceSplit> snapshotState(long checkpointId) {
>     if (currentReader != null) {
>         List<? extends SourceSplit> state = currentReader.snapshotState(checkpointId);
>         return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
>     } else {
>         // Recovered splits may still be buffered in restoredSplits; fail fast rather than drop them.
>         throw new IllegalStateException("currentReader must not be null when snapshotting state");
>     }
> }
> {code}
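
The reproduction scenario and the suggested fix quoted above can be illustrated with a small standalone model. This is only a sketch under stated assumptions: HybridReaderSketch, switchSource(), snapshotReported() and snapshotFailFast() are hypothetical names invented for illustration, splits are modelled as plain strings, and none of this is the actual Flink class or API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model of the reader state described in the issue; NOT the Flink class. */
public class HybridReaderSketch {
    // Splits buffered by addSplits() while no source reader is active.
    private final List<String> restoredSplits = new ArrayList<>();
    // Stand-in for currentReader's split state; null until the source switch happens.
    private List<String> currentReaderSplits = null;

    public void addSplits(List<String> splits) {
        if (currentReaderSplits == null) {
            restoredSplits.addAll(splits); // buffered, not forwarded to any reader
        } else {
            currentReaderSplits.addAll(splits);
        }
    }

    /** Hypothetical stand-in for the source switch: buffered splits go to the new reader. */
    public void switchSource() {
        currentReaderSplits = new ArrayList<>(restoredSplits);
        restoredSplits.clear();
    }

    /** Models the reported behavior: an empty snapshot while no reader is active. */
    public List<String> snapshotReported() {
        return currentReaderSplits != null
                ? new ArrayList<>(currentReaderSplits)
                : Collections.<String>emptyList();
    }

    /** Models the suggested fix: refuse to snapshot while no reader is active. */
    public List<String> snapshotFailFast() {
        if (currentReaderSplits == null) {
            throw new IllegalStateException("no active reader; recovered splits are still buffered");
        }
        return new ArrayList<>(currentReaderSplits);
    }

    public static void main(String[] args) {
        HybridReaderSketch reader = new HybridReaderSketch();
        reader.addSplits(Arrays.asList("split-0", "split-1")); // step 3: recovery

        // Steps 5-6: a checkpoint arrives before the source switch.
        System.out.println("reported behavior snapshot: " + reader.snapshotReported());
        try {
            reader.snapshotFailFast();
        } catch (IllegalStateException e) {
            System.out.println("fail-fast snapshot rejected: " + e.getMessage());
        }

        reader.switchSource();
        System.out.println("snapshot after switch: " + reader.snapshotFailFast());
    }
}
```

In this model the reported behavior yields a snapshot that omits the two buffered splits, while the fail-fast variant rejects the snapshot until the switch has handed them to a reader. How a rejected snapshot interacts with checkpoint failure tolerance in a real job is outside what this sketch covers.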



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
