Hi everyone,

As Flink sees growing adoption for AI and multimodal workloads, where
source records are small (mainly metadata) but downstream computation
is heavy, network backpressure can cause aligned checkpoints (AC) to
time out. Unaligned checkpoints (UC) are a natural fit, but Flink
currently falls back to AC on POINTWISE edges (rescale(), forward()),
so users cannot reliably benefit from UC. This limitation was
discussed in FLINK-21936. We propose to enable UC on POINTWISE edges
via a configuration option, and to handle the associated state
recovery challenges.

Regarding feasibility, FLINK-21936 raised a valid concern: UC recovery
with parallelism changes may redistribute in-flight data to different
subtasks, breaking ordering assumptions on KeyedStreams. The concern
is real, but many workloads treat records as independent items and do
not rely on ordering. We therefore propose FLINK-36980, which keeps
the default behavior unchanged and requires an explicit opt-in via
FORCE_UNALIGNED.
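To make the opt-in concrete, here is a minimal Java sketch of the
per-edge decision we have in mind. The class and method names
(UnalignedGateSketch, unalignedOnEdge) are hypothetical and do not
correspond to Flink code; the sketch also assumes FORCE_UNALIGNED only
takes effect when unaligned checkpoints are enabled in the first place.

```java
public class UnalignedGateSketch {

    enum Pattern { ALL_TO_ALL, POINTWISE }

    /**
     * Hypothetical per-edge gate: may barriers overtake in-flight data on this edge?
     * Not Flink's actual code; it only encodes the proposed opt-in.
     */
    static boolean unalignedOnEdge(boolean unalignedEnabled, boolean forceUnaligned, Pattern pattern) {
        if (!unalignedEnabled) {
            return false; // aligned checkpoints everywhere
        }
        if (pattern == Pattern.POINTWISE) {
            // Default stays aligned; unaligned only with the explicit FORCE_UNALIGNED opt-in.
            return forceUnaligned;
        }
        return true; // ALL_TO_ALL edges already support UC
    }

    public static void main(String[] args) {
        for (Pattern p : Pattern.values()) {
            System.out.println(p + ": default=" + unalignedOnEdge(true, false, p)
                    + ", forced=" + unalignedOnEdge(true, true, p));
        }
    }
}
```

The point of gating on an existing flag is that jobs which never set
it see exactly the same checkpointing behavior as before.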

For state recovery, POINTWISE edges differ from ALL_TO_ALL in two
ways: (1) not all upstream-downstream subtask pairs are connected, so
state cannot be freely assigned to any new subtask; (2) a local
channel/subpartition index differs from the peer's global subtask
index, so SubtaskConnectionDescriptor cannot use local indices directly. Our
approach:

- Downstream: standard ROUND_ROBIN assignment, no special handling needed.
- Upstream: a topology-aware computation traces connections —
newUpstream → consumersOf → newDownstream → ROUND_ROBIN →
oldDownstream → producersOf → oldUpstream — to find which old
upstreams hold relevant state. Each new upstream reads all associated
old subtask state, then filters at recovery time to only write buffers
destined for the downstreams it is actually connected to. When multiple
new upstreams trace to the same old upstream, a primary-producer rule
deduplicates them so that each buffer is restored only once.
- Edge pattern persistence: the computation requires knowing the
distribution pattern of edges in both old and new job versions. We
leverage the MasterHook mechanism to record each edge's pattern into
checkpoint state. Edges in old checkpoints that lack this hook state
are treated as ALL_TO_ALL (the only pattern that supported UC before),
enabling seamless cross-version upgrades.
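The upstream computation above can be sketched in plain Java as a
rough illustration, not the POC code. All names (Topology,
oldUpstreamsToRead, shouldWrite, recordedPattern) are hypothetical.
The POINTWISE wiring assumes a contiguous-range split between
subtasks (which we believe matches Flink's POINTWISE wiring), and the
primary-producer rule is stood in for by a lowest-index tie-break
chosen only because it is easy to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class PointwiseUcRecoverySketch {

    enum Pattern { ALL_TO_ALL, POINTWISE }

    /** One edge between two vertices in one job version: parallelisms plus pattern. */
    record Topology(int up, int down, Pattern pattern) {

        /** Whether upstream u sends to downstream d (global subtask indices). */
        boolean connected(int u, int d) {
            if (pattern == Pattern.ALL_TO_ALL) {
                return true;
            }
            // ASSUMPTION: contiguous-range split for POINTWISE edges.
            if (up >= down) {
                // Each downstream reads a contiguous range of upstreams.
                return u >= d * up / down && u < (d + 1) * up / down;
            }
            // Each upstream writes to a contiguous range of downstreams.
            return d >= (u * down + up - 1) / up && d < ((u + 1) * down + up - 1) / up;
        }

        List<Integer> consumersOf(int u) {
            List<Integer> result = new ArrayList<>();
            for (int d = 0; d < down; d++) {
                if (connected(u, d)) result.add(d);
            }
            return result;
        }

        List<Integer> producersOf(int d) {
            List<Integer> result = new ArrayList<>();
            for (int u = 0; u < up; u++) {
                if (connected(u, d)) result.add(u);
            }
            return result;
        }
    }

    /** Hypothetical hook-state lookup: a missing entry means a legacy checkpoint, i.e. ALL_TO_ALL. */
    static Pattern recordedPattern(Map<String, Pattern> hookState, String edgeId) {
        return hookState.getOrDefault(edgeId, Pattern.ALL_TO_ALL);
    }

    /** ROUND_ROBIN for downstream channel state: old subtask d goes to new subtask d % newDown. */
    static int newDownstreamOf(int oldDown, Topology newT) {
        return oldDown % newT.down();
    }

    /** newUpstream -> consumersOf -> newDownstream -> ROUND_ROBIN -> oldDownstream -> producersOf -> oldUpstream. */
    static Set<Integer> oldUpstreamsToRead(int newUp, Topology oldT, Topology newT) {
        Set<Integer> result = new TreeSet<>();
        for (int newDown : newT.consumersOf(newUp)) {
            // Inverse of ROUND_ROBIN: old subtasks newDown, newDown + newD, ... below oldD.
            for (int oldDown = newDown; oldDown < oldT.down(); oldDown += newT.down()) {
                result.addAll(oldT.producersOf(oldDown));
            }
        }
        return result;
    }

    /**
     * Recovery-time filter: should newUp write the buffers that oldUp had queued for oldDown?
     * The lowest-index tie-break is a hypothetical stand-in for the primary-producer rule.
     */
    static boolean shouldWrite(int newUp, int oldUp, int oldDown, Topology oldT, Topology newT) {
        if (!oldT.connected(oldUp, oldDown)) {
            return false; // no such subpartition in the old job
        }
        int newDown = newDownstreamOf(oldDown, newT);
        if (!newT.connected(newUp, newDown)) {
            return false; // not wired to where this data is redistributed
        }
        return newT.producersOf(newDown).get(0) == newUp;
    }

    public static void main(String[] args) {
        Topology oldT = new Topology(4, 2, recordedPattern(Map.of(), "edge-1")); // legacy checkpoint
        Topology newT = new Topology(2, 4, Pattern.POINTWISE);
        for (int u = 0; u < newT.up(); u++) {
            System.out.println("new upstream " + u + " reads old " + oldUpstreamsToRead(u, oldT, newT));
        }
    }
}
```

The invariant worth checking is that, for every connected old
(upstream, downstream) pair, exactly one new upstream writes those
buffers, and that writer both read the old upstream's state and is
wired to the new downstream the buffers are redistributed to.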

We have a POC that passes end-to-end tests covering restoring and
rescaling across all four old → new shuffle-change combinations
(A2A→A2A, A2A→PW, PW→A2A, PW→PW). However, some aspects, such as the
configuration granularity and the reliance on MasterHook, may not be
optimal, and we'd appreciate community input on the overall direction.
Looking forward to your feedback.

Best regards,
Gen
