Hi Roman and Rui,

Thanks again for the thorough review. Let me address each point:

1. Region independence: BLOCKING/HYBRID edges

You're right. Our current scope is limited to embarrassingly parallel regions. In typical ETL scenarios, each parallel instance of the pipeline maps to an independent Region, with no edges connecting them.

2. max-consecutive-failures exceeded: what exactly happens?

The current design says "force a global checkpoint." To clarify the two-tier behavior:

● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the next checkpoint is forced to be global.
● Tier 2: If that forced global checkpoint also fails (any task declines), the checkpoint is aborted normally (not a job failure). The counter is then reset since a global checkpoint was attempted, and the next checkpoint cycle can try again.

This avoids cascading into job failure while ensuring we don't drift indefinitely on historical state.

3. Checkpoint metadata layout

A Regional Checkpoint recombines state from different checkpoint IDs. To track this, we add a refCheckpointId field to OperatorSubtaskState in the metadata, indicating which historical checkpoint a subtask's state references.

4. JM recovery procedure

The operator state handles are already updated during the Regional Checkpoint completion phase. The generated checkpoint metadata may contain state handles originating from historical checkpoints, but we guarantee the internal consistency of the combined state. Therefore, JM recovery only needs to read a single checkpoint metadata file.

5. SharedStateRegistry: how are old states kept alive?

Good question. In the current design, since we only target embarrassingly parallel regions, there is typically no keyed state and no incremental state. As a result, the SharedStateRegistry is generally empty (setting aside File Merging and Changelog State for now; see points 8 and 11), so keep-alive of files under the shared directory is not a concern.

6. Checkpoint abort notifications & Local Recovery cleanup: new notification type

This is a very insightful point. Zihao and Gen also raised this in earlier discussions. The current design doesn't address state cleanup for tasks in failed regions. I agree it's necessary to introduce a new notification type. For tasks in failed regions, local state cleanup can be deferred until the next checkpoint trigger.

7. Task that never acknowledges nor declines: per-region timeouts

This was discussed in the previous thread. Network issues may cause a task to neither ack nor decline in time. In such cases, we treat it as a checkpoint timeout: the affected task's region is marked as failed, and the process ultimately falls through to the normal Regional Checkpoint processing logic.

8. FLINK-26803 and FLIP-306 compatibility

This is a very important point. Both features essentially merge small files at the job level. As Rui Fan pointed out, if the merging granularity is reduced to the Region level, compatibility with Regional Checkpoint should be achievable in theory. I think this can be deferred to future work: once FLINK-26803 is consolidated into FLIP-306, we can revisit and enable support.

9. Finished operators

The concern is: a finished operator's final commit notification gets skipped by Regional Checkpoint, and if this checkpoint is the last one, the operator never receives it. Could this cause data loss? In practice, the impact is limited:

● Failed Region tasks are already gone: By the time the Regional Checkpoint completes, tasks in the failed Region have already been restarted (decline) or cancelled (timeout). There is no task left to receive the notification anyway.
● maxConsecutiveFailures guarantees a global checkpoint: After reaching the limit, the next checkpoint is forced to be global, ensuring all tasks eventually receive notifyCheckpointComplete. We can't skip the same Region forever.
● stop-with-savepoint bypasses Regional Checkpoint: When the user stops the job gracefully, it triggers a full global snapshot, not a Regional Checkpoint. So the final checkpoint is always complete.

10. NO_CLAIM mode warning

You're absolutely right, this is an important reminder. After restoring from a Regional Checkpoint, only a successful global checkpoint guarantees independence from the old state. We'll add a clear user warning in the documentation.

11. Changelog state backend: not supported

As mentioned earlier, our primary target is embarrassingly parallel regions, which typically have no keyed state and therefore no slow incremental state flush issues. I don't think we need to support Changelog state backend for now.

12. FLIP-547 (Checkpointing during recovery)

I took a quick look at FLIP-547's implementation. I don't see any fundamental conflict with Regional Checkpoint: it doesn't introduce cross-region state, so the two should be compatible in theory.

cc @Rui Fan

Looking forward to your feedback.

Regards,
Raorao
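
P.S. To make the two-tier behavior in point 2 concrete, here is a minimal Java sketch of the counter logic. It is only an illustration under stated assumptions: RegionalCheckpointPolicy, Mode, Outcome, nextMode and onCheckpointFinished are hypothetical names, not existing Flink or FLIP APIs, and the sketch assumes the counter also resets whenever a checkpoint completes with no failed Region.

```java
/** Illustrative sketch only; all names here are hypothetical, not Flink APIs. */
final class RegionalCheckpointPolicy {

    /** Whether the next checkpoint may complete regionally or must be global. */
    enum Mode { REGIONAL_ALLOWED, GLOBAL_FORCED }

    /** How a finished checkpoint attempt is classified. */
    enum Outcome { COMPLETED_GLOBAL, COMPLETED_REGIONAL, ABORTED }

    private final int maxConsecutiveFailures;
    private int consecutiveRegionalCount;

    RegionalCheckpointPolicy(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Tier 1: once the limit is reached, the next checkpoint is forced to be global. */
    Mode nextMode() {
        return consecutiveRegionalCount >= maxConsecutiveFailures
                ? Mode.GLOBAL_FORCED
                : Mode.REGIONAL_ALLOWED;
    }

    /** Classifies a finished attempt and updates the counter. */
    Outcome onCheckpointFinished(Mode mode, int failedRegionCount) {
        if (mode == Mode.GLOBAL_FORCED) {
            // Tier 2: a global attempt resets the counter whether it succeeds
            // or is aborted; an abort here is not a job failure.
            consecutiveRegionalCount = 0;
            return failedRegionCount == 0 ? Outcome.COMPLETED_GLOBAL : Outcome.ABORTED;
        }
        if (failedRegionCount == 0) {
            // Assumption: a fully successful checkpoint also resets the counter.
            consecutiveRegionalCount = 0;
            return Outcome.COMPLETED_GLOBAL;
        }
        // Some Regions failed: complete regionally using their historical state.
        consecutiveRegionalCount++;
        return Outcome.COMPLETED_REGIONAL;
    }

    int consecutiveRegionalCount() {
        return consecutiveRegionalCount;
    }
}
```

With maxConsecutiveFailures = 2, two consecutive Regional Checkpoints make nextMode() return GLOBAL_FORCED; if that global attempt also fails, the outcome is ABORTED and the counter goes back to 0, so the following cycle may again complete regionally.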
> On June 10, 2026, at 18:05, Rui Fan <[email protected]> wrote:
>
> Thanks Raorao for driving this region-based checkpoint, and thanks everyone for the discussion and Roman's ping.
>
> Regarding FLIP-306 (Unified File Merging Mechanism for Checkpoints) [1][2] and FLINK-26803 (Merge small ChannelState file for Unaligned Checkpoint) [3][4]:
>
> IIRC, to make FLINK-26803 work with regional checkpoints, its file sharing might need to change from the current job/TM granularity to region granularity, because multiple tasks share the same file and a failure of any one of them fails the channel state writing of all the others, so we have to narrow the sharing scope to a single region to avoid that.
>
> But stepping back, I'm wondering whether we can just deprecate FLINK-26803, since the Unified File Merging Mechanism (FLIP-306) has already been introduced. If FLIP-306 is stable and fully covers channel state, this FLIP wouldn't need to handle FLINK-26803's granularity at all.
>
> That brings it back to a question for the FLIP-306 authors: whether channel state support there is complete and stable. I raised the same concern (a `// TODO support channel state restore` with no JIRA tracking it) here [5].
>
> Regards,
> Rui
>
> [1] https://issues.apache.org/jira/browse/FLINK-32070
> [2] https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/deployment/config/#execution-checkpointing-file-merging-enabled
> [3] https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/deployment/config/#execution-checkpointing-unaligned-interruptible-timers-enabled
> [4] https://issues.apache.org/jira/browse/FLINK-26803
> [5] https://issues.apache.org/jira/browse/FLINK-32070?focusedCommentId=18087875&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-18087875
>
> On Mon, Jun 8, 2026 at 1:46 PM Roman Khachatryan <[email protected]> wrote:
>
>> Hi, thanks for the proposal!
>> It makes a lot of sense to me.
>>
>> I have a few questions:
>>
>> 1. Per my understanding, regions are not always independent; they can be connected via BLOCKING/HYBRID exchanges - even in Streaming.
>> So the proposal should be limited to embarrassingly parallel regions only (and verify that at runtime).
>>
>> 2. What actually happens when max-consecutive-failures are exceeded - does it trigger checkpoint failure or job failure?
>>
>> 3. Can you please describe the proposed layout of checkpoint metadata?
>>
>> 4. Can you please describe in more detail the recovery procedure (JM side)?
>> I'm concerned about loading >1 checkpoint metadata files and restoring ByteStreamStateHandles.
>>
>> 5. How exactly are we going to keep the old state, in particular in the SharedStateRegistry?
>> Are we going to re-register it or just keep `max-consecutive-failures` checkpoints alive?
>>
>> 6. Checkpoint abortion notifications
>> I can see why we don't want to send those (mostly because of unforeseen side effects in external systems - as a whole checkpoint is neither aborted nor completed).
>> But on the other hand, TMs might not release some resources and/or accumulate garbage as a result.
>> Would it make sense to introduce a new notification type?
>>
>> 7. Local Recovery cleanup
>> In particular, will TaskLocalStateStore cleanup work as expected?
>>
>> 8. What happens if a task never acknowledges nor declines the checkpoint?
>> I'm wondering whether it makes sense to have per-region checkpoint timeouts.
>> That would "regionalize" checkpoint timeouts similar to other checkpoint failures.
>>
>> 9. How would this feature work with FLIP-306 (Unified File Merging Mechanism for Checkpoints) and FLINK-26803 (Merge small ChannelState file for Unaligned Checkpoint)
>> Per my understanding, tasks from different regions may end up on the same TM; so we may end up deleting shared files?
>>
>> 10. How would this feature work with FLINK-26803 (Merge small ChannelState file for Unaligned Checkpoint)
>> Per my understanding, a shared channel state writer would fail the entire checkpoint anyways.
>> cc @Rui Fan
>>
>> 11. How would this feature work with finished operators? What if a region has some finished operators?
>> For example, if we skip checkpoint completion notification for a partially completed checkpoint;
>> but that checkpoint turns out to be the last one?
>>
>> 12. NO_CLAIM mode - do we need to warn users that they can't delete the original checkpoint, even after the completion of new checkpoints?
>>
>> 13. Is changelog state backend supported?
>> I guess not, because it has its own lifecycle for state.
>>
>> 14. How would this feature work with checkpointing during recovery? (FLIP-547)
>> I don't see any issues but it would be great to confirm.
>> cc @Rui Fan
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jun 4, 2026 at 5:04 PM 熊饶饶 <[email protected]> wrote:
>>
>>> Hi Zihao and Gen,
>>>
>>> Thank you for the thoughtful questions! Here are my responses.
>>>
>>> 1. Should CheckpointListener be extended to carry region-level information?
>>>
>>> In the current design, when a Regional Checkpoint completes:
>>>
>>> Healthy region tasks: receive notifyCheckpointComplete(checkpointId); they can safely commit external transactions.
>>> Failed region tasks: receive nothing; they are undergoing Region Failover (already cancelled/restarting) and cannot receive any notification.
>>> OperatorCoordinators: have already been corrected via checkpointCoordinatorForRegionFallback before receiving notifyCheckpointComplete.
>>>
>>> I share the concern about potential misunderstandings, so we should extend the interface with a parameter that indicates Regional Checkpoint information.
>>> Here is the proposed extension:
>>>
>>> // In CheckpointListener (default method, backward compatible)
>>> default void notifyCheckpointComplete(long checkpointId, RegionalCheckpointInfo info) {
>>>     notifyCheckpointComplete(checkpointId);
>>> }
>>>
>>> // Regional checkpoint info: provides global regional checkpoint context to the listener
>>> // so it can distinguish between a global checkpoint and a regional checkpoint.
>>> public class RegionalCheckpointInfo {
>>>     // (old) checkpointId -> execution vertices using that old checkpointId
>>>     private Map<Long, List<ExecutionVertexID>> regionalExecutionVertexMap;
>>>     // Other properties for future extension.
>>> }
>>>
>>> As a result, listeners can distinguish between global checkpoints and regional checkpoints.
>>>
>>> 2. Task that fails to acknowledge for a long period (e.g., transient network issue)
>>>
>>> This is a good question. After further analysis, I believe Regional Checkpoint can cover the timeout scenario as well. Here is the reasoning:
>>>
>>> Current behavior (without Regional Checkpoint): When a checkpoint expires, CheckpointCanceller triggers abortPendingCheckpoint, and the entire checkpoint is discarded regardless of how many tasks have already acknowledged.
>>> Proposed enhancement (Timeout-Aware Regional Checkpoint): When the checkpoint timer expires and Regional Checkpoint is enabled, we can treat un-acknowledged tasks as failed tasks and apply the same regional logic.
>>>
>>> 3. Correctness Argument: Why merging states from different checkpoints is valid
>>>
>>> As you mentioned, a snapshot is valid if it matches some instantaneous state that could actually occur during execution.
>>>
>>> Regional Checkpoint operates at the Region granularity, allowing states from different checkpoint IDs to be freely composed while still maintaining consistency.
>>> This is possible due to the following conditions:
>>>
>>> Data isolation between Pipeline Regions: Barriers emitted by source operators and business data do not flow across region boundaries. Therefore, it is sufficient to ensure that all operators within a single region have their state completed under the same checkpoint ID. Upon recovery, consistency within each region is guaranteed independently.
>>> State isolation between Pipeline Regions: Flink operator state is typically managed on the TaskManager side. The exception is OperatorCoordinator, whose state is generally used to coordinate across different subtask instances of the same operator. This requires additional state correction during Regional Checkpoint; the logic for this is described in the FLIP document.
>>>
>>> 4. Should failed Regions receive a checkpointAborted notification for checkpoint N?
>>>
>>> I believe there is no need to notify failed regions with a checkpoint-aborted message. Here's why:
>>>
>>> First, we extend the notifyCheckpointComplete interface with a new RegionalCheckpointInfo parameter, which includes a global mapping of failed operators to their fallback checkpoint IDs. Developers can use this information to handle regional checkpoints appropriately.
>>> Second, the failed region tasks may be undergoing failover and their execution attempts have been cancelled. If there is no region failover (i.e., just an expired checkpoint acknowledgment), it is better to skip checkpoint 100 and wait for the next checkpoint 101 to complete, rather than sending a checkpoint-aborted notification.
>>> Third, it has no effect on data consistency whether or not a checkpoint-aborted notification is sent. If the task undergoes failover, it will recover from checkpoint 99. Otherwise, it will continue running and wait for the next checkpoint to complete or abort (in case of a global checkpoint failure).
>>>
>>> 5. Ecosystem Compatibility: Connectors and external OperatorCoordinators
>>>
>>> This is a very good question; we should ensure that most Flink connectors work well with this feature. For the Kafka connector, I believe it will support Regional Checkpoint. Here is the reasoning:
>>>
>>> It uses the standard FLIP-27 Source API with KafkaSourceEnumerator and KafkaSourceReader.
>>> Offsets are tracked in KafkaPartitionSplit (operator state), not in coordinator state.
>>> addSplitsBack() correctly moves partitions back to unassignedSplits for re-assignment.
>>> In unbounded mode, periodic partition discovery continues normally after recovery.
>>> The only minor observation: if COMMIT_OFFSETS_ON_CHECKPOINT is enabled, healthy regions commit their Kafka consumer group offsets while failed regions don't. This causes a brief consumer-lag reporting inconsistency in external monitoring tools, but has no correctness impact since Flink always recovers from its own state, not from Kafka broker offsets.
>>>
>>> The flink-cdc, flink-paimon, and flink-fluss connectors all use the standard FLIP-27 Source, so their SourceCoordinator will support Regional Checkpoint. In the current latest code, Paimon and Fluss implement custom OperatorCoordinators (WriteOperatorCoordinator and DataStatisticsCoordinator), but these are used only for operator coordination and do not persist any checkpoint state, so they should be compatible with Regional Checkpoint.
>>>
>>> Of course, other common connectors will be verified and supported in subsequent work. The content will be updated in the FLIP document.
>>>
>>> Looking forward to your feedback!
>>>
>>> Best regards,
>>>
>>> Raorao Xiong
>>>
>>>
>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
>>>>
>>>> Hi devs,
>>>>
>>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint Based On Pipeline Region.
>>>>
>>>> In high-parallelism streaming jobs, a single Task's checkpoint failure causes the entire global Checkpoint to abort, leading to degraded checkpoint success rates and wasted compute resources (especially for GPU operators).
>>>>
>>>> We propose Regional Checkpoint: when some Regions fail to checkpoint, the framework combines the historical state of the failed Regions with the current state of the healthy Regions to produce a logically complete Completed Checkpoint, while preserving state consistency. The key changes are:
>>>>
>>>> 1. Snapshot Collection: Allow partial region failures; combine last successful state of failed Regions with current state of normal Regions.
>>>>
>>>> 2. State Correction: New checkpointCoordinatorForRegionFallback interface for OperatorCoordinators to produce consistent snapshots against the mixed view.
>>>>
>>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent premature cleanup of referenced historical checkpoints.
>>>>
>>>> The detailed design is described in the FLIP document:
>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
>>>>
>>>> Looking forward to your feedback!
>>>>
>>>> Best regards,
>>>>
>>>> Raorao Xiong
