Bowen Li created FLINK-39946:
--------------------------------

             Summary: Unaligned checkpoint restore can mis-map union input channel state after reordering UIDed sources
                 Key: FLINK-39946
                 URL: https://issues.apache.org/jira/browse/FLINK-39946
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.19.1
            Reporter: Bowen Li


When a Flink job restores from an unaligned checkpoint, input channel state is 
restored by positional input gate/channel indexes rather than by a stable edge 
identity.

This can break when a job has multiple source operators feeding a union and the 
source declarations are reordered between job versions. Even if the source 
operators have stable UIDs, the generated {{JobEdge}} / input gate order can 
still change because initial source traversal is based on transient stream node 
ordering. After restore, in-flight channel state from one logical source may be 
applied to another logical input gate.

This is especially problematic for jobs that:
 * use unaligned checkpoints or aligned checkpoints that fall back to unaligned,
 * restore from checkpoint/savepoint state containing in-flight channel data,
 * reorder UIDed union inputs, for example by reordering Kafka source 
declarations.

Expected behavior: if union inputs have stable operator identities, restoring 
channel state should preserve the logical source-to-input-gate mapping across 
source declaration reorders.

Observed behavior: source operator UIDs remain stable, but union input gate 
positions can change, so channel state restore is tied to the wrong logical 
input.
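The failure mode can be modeled in a few lines of plain Java. This is a hypothetical sketch, not Flink code: {{UnionRestoreSketch}}, {{GateState}}, {{restoreByPosition}} and {{restoreByUid}} are invented names, and a snapshot is reduced to one list of in-flight buffers per union input gate. It contrasts restoring by gate position (the behavior reported here) with restoring keyed by the upstream source UID (the expected behavior).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the reported failure; none of these types are Flink classes.
 * A snapshot stores one list of in-flight buffers per union input gate.
 */
public class UnionRestoreSketch {

    /** Snapshot entry: the upstream source UID that fed this gate, plus its in-flight buffers. */
    record GateState(String upstreamUid, List<String> inFlightBuffers) {}

    /** Positional restore: gate i of the new job receives whatever was stored at index i. */
    static Map<String, List<String>> restoreByPosition(List<GateState> snapshot, List<String> newGateOrder) {
        Map<String, List<String>> restored = new HashMap<>();
        for (int gateIdx = 0; gateIdx < newGateOrder.size(); gateIdx++) {
            restored.put(newGateOrder.get(gateIdx), snapshot.get(gateIdx).inFlightBuffers());
        }
        return restored;
    }

    /** Identity-based restore: each gate receives the state recorded for its own upstream UID. */
    static Map<String, List<String>> restoreByUid(List<GateState> snapshot, List<String> newGateOrder) {
        Map<String, List<String>> byUid = new HashMap<>();
        for (GateState state : snapshot) {
            byUid.put(state.upstreamUid(), state.inFlightBuffers());
        }
        Map<String, List<String>> restored = new HashMap<>();
        for (String uid : newGateOrder) {
            restored.put(uid, byUid.getOrDefault(uid, List.of()));
        }
        return restored;
    }

    public static void main(String[] args) {
        // Old job version: source-a was declared first, so it fed gate 0.
        List<GateState> snapshot = List.of(
                new GateState("source-a", List.of("a1", "a2")),
                new GateState("source-b", List.of("b1")));
        // New job version: declarations reordered, so source-b now feeds gate 0.
        List<String> newGateOrder = List.of("source-b", "source-a");

        System.out.println("by position: " + restoreByPosition(snapshot, newGateOrder));
        System.out.println("by UID:      " + restoreByUid(snapshot, newGateOrder));
    }
}
```

With {{source-b}} now at gate 0, the positional restore hands it the buffers {{a1}}, {{a2}} that were in flight from {{source-a}}, while the UID-keyed restore gives each source its own buffers.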


This bug is specific to restoring channel state from unaligned checkpoints; it 
does not affect normal restores from aligned checkpoints.

Why:
 * Aligned checkpoints do not persist in-flight input channel buffers; barriers 
align first, so there is no channel state to remap on restore.
 * Unaligned checkpoints do persist in-flight channel state, and Flink restores 
it by positional {{gateIdx/inputChannelIdx}} indexes. That is where reordered 
union inputs can restore the wrong logical channel state.

One caveat: a nominally “aligned” job can still hit this bug if it configures an 
aligned checkpoint timeout and falls back to unaligned checkpoints. In that case 
the affected checkpoint contains channel state, so the same bug applies.
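The aligned/unaligned distinction above can be summarized as a small predicate. This is a simplified, hypothetical model ({{ChannelStateSketch}} and {{persistsChannelState}} are invented names, not Flink APIs), assuming that an aligned checkpoint timeout of zero means checkpoints start unaligned, and that a positive timeout means they start aligned and switch to unaligned only when barrier alignment takes longer than the timeout.

```java
import java.time.Duration;

/**
 * Simplified, hypothetical model of when a checkpoint may persist in-flight input
 * channel state. It mirrors the reasoning in this issue, not Flink's barrier-handling code.
 */
public class ChannelStateSketch {

    /**
     * @param unalignedEnabled  whether unaligned checkpoints are enabled for the job
     * @param alignedTimeout    aligned checkpoint timeout; Duration.ZERO is assumed to mean "start unaligned"
     * @param alignmentDuration how long barrier alignment takes for this checkpoint
     * @return true if the checkpoint may persist channel state that is remapped on restore
     */
    static boolean persistsChannelState(boolean unalignedEnabled, Duration alignedTimeout, Duration alignmentDuration) {
        if (!unalignedEnabled) {
            // Purely aligned: barriers align first, so no in-flight buffers are written.
            return false;
        }
        if (alignedTimeout.isZero()) {
            // Unaligned from the start: in-flight buffers are persisted.
            return true;
        }
        // Starts aligned and falls back to unaligned only if alignment exceeds the timeout.
        return alignmentDuration.compareTo(alignedTimeout) > 0;
    }

    public static void main(String[] args) {
        System.out.println(persistsChannelState(false, Duration.ZERO, Duration.ofSeconds(60)));
        System.out.println(persistsChannelState(true, Duration.ZERO, Duration.ZERO));
        System.out.println(persistsChannelState(true, Duration.ofSeconds(30), Duration.ofSeconds(5)));
        System.out.println(persistsChannelState(true, Duration.ofSeconds(30), Duration.ofSeconds(45)));
    }
}
```

Only the last case, where alignment outlasts the timeout, corresponds to the caveat: the job looks aligned most of the time, but that particular checkpoint carries channel state and is exposed to the positional remapping.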

--
This message was sent by Atlassian Jira
(v8.20.10#820010)