Thanks Raorao for driving this regional based checkpoint,
and thanks everyone for the discussion and Roman's ping.

Regarding FLIP-306 (Unified File Merging Mechanism for Checkpoints) [1][2]
and FLINK-26803 (Merge small ChannelState file for Unaligned
Checkpoint)[3][4].

IIRC, to make FLINK-26803 work with regional checkpoints, its file sharing
would might need to change from the current job/TM granularity to
region granularity, because multiple tasks share the same file and a
failure of any one of them fails the channel state writing of all the
others,
so we have to narrow the sharing scope to a single region to avoid that.

But stepping back, I'm wondering whether we can just deprecate FLINK-26803,
since the Unified File Merging Mechanism (FLIP-306) has already been
introduced.
If FLIP-306 is stable and fully covers channel state, this FLIP wouldn't
need to handle
FLINK-26803's granularity at all.

That brings it back to a question for the FLIP-306 authors — whether
channel state support there is complete and stable. I raised the same
concern
(a `// TODO support channel state restore` with no JIRA tracking it) here[5]


Regards,
Rui

[1] https://issues.apache.org/jira/browse/FLINK-32070
[2]
https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/deployment/config/#execution-checkpointing-file-merging-enabled
[3]
https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/deployment/config/#execution-checkpointing-unaligned-interruptible-timers-enabled
[4] https://issues.apache.org/jira/browse/FLINK-26803
[5]
https://issues.apache.org/jira/browse/FLINK-32070?focusedCommentId=18087875&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-18087875

On Mon, Jun 8, 2026 at 1:46 PM Roman Khachatryan <[email protected]> wrote:

> Hi, thanks for the proposal!
> It makes a lot of sense to me.
>
> I have a few questions:
>
> 1. Per my understanding, regions are not always independent; they can be
> connected via BLOCKING/HYBRID exchanges - even in Streaming.
> So the proposal should be limited to embarrassingly parallel regions only
> (and verify that at runtime).
>
> 2. What actually happens when max-consecutive-failures are exceeded - does
> it trigger checkpoint failure or job failure?
>
> 3. Can you please describe the proposed layout of checkpoint metadata?
>
> 4. Can you please describe in more detail the recovery procedure (JM side)?
> I'm concerned about loading >1 checkpoint metadata files and restoring
> ByteStreamStateHandles.
>
> 5. How exactly are we going to keep the old state, in particular in the
> SharedStateRegistry?
> Are we going to re-register it or just keep `max-consecutive-failures`
> checkpoints alive?
>
> 6. Checkpoint abortion notifications
> I can see why we don't want to send those (mostly because unforeseen side
> effects in external systems - as a whole checkpoint is neither aborted nor
> completed).
> But on the other hand, TMs might not release some resources and/or
> accumulate garbage as a result.
> Would it make sense to introduce a new notification type?
>
> 7. Local Recovery cleanup
> In particular, will TaskLocalStateStore cleanup work as expected?
>
> 8. What happens if a task never acknowledges nor declines the checkpoint?
> I'm wondering whether it makes sense to have per-region checkpoint
> timeouts.
> That would "regionalize" checkpoint timeouts similar to other checkpoint
> failures.
>
> 9. How would this feature work with FLIP-306 (Unified File Merging
> Mechanism for Checkpoints) and FLINK-26803 (Merge small ChannelState file
> for Unaligned Checkpoint)
> Per my understanding, tasks from different regions may end up on the same
> TM; so we may end up deleting shared files?
>
> 9. How would this feature work with FLINK-26803 (Merge small ChannelState
> file for Unaligned Checkpoint)
> Per my understanding, a shared channel state writer would fail the entire
> checkpoint anyways.
> cc @Rui Fan
>
> 11. How would this feature work with finished operators? What if a region
> has some finished operators?
> For example, if we skip checkpoint completion notification for a partially
> completed checkpoint;
> but that checkpoint turns out to be the last one?
>
> 12. NO_CLAIM mode - do we need to warn users that they can't delete the
> original checkpoint, even after the completion of new checkpoints?
>
> 13. Is changelog state backend supported?
> I guess not, because it has its own lifecycle for state.
>
> 14. How would this feature work with checkpointing during recovery?
> (FLIP-547)
> I don't see any issues but it would be great to confirm.
> cc @Rui Fan
>
> Regards,
> Roman
>
>
> On Thu, Jun 4, 2026 at 5:04 PM 熊饶饶 <[email protected]> wrote:
>
> > Hi Zihao and Gen,
> >
> > Thank you for the thoughtful questions! Here are my responses.
> >
> > 1. Should CheckpointListener be extended to carry region-level
> information?
> >
> > In the current design, when a Regional Checkpoint completes:
> >
> > Healthy region tasks: receive notifyCheckpointComplete(checkpointId) —
> > they can safely commit external transactions.
> > Failed region tasks: receive nothing — they are undergoing Region
> Failover
> > (already cancelled/restarting) and cannot receive any notification.
> > OperatorCoordinators: have already been corrected via
> > checkpointCoordinatorForRegionFallback before receiving
> > notifyCheckpointComplete.
> > I share the concern about potential misunderstandings, so we should
> extend
> > the interface with a parameter that indicates Regional Checkpoint
> > information. Here is the proposed extension:
> >
> > // In CheckpointListener (default method, backward compatible)
> > default void notifyCheckpointComplete(long checkpointId,
> > RegionalCheckpointInfo info) {
> >     notifyCheckpointComplete(checkpointId);
> > }
> >
> > // Regional checkpoint info — provides global regional checkpoint context
> > to the listener
> > // so it can distinguish between a global checkpoint and a regional
> > checkpoint.
> > public class RegionalCheckpointInfo {
> >     // (old) checkpointId -> execution vertices using that old
> checkpointId
> >     private Map> regionalExecutionVertexMap;
> >     // Other properties for future extension.
> > }
> > As a result, listeners can distinguish between global checkpoints and
> > regional checkpoints.
> >
> > 2. Task that fails to acknowledge for a long period (e.g., transient
> > network issue)
> >
> > This is a good question. After further analysis, I believe Regional
> > Checkpoint can cover the timeout scenario as well. Here is the reasoning:
> >
> > Current behavior (without Regional Checkpoint): When a checkpoint
> expires,
> > CheckpointCanceller triggers abortPendingCheckpoint — the entire
> checkpoint
> > is discarded regardless of how many tasks have already acknowledged.
> > Proposed enhancement — Timeout-Aware Regional Checkpoint: When the
> > checkpoint timer expires and Regional Checkpoint is enabled, we can treat
> > un-acknowledged tasks as failed tasks and apply the same regional logic.
> > 3. Correctness Argument — Why merging states from different checkpoints
> is
> > valid
> >
> > As you mentioned, a snapshot is valid if it matches some instantaneous
> > state that could actually occur during execution.
> >
> > Regional Checkpoint operates at the Region granularity, allowing states
> > from different checkpoint IDs to be freely composed while still
> maintaining
> > consistency. This is possible due to the following conditions:
> >
> > Data isolation between Pipeline Regions: Barriers emitted by source
> > operators and business data do not flow across region boundaries.
> > Therefore, it is sufficient to ensure that all operators within a single
> > region have their state completed under the same checkpoint ID. Upon
> > recovery, consistency within each region is guaranteed independently.
> > State isolation between Pipeline Regions: Flink operator state is
> > typically managed on the TaskManager side. The exception is
> > OperatorCoordinator, whose state is generally used to coordinate across
> > different subtask instances of the same operator. This requires
> additional
> > state correction during Regional Checkpoint — the logic for which is
> > described in the FLIP document.
> > 4. Should failed Regions receive a checkpointAborted notification for
> > checkpoint N?
> >
> > I believe there is no need to notify failed regions with a
> > checkpoint-aborted message. Here's why:
> >
> > First, we extend the notifyCheckpointComplete interface with a new
> > RegionalCheckpointInfo parameter, which includes a global mapping of
> failed
> > operators to their fallback checkpoint IDs. Developers can use this
> > information to handle regional checkpoints appropriately.
> > Second, the failed region tasks may be undergoing failover and their
> > execution attempts have been cancelled. If there is no region failover
> > (i.e., just an expired checkpoint acknowledgment), it is better to skip
> > checkpoint 100 and wait for the next checkpoint 101 to complete, rather
> > than sending a checkpoint-aborted notification.
> > Third, it has no effect on data consistency whether or not a
> > checkpoint-aborted notification is sent. If the task undergoes failover,
> it
> > will recover from checkpoint 99. Otherwise, it will continue running and
> > wait for the next checkpoint to complete or abort (in case of a global
> > checkpoint failure).
> > 5. Ecosystem Compatibility — Connectors and external OperatorCoordinators
> >
> > This is a very good question — we should ensure that most Flink
> connectors
> > work well with this feature. For the Kafka connector, I believe it will
> > support Regional Checkpoint. Here is the reasoning:
> >
> > It uses the standard FLIP-27 Source API with KafkaSourceEnumerator and
> > KafkaSourceReader.
> > Offsets are tracked in KafkaPartitionSplit (operator state), not in
> > coordinator state.
> > addSplitsBack() correctly moves partitions back to unassignedSplits for
> > re-assignment.
> > In unbounded mode, periodic partition discovery continues normally after
> > recovery.
> > The only minor observation: if COMMIT_OFFSETS_ON_CHECKPOINT is enabled,
> > healthy regions commit their Kafka consumer group offsets while failed
> > regions don't. This causes a brief consumer-lag reporting inconsistency
> in
> > external monitoring tools, but has no correctness impact since Flink
> always
> > recovers from its own state, not from Kafka broker offsets.
> > The flink-cdc, flink-paimon, and flink-fluss connectors all use the
> > standard FLIP-27 Source, so their SourceCoordinator will support Regional
> > Checkpoint. In the current latest code, Paimon and Fluss implement custom
> > OperatorCoordinators (WriteOperatorCoordinator and
> > DataStatisticsCoordinator), but these are used only for operator
> > coordination and do not persist any checkpoint state, so they should be
> > compatible with Regional Checkpoint.
> >
> > Of course, other common connectors will be verified and supported in
> > subsequent work. The conten will be updated in the FLIP document.
> >
> > Looking forward to your feedback!
> >
> > Best regards,
> >
> > Raorao Xiong
> >
> >
> > > 2026年5月27日 16:31,熊饶饶 <[email protected]> 写道:
> > >
> > > Hi devs,
> > >
> > > I would like to start a discussion on FLIP-XXX: Independent Checkpoint
> > Based On Pipeline Region.
> > >
> > > In high-parallelism streaming jobs, a single Task's checkpoint failure
> > causes the entire global Checkpoint to abort, leading to degraded
> > checkpoint success rates and wasted compute resources (especially for GPU
> > operators).
> > >
> > > We propose Regional Checkpoint: when some Regions fail to checkpoint,
> > the framework combines the historical state of the failed Regions with
> the
> > current state of the healthy Regions to produce a logically complete
> > Completed Checkpoint — while preserving state consistency. The key
> changes
> > are:
> > >
> > > 1. Snapshot Collection — Allow partial region failures; combine last
> > successful state of failed Regions with current state of normal Regions.
> > >
> > > 2. State Correction — New checkpointCoordinatorForRegionFallback
> > interface for OperatorCoordinators to produce consistent snapshots
> against
> > the mixed view.
> > >
> > > 3. Checkpoint Store — Track ref_checkpoint_id in metadata to prevent
> > premature cleanup of referenced historical checkpoints.
> > >
> > > The detailed design is described in the FLIP document:
> > >
> >
> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> > >
> > > Looking forward to your feedback!
> > >
> > > Best regards,
> > >
> > > Raorao Xiong
> >
> >
>

Reply via email to