[
https://issues.apache.org/jira/browse/FLINK-39645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18081790#comment-18081790
]
袁焊忠 commented on FLINK-39645:
-----------------------------
I opened a focused PR for this: https://github.com/apache/flink/pull/28191
The regression test covers the recovery window where restored HybridSourceSplit
state has been added back, but the SwitchSourceEvent has not created the
current reader yet. Before the change, snapshotState() returned an empty list
in that window; with the patch it keeps a defensive copy of the restored splits
in the checkpoint state.
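For readers who do not want to open the PR, here is a minimal sketch of the behaviour the test checks. It is a simplified stand-in and not the actual patch: the class name RecoveringReaderSketch, the String split type, and the onSwitchSourceEvent() method are hypothetical, and the real reader additionally wraps splits with the source index.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a reader in the recovery window; not Flink code. */
class RecoveringReaderSketch {
    // Splits handed back after restore, buffered while no reader exists yet.
    private final List<String> restoredSplits = new ArrayList<>();
    // Stand-in for the current reader's split state; null until the switch event arrives.
    private List<String> currentReaderSplits = null;

    void addSplits(List<String> splits) {
        if (currentReaderSplits == null) {
            restoredSplits.addAll(splits); // no reader yet: buffer the restored splits
        } else {
            currentReaderSplits.addAll(splits);
        }
    }

    // Models the arrival of the switch event: the buffered splits move to the new reader.
    void onSwitchSourceEvent() {
        currentReaderSplits = new ArrayList<>(restoredSplits);
        restoredSplits.clear();
    }

    List<String> snapshotState() {
        if (currentReaderSplits != null) {
            return new ArrayList<>(currentReaderSplits);
        }
        // Recovery window: return a defensive copy of the buffered splits
        // instead of an empty list, so a second failover does not lose them.
        return new ArrayList<>(restoredSplits);
    }
}
```

Calling addSplits() and then snapshotState() before onSwitchSourceEvent() returns the buffered splits in this model. The copy is there so that later changes to the buffer do not alter a snapshot that has already been taken.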
Testing:
- ./mvnw -pl flink-connectors/flink-connector-base -DskipITs -Dtest=HybridSourceReaderTest#testReaderRecoverySnapshotBeforeSwitchSourceEvent -Dsurefire.failIfNoSpecifiedTests=false test
- ./mvnw -pl flink-connectors/flink-connector-base -DskipITs -Dtest=HybridSourceReaderTest -Dsurefire.failIfNoSpecifiedTests=false test
- ./mvnw -pl flink-connectors/flink-connector-base -DskipITs test
- git diff --check
The connector-base module run finished with 159 tests, 0 failures, 0 errors,
and BUILD SUCCESS.
> HybridSourceReader.snapshotState() loses recovered splits when currentReader
> is null
> ------------------------------------------------------------------------------------
>
> Key: FLINK-39645
> URL: https://issues.apache.org/jira/browse/FLINK-39645
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Reporter: Chen Zhang
> Priority: Minor
> Labels: pull-request-available
>
> h3. Summary
> HybridSourceReader.snapshotState() can return an empty split list during
> recovery, permanently losing splits stored in the restoredSplits field. This
> leads to silent data loss under repeated failover scenarios.
> h3. Reproduction Scenario
> 1. A Flink job using HybridSource takes a checkpoint successfully.
> 2. The job fails and restores from the checkpoint.
> 3. During recovery, addSplits() is called. Since currentSourceIndex == -1,
> splits are buffered into restoredSplits (not forwarded to any reader).
> 4. start() sends SourceReaderFinishedEvent to the coordinator, requesting a
> SwitchSourceEvent to activate the appropriate source reader.
> 5. Before the SwitchSourceEvent arrives, a checkpoint is triggered.
> 6. snapshotState() finds currentReader == null and returns
> Collections.emptyList(), ignoring restoredSplits entirely.
> 7. If the job fails again and restores from this new checkpoint, the buffered
> splits are gone forever.
> h3. Root Cause
> In HybridSourceReader.java, lines 109-114:
> {code:java}
> public List<HybridSourceSplit> snapshotState(long checkpointId) {
> List<? extends SourceSplit> state =
> currentReader != null
> ? currentReader.snapshotState(checkpointId)
> : Collections.emptyList();
> return HybridSourceSplit.wrapSplits(state, currentSourceIndex,
> switchedSources);
> }
> {code}
> When currentReader is null (which is the normal state between recovery and
> source switch), the method snapshots an empty list. The restoredSplits field
> – which holds splits recovered from the previous checkpoint but not yet
> assigned to a reader – is completely excluded from the snapshot.
> h3. Impact
> - Silent data loss: splits are dropped without any error or warning
> - Most likely to surface in unstable environments with frequent restarts,
> where the window between recovery and source switching is hit by consecutive
> failures
> - Affects all HybridSource users
> h3. Suggested Fix
> Throw an exception when snapshotState() is called while currentReader is null.
> {code:java}
> @Override
> public List<HybridSourceSplit> snapshotState(long checkpointId) {
> if (currentReader != null) {
> List<? extends SourceSplit> state =
> currentReader.snapshotState(checkpointId);
> return HybridSourceSplit.wrapSplits(state, currentSourceIndex,
> switchedSources);
> } else {
> throw new IllegalStateException("currentReader can't be null when snapshotting state");
> }
> }
> {code}