[
https://issues.apache.org/jira/browse/FLINK-39645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chen Zhang updated FLINK-39645:
-------------------------------
Description:
h3. Summary
HybridSourceReader.snapshotState() can return an empty split list during
recovery, permanently losing splits stored in the restoredSplits field. This
leads to silent data loss under repeated failover scenarios.
h3. Reproduction Scenario
1. A Flink job using HybridSource takes a checkpoint successfully.
2. The job fails and restores from the checkpoint.
3. During recovery, addSplits() is called. Since currentSourceIndex == -1,
splits are buffered into restoredSplits (not forwarded to any reader).
4. start() sends SourceReaderFinishedEvent to the coordinator, requesting a
SwitchSourceEvent to activate the appropriate source reader.
5. Before the SwitchSourceEvent arrives, a checkpoint is triggered.
6. snapshotState() finds currentReader == null and returns
Collections.emptyList(), ignoring restoredSplits entirely.
7. If the job fails again and restores from this new checkpoint, the buffered
splits are gone forever.
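The sequence above can be replayed on a small stand-in model. The sketch below is not the Flink class: ToyHybridReader, switchSource() and simulateDoubleFailover() are hypothetical names, splits are plain strings, and the per-source filtering that the real reader applies is left out. It only mirrors the reported control flow (buffer in restoredSplits, snapshot without looking at the buffer).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy stand-in for HybridSourceReader (hypothetical, not the Flink class). */
class ToyHybridReader {
    private final List<String> restoredSplits = new ArrayList<>();
    private List<String> currentReaderSplits = null; // null models currentReader == null
    private int currentSourceIndex = -1;

    /** Step 3: with no active reader after restore, splits are only buffered. */
    void addSplits(List<String> splits) {
        if (currentSourceIndex < 0) {
            restoredSplits.addAll(splits);
        } else {
            currentReaderSplits.addAll(splits);
        }
    }

    /** Models the arrival of SwitchSourceEvent: buffered splits reach the reader. */
    void switchSource(int sourceIndex) {
        currentSourceIndex = sourceIndex;
        currentReaderSplits = new ArrayList<>(restoredSplits);
        restoredSplits.clear();
    }

    /** Step 6: mirrors the reported logic, which never consults restoredSplits. */
    List<String> snapshotState() {
        return currentReaderSplits != null
                ? new ArrayList<>(currentReaderSplits)
                : Collections.<String>emptyList();
    }

    /** Replays steps 2 to 7 and returns what survives the second restore. */
    static List<String> simulateDoubleFailover(List<String> checkpointedSplits) {
        ToyHybridReader afterFirstRestore = new ToyHybridReader();
        afterFirstRestore.addSplits(checkpointedSplits);
        // The checkpoint fires before switchSource() has been called.
        List<String> secondCheckpoint = afterFirstRestore.snapshotState();

        ToyHybridReader afterSecondRestore = new ToyHybridReader();
        afterSecondRestore.addSplits(secondCheckpoint);
        afterSecondRestore.switchSource(0);
        return afterSecondRestore.snapshotState();
    }
}
```

In this model, calling simulateDoubleFailover() with any non-empty split list yields an empty list, whereas a reader that reaches switchSource() before the checkpoint keeps its splits.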
h3. Root Cause
In HybridSourceReader.java, lines 109-114:
{code:java}
public List<HybridSourceSplit> snapshotState(long checkpointId) {
    List<? extends SourceSplit> state =
            currentReader != null
                    ? currentReader.snapshotState(checkpointId)
                    : Collections.emptyList();
    return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
}
{code}
When currentReader is null (which is the normal state between recovery and
source switch), the method snapshots an empty list. The restoredSplits field –
which holds splits recovered from the previous checkpoint but not yet assigned
to a reader – is completely excluded from the snapshot.
h3. Impact
- Silent data loss: splits are dropped without any error or warning
- Most likely to surface in unstable environments with frequent restarts,
where the window between recovery and source switching is hit by consecutive
failures
- Affects all HybridSource users
h3. Suggested Fix
Throw an exception when snapshotState() is called while currentReader is null, so that the checkpoint fails instead of persisting an empty split list:
{code:java}
@Override
public List<HybridSourceSplit> snapshotState(long checkpointId) {
    if (currentReader != null) {
        List<? extends SourceSplit> state = currentReader.snapshotState(checkpointId);
        return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
    } else {
        // Fail the snapshot rather than silently dropping the buffered restoredSplits.
        throw new IllegalStateException("currentReader can't be null when snapshotting");
    }
}
{code}
Additionally, a unit test should be added that verifies snapshotState()
preserves restoredSplits when no reader has been activated yet.
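One possible shape for such a test, sketched against a toy model rather than the real HybridSourceReader: FailFastReaderSketch, activateReader() and restoredSplitsSurvive() are hypothetical names, and IllegalStateException is an assumed choice of exception type. The check reads "preserves restoredSplits" as: no snapshot can be produced while the splits are still buffered, and they show up in the first snapshot taken after a reader is activated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the fail-fast fix (hypothetical names, not Flink classes). */
class FailFastReaderSketch {
    private final List<String> restoredSplits = new ArrayList<>();
    private List<String> activeReaderSplits = null; // null models currentReader == null

    void addSplits(List<String> splits) {
        if (activeReaderSplits == null) {
            restoredSplits.addAll(splits); // buffered until a reader is activated
        } else {
            activeReaderSplits.addAll(splits);
        }
    }

    /** Models SwitchSourceEvent handling: buffered splits move to the new reader. */
    void activateReader() {
        activeReaderSplits = new ArrayList<>(restoredSplits);
        restoredSplits.clear();
    }

    List<String> snapshotState() {
        if (activeReaderSplits == null) {
            // Assumed exception type; the point is that no empty snapshot is returned.
            throw new IllegalStateException("currentReader can't be null when snapshotting");
        }
        return new ArrayList<>(activeReaderSplits);
    }

    /** Test-style check: an early snapshot is rejected and the splits reappear later. */
    static boolean restoredSplitsSurvive() {
        FailFastReaderSketch reader = new FailFastReaderSketch();
        reader.addSplits(Arrays.asList("split-0", "split-1"));

        boolean rejected = false;
        try {
            reader.snapshotState();
        } catch (IllegalStateException expected) {
            rejected = true;
        }

        reader.activateReader();
        return rejected
                && reader.snapshotState().equals(Arrays.asList("split-0", "split-1"));
    }
}
```

A real test would drive HybridSourceReader through its own addSplits() and event-handling paths; how a failed snapshot is treated by the checkpoint coordinator is outside what this sketch models.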
was:
h3. Suggested Fix
Include restoredSplits in the snapshot when currentReader is null:
{code:java}
@Override
public List<HybridSourceSplit> snapshotState(long checkpointId) {
    if (currentReader != null) {
        List<? extends SourceSplit> state = currentReader.snapshotState(checkpointId);
        return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
    } else {
        return new ArrayList<>(restoredSplits);
    }
}
{code}
Additionally, a unit test should be added that verifies snapshotState()
preserves restoredSplits when no reader has been activated yet.
> HybridSourceReader.snapshotState() loses recovered splits when currentReader
> is null
> ------------------------------------------------------------------------------------
>
> Key: FLINK-39645
> URL: https://issues.apache.org/jira/browse/FLINK-39645
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Reporter: Chen Zhang
> Priority: Minor