Thanks Raorao for driving this region-based checkpoint proposal, and thanks everyone for the discussion and Roman for the ping.

Regarding FLIP-306 (Unified File Merging Mechanism for Checkpoints) [1][2]
and FLINK-26803 (Merge small ChannelState file for Unaligned Checkpoint)
[3][4]:

IIRC, to make FLINK-26803 work with regional checkpoints, its file sharing
would need to change from the current job/TM granularity to region
granularity. Multiple tasks share the same file, and a failure of any one of
them fails the channel state writing of all the others, so the sharing scope
has to be narrowed to a single region to avoid that.

But stepping back, I'm wondering whether we can just deprecate FLINK-26803,
since the Unified File Merging Mechanism (FLIP-306) has already been
introduced. If FLIP-306 is stable and fully covers channel state, this FLIP
wouldn't need to handle FLINK-26803's granularity at all.

That brings it back to a question for the FLIP-306 authors: is channel state
support there complete and stable? I raised the same concern (a
`// TODO support channel state restore` with no JIRA tracking it) here [5].

Regards,
Rui

[1] https://issues.apache.org/jira/browse/FLINK-32070
[2] https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/deployment/config/#execution-checkpointing-file-merging-enabled
[3] https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/deployment/config/#execution-checkpointing-unaligned-interruptible-timers-enabled
[4] https://issues.apache.org/jira/browse/FLINK-26803
[5] https://issues.apache.org/jira/browse/FLINK-32070?focusedCommentId=18087875&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-18087875

On Mon, Jun 8, 2026 at 1:46 PM Roman Khachatryan <[email protected]> wrote:

> Hi, thanks for the proposal!
> It makes a lot of sense to me.
>
> I have a few questions:
>
> 1. Per my understanding, regions are not always independent; they can be
> connected via BLOCKING/HYBRID exchanges - even in Streaming.
> So the proposal should be limited to embarrassingly parallel regions only
> (and verify that at runtime).
>
> 2. What actually happens when max-consecutive-failures is exceeded - does
> it trigger checkpoint failure or job failure?
>
> 3. Can you please describe the proposed layout of checkpoint metadata?
>
> 4. Can you please describe in more detail the recovery procedure (JM side)?
> I'm concerned about loading >1 checkpoint metadata files and restoring
> ByteStreamStateHandles.
>
> 5. How exactly are we going to keep the old state, in particular in the
> SharedStateRegistry?
> Are we going to re-register it or just keep `max-consecutive-failures`
> checkpoints alive?
>
> 6. Checkpoint abortion notifications
> I can see why we don't want to send those (mostly because of unforeseen
> side effects in external systems - as a whole checkpoint is neither
> aborted nor completed).
> But on the other hand, TMs might not release some resources and/or
> accumulate garbage as a result.
> Would it make sense to introduce a new notification type?
>
> 7. Local Recovery cleanup
> In particular, will TaskLocalStateStore cleanup work as expected?
>
> 8. What happens if a task neither acknowledges nor declines the checkpoint?
> I'm wondering whether it makes sense to have per-region checkpoint
> timeouts.
> That would "regionalize" checkpoint timeouts similar to other checkpoint
> failures.
>
> 9. How would this feature work with FLIP-306 (Unified File Merging
> Mechanism for Checkpoints) and FLINK-26803 (Merge small ChannelState file
> for Unaligned Checkpoint)?
> Per my understanding, tasks from different regions may end up on the same
> TM; so we may end up deleting shared files?
>
> 10. How would this feature work with FLINK-26803 (Merge small ChannelState
> file for Unaligned Checkpoint)?
> Per my understanding, a shared channel state writer would fail the entire
> checkpoint anyway.
> cc @Rui Fan
>
> 11. How would this feature work with finished operators? What if a region
> has some finished operators?
> For example, what if we skip the checkpoint completion notification for a
> partially completed checkpoint, but that checkpoint turns out to be the
> last one?
>
> 12. NO_CLAIM mode - do we need to warn users that they can't delete the
> original checkpoint, even after the completion of new checkpoints?
>
> 13. Is changelog state backend supported?
> I guess not, because it has its own lifecycle for state.
>
> 14. How would this feature work with checkpointing during recovery
> (FLIP-547)?
> I don't see any issues but it would be great to confirm.
> cc @Rui Fan
>
> Regards,
> Roman
>
>
> On Thu, Jun 4, 2026 at 5:04 PM 熊饶饶 <[email protected]> wrote:
>
> > Hi Zihao and Gen,
> >
> > Thank you for the thoughtful questions! Here are my responses.
> >
> > 1. Should CheckpointListener be extended to carry region-level
> > information?
> >
> > In the current design, when a Regional Checkpoint completes:
> >
> > - Healthy region tasks: receive notifyCheckpointComplete(checkpointId),
> >   so they can safely commit external transactions.
> > - Failed region tasks: receive nothing. They are undergoing Region
> >   Failover (already cancelled/restarting) and cannot receive any
> >   notification.
> > - OperatorCoordinators: have already been corrected via
> >   checkpointCoordinatorForRegionFallback before receiving
> >   notifyCheckpointComplete.
> >
> > I share the concern about potential misunderstandings, so we should
> > extend the interface with a parameter that indicates Regional Checkpoint
> > information. Here is the proposed extension:
> >
> > // In CheckpointListener (default method, backward compatible)
> > default void notifyCheckpointComplete(long checkpointId,
> >         RegionalCheckpointInfo info) {
> >     notifyCheckpointComplete(checkpointId);
> > }
> >
> > // Regional checkpoint info: provides global regional checkpoint context
> > // to the listener so it can distinguish between a global checkpoint and
> > // a regional checkpoint.
> > public class RegionalCheckpointInfo {
> >     // (old) checkpointId -> execution vertices using that old checkpointId
> >     private Map> regionalExecutionVertexMap;
> >     // Other properties for future extension.
> > }
> >
> > As a result, listeners can distinguish between global checkpoints and
> > regional checkpoints.
> >
> > 2. Task that fails to acknowledge for a long period (e.g., transient
> > network issue)
> >
> > This is a good question. After further analysis, I believe Regional
> > Checkpoint can cover the timeout scenario as well. Here is the reasoning:
> >
> > - Current behavior (without Regional Checkpoint): When a checkpoint
> >   expires, CheckpointCanceller triggers abortPendingCheckpoint, and the
> >   entire checkpoint is discarded regardless of how many tasks have
> >   already acknowledged.
> > - Proposed enhancement (Timeout-Aware Regional Checkpoint): When the
> >   checkpoint timer expires and Regional Checkpoint is enabled, we can
> >   treat unacknowledged tasks as failed tasks and apply the same regional
> >   logic.
> >
> > 3. Correctness Argument: why merging states from different checkpoints
> > is valid
> >
> > As you mentioned, a snapshot is valid if it matches some instantaneous
> > state that could actually occur during execution.
> >
> > Regional Checkpoint operates at the Region granularity, allowing states
> > from different checkpoint IDs to be freely composed while still
> > maintaining consistency. This is possible due to the following
> > conditions:
> >
> > - Data isolation between Pipeline Regions: Barriers emitted by source
> >   operators and business data do not flow across region boundaries.
> >   Therefore, it is sufficient to ensure that all operators within a
> >   single region have their state completed under the same checkpoint ID.
> >   Upon recovery, consistency within each region is guaranteed
> >   independently.
> > - State isolation between Pipeline Regions: Flink operator state is
> >   typically managed on the TaskManager side. The exception is
> >   OperatorCoordinator, whose state is generally used to coordinate across
> >   different subtask instances of the same operator. This requires
> >   additional state correction during Regional Checkpoint; the logic for
> >   that is described in the FLIP document.
> >
> > 4. Should failed Regions receive a checkpointAborted notification for
> > checkpoint N?
> >
> > I believe there is no need to notify failed regions with a
> > checkpoint-aborted message. Here's why:
> >
> > - First, we extend the notifyCheckpointComplete interface with a new
> >   RegionalCheckpointInfo parameter, which includes a global mapping of
> >   failed operators to their fallback checkpoint IDs. Developers can use
> >   this information to handle regional checkpoints appropriately.
> > - Second, the failed region tasks may be undergoing failover and their
> >   execution attempts have been cancelled. If there is no region failover
> >   (i.e., just an expired checkpoint acknowledgment), it is better to skip
> >   checkpoint 100 and wait for the next checkpoint 101 to complete, rather
> >   than sending a checkpoint-aborted notification.
> > - Third, whether or not a checkpoint-aborted notification is sent has no
> >   effect on data consistency. If the task undergoes failover, it will
> >   recover from checkpoint 99. Otherwise, it will continue running and
> >   wait for the next checkpoint to complete or abort (in case of a global
> >   checkpoint failure).
> >
> > 5. Ecosystem Compatibility: connectors and external OperatorCoordinators
> >
> > This is a very good question. We should ensure that most Flink
> > connectors work well with this feature. For the Kafka connector, I
> > believe it will support Regional Checkpoint. Here is the reasoning:
> >
> > - It uses the standard FLIP-27 Source API with KafkaSourceEnumerator and
> >   KafkaSourceReader.
> > - Offsets are tracked in KafkaPartitionSplit (operator state), not in
> >   coordinator state.
> > - addSplitsBack() correctly moves partitions back to unassignedSplits for
> >   re-assignment.
> > - In unbounded mode, periodic partition discovery continues normally
> >   after recovery.
> > - The only minor observation: if COMMIT_OFFSETS_ON_CHECKPOINT is enabled,
> >   healthy regions commit their Kafka consumer group offsets while failed
> >   regions don't. This causes a brief consumer-lag reporting inconsistency
> >   in external monitoring tools, but has no correctness impact since Flink
> >   always recovers from its own state, not from Kafka broker offsets.
> >
> > The flink-cdc, flink-paimon, and flink-fluss connectors all use the
> > standard FLIP-27 Source, so their SourceCoordinator will support Regional
> > Checkpoint. In the latest code, Paimon and Fluss implement custom
> > OperatorCoordinators (WriteOperatorCoordinator and
> > DataStatisticsCoordinator), but these are used only for operator
> > coordination and do not persist any checkpoint state, so they should be
> > compatible with Regional Checkpoint.
> >
> > Of course, other common connectors will be verified and supported in
> > subsequent work. The content will be updated in the FLIP document.
> >
> > Looking forward to your feedback!
> >
> > Best regards,
> >
> > Raorao Xiong
> >
> >
> > > On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
> > >
> > > Hi devs,
> > >
> > > I would like to start a discussion on FLIP-XXX: Independent Checkpoint
> > > Based On Pipeline Region.
> > >
> > > In high-parallelism streaming jobs, a single Task's checkpoint failure
> > > causes the entire global Checkpoint to abort, leading to degraded
> > > checkpoint success rates and wasted compute resources (especially for
> > > GPU operators).
> > >
> > > We propose Regional Checkpoint: when some Regions fail to checkpoint,
> > > the framework combines the historical state of the failed Regions with
> > > the current state of the healthy Regions to produce a logically
> > > complete Completed Checkpoint, while preserving state consistency. The
> > > key changes are:
> > >
> > > 1. Snapshot Collection: allow partial region failures; combine the last
> > > successful state of failed Regions with the current state of normal
> > > Regions.
> > >
> > > 2. State Correction: a new checkpointCoordinatorForRegionFallback
> > > interface for OperatorCoordinators to produce consistent snapshots
> > > against the mixed view.
> > >
> > > 3. Checkpoint Store: track ref_checkpoint_id in metadata to prevent
> > > premature cleanup of referenced historical checkpoints.
> > >
> > > The detailed design is described in the FLIP document:
> > >
> > > https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> > >
> > > Looking forward to your feedback!
> > >
> > > Best regards,
> > >
> > > Raorao Xiong
> > >
> >
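
P.S. To make the sharing-scope point at the top of this mail a bit more concrete, here is a minimal sketch. It is not Flink code: the `Task` class, the `affectedRegions` helper and the two file-key functions are hypothetical names made up for illustration. They only model which tasks write into the same channel state file under TM granularity versus region granularity, assuming a failing task breaks the file for everyone sharing it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

final class ChannelStateSharingSketch {

    /** Hypothetical task descriptor for illustration; not a Flink class. */
    static final class Task {
        final String name;
        final String taskManager;
        final int region;

        Task(String name, String taskManager, int region) {
            this.name = name;
            this.taskManager = taskManager;
            this.region = region;
        }
    }

    /**
     * Returns the regions hit when `failed` breaks its shared channel state file.
     * `fileKey` decides which tasks write into the same file.
     */
    static Set<Integer> affectedRegions(
            Task failed, List<Task> tasks, Function<Task, String> fileKey) {
        String brokenFile = fileKey.apply(failed);
        Set<Integer> regions = new TreeSet<>();
        for (Task t : tasks) {
            if (fileKey.apply(t).equals(brokenFile)) {
                regions.add(t.region);
            }
        }
        return regions;
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(
                new Task("map-0", "tm-1", 0),
                new Task("map-1", "tm-1", 1),
                new Task("map-2", "tm-2", 1));
        Task failed = tasks.get(0);

        // TM granularity: all tasks on the same TM share one file.
        Function<Task, String> perTaskManager = t -> t.taskManager;
        // Region granularity: only tasks of the same region on a TM share one file.
        Function<Task, String> perRegion = t -> t.taskManager + "/region-" + t.region;

        System.out.println("TM scope: " + affectedRegions(failed, tasks, perTaskManager));
        System.out.println("Region scope: " + affectedRegions(failed, tasks, perRegion));
    }
}
```

In this toy setup, the TM-scoped key lets a failure of `map-0` also break the file used by a task of another region on the same TM, while the region-scoped key keeps the damage inside the failing region. That is the narrowing FLINK-26803 would need if it is kept.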
