Hi, thanks for the proposal!
It makes a lot of sense to me.

I have a few questions:

1. Per my understanding, regions are not always independent; they can be
connected via BLOCKING/HYBRID exchanges, even in streaming mode.
So the proposal should be limited to embarrassingly parallel regions only
(and this should be verified at runtime).

2. What actually happens when `max-consecutive-failures` is exceeded? Does
it trigger a checkpoint failure or a job failure?

3. Can you please describe the proposed layout of checkpoint metadata?

4. Can you please describe in more detail the recovery procedure (JM side)?
I'm concerned about loading >1 checkpoint metadata files and restoring
ByteStreamStateHandles.

5. How exactly are we going to keep the old state, in particular in the
SharedStateRegistry?
Are we going to re-register it or just keep `max-consecutive-failures`
checkpoints alive?

6. Checkpoint abortion notifications
I can see why we don't want to send those (mostly because of unforeseen side
effects in external systems - as a whole checkpoint is neither aborted nor
completed).
But on the other hand, TMs might not release some resources and/or
accumulate garbage as a result.
Would it make sense to introduce a new notification type?

7. Local Recovery cleanup
In particular, will TaskLocalStateStore cleanup work as expected?

8. What happens if a task neither acknowledges nor declines the checkpoint?
I'm wondering whether it makes sense to have per-region checkpoint timeouts.
That would "regionalize" checkpoint timeouts similar to other checkpoint
failures.

9. How would this feature work with FLIP-306 (Unified File Merging
Mechanism for Checkpoints) and FLINK-26803 (Merge small ChannelState file
for Unaligned Checkpoint)?
Per my understanding, tasks from different regions may end up on the same
TM, so could we end up deleting shared files?

10. How would this feature work with FLINK-26803 (Merge small ChannelState
file for Unaligned Checkpoint)?
Per my understanding, a shared channel state writer would fail the entire
checkpoint anyway.
cc @Rui Fan

11. How would this feature work with finished operators? What if a region
has some finished operators?
For example, what if we skip the checkpoint completion notification for a
partially completed checkpoint, but that checkpoint turns out to be the
last one?

12. NO_CLAIM mode - do we need to warn users that they can't delete the
original checkpoint, even after the completion of new checkpoints?

13. Is changelog state backend supported?
I guess not, because it has its own lifecycle for state.

14. How would this feature work with checkpointing during recovery?
(FLIP-547)
I don't see any issues but it would be great to confirm.
cc @Rui Fan
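
A minimal sketch of the runtime check suggested in question 1, assuming a simplified job graph in which every vertex name maps to a pipeline-region ID. The Edge type and isEmbarrassinglyParallel are hypothetical names, not Flink APIs. The check passes only if no edge crosses a region boundary, since any such edge is a non-pipelined (e.g. BLOCKING/HYBRID) exchange between regions.

```java
import java.util.List;
import java.util.Map;

// Hypothetical guard: regional checkpoints are only enabled when regions
// exchange no data at all (so no barrier can cross a region boundary).
final class RegionIndependenceCheck {

    // Hypothetical edge type: a data exchange from a producer to a consumer vertex.
    static final class Edge {
        final String producer;
        final String consumer;

        Edge(String producer, String consumer) {
            this.producer = producer;
            this.consumer = consumer;
        }
    }

    // Returns true only if every edge stays inside a single region.
    static boolean isEmbarrassinglyParallel(List<Edge> edges, Map<String, Integer> vertexToRegion) {
        for (Edge edge : edges) {
            Integer from = vertexToRegion.get(edge.producer);
            Integer to = vertexToRegion.get(edge.consumer);
            if (from == null || to == null) {
                throw new IllegalArgumentException(
                        "Unknown vertex in edge " + edge.producer + " -> " + edge.consumer);
            }
            if (!from.equals(to)) {
                // A cross-region edge: regions are connected, so reject.
                return false;
            }
        }
        return true;
    }
}
```

Failing fast at job submission (rather than at checkpoint time) would be one way to surface the restriction to users.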

Regards,
Roman


On Thu, Jun 4, 2026 at 5:04 PM 熊饶饶 <[email protected]> wrote:

> Hi Zihao and Gen,
>
> Thank you for the thoughtful questions! Here are my responses.
>
> 1. Should CheckpointListener be extended to carry region-level information?
>
> In the current design, when a Regional Checkpoint completes:
>
> - Healthy region tasks: receive notifyCheckpointComplete(checkpointId), so
>   they can safely commit external transactions.
> - Failed region tasks: receive nothing; they are undergoing Region Failover
>   (already cancelled/restarting) and cannot receive any notification.
> - OperatorCoordinators: have already been corrected via
>   checkpointCoordinatorForRegionFallback before receiving
>   notifyCheckpointComplete.
>
> I share the concern about potential misunderstandings, so we should extend
> the interface with a parameter that indicates Regional Checkpoint
> information. Here is the proposed extension:
>
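> // In CheckpointListener (default method, backward compatible)
> default void notifyCheckpointComplete(
>         long checkpointId, RegionalCheckpointInfo info) throws Exception {
>     notifyCheckpointComplete(checkpointId);
> }
>
> import java.util.Map;
> import java.util.Set;
> import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
>
> // Regional checkpoint info: provides the global regional checkpoint context
> // to the listener so that it can distinguish between a global checkpoint
> // and a regional checkpoint.
> public class RegionalCheckpointInfo {
>     // (old) checkpointId -> execution vertices using that old checkpointId
>     // (the key/value types are assumed here for illustration)
>     private Map<Long, Set<ExecutionVertexID>> regionalExecutionVertexMap;
>
>     public Map<Long, Set<ExecutionVertexID>> getRegionalExecutionVertexMap() {
>         return regionalExecutionVertexMap;
>     }
>
>     // Other properties for future extension.
> }
>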
> As a result, listeners can distinguish between global checkpoints and
> regional checkpoints.
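
To illustrate how a listener could use the extra parameter, here is a minimal, self-contained sketch. It assumes String vertex IDs instead of ExecutionVertexID and a hypothetical RegionAwareListener interface that mirrors the proposed default method; TrackingCommitter and isRegional() are illustrative names, not part of the FLIP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for the proposed RegionalCheckpointInfo (String vertex IDs).
final class RegionalCheckpointInfo {
    // (old) checkpointId -> vertices whose state still comes from that checkpoint
    private final Map<Long, Set<String>> regionalExecutionVertexMap;

    RegionalCheckpointInfo(Map<Long, Set<String>> regionalExecutionVertexMap) {
        this.regionalExecutionVertexMap =
                Collections.unmodifiableMap(new HashMap<>(regionalExecutionVertexMap));
    }

    Map<Long, Set<String>> getRegionalExecutionVertexMap() {
        return regionalExecutionVertexMap;
    }

    // Assumption: no fallback entries means every region acknowledged (global checkpoint).
    boolean isRegional() {
        return !regionalExecutionVertexMap.isEmpty();
    }
}

// Hypothetical mirror of the proposed extension; not Flink's real CheckpointListener.
interface RegionAwareListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;

    default void notifyCheckpointComplete(long checkpointId, RegionalCheckpointInfo info)
            throws Exception {
        notifyCheckpointComplete(checkpointId);
    }
}

// Example listener: commits on every completion and, for regional checkpoints,
// records how many vertices are still running on older state.
final class TrackingCommitter implements RegionAwareListener {
    long lastCommitted = -1L;
    long lastRegionalCheckpoint = -1L;
    int verticesOnOlderState = 0;

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        lastCommitted = checkpointId; // e.g. commit pending transactions here
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId, RegionalCheckpointInfo info) {
        notifyCheckpointComplete(checkpointId);
        if (info.isRegional()) {
            lastRegionalCheckpoint = checkpointId;
            verticesOnOlderState = info.getRegionalExecutionVertexMap().values().stream()
                    .mapToInt(Set::size)
                    .sum();
        }
    }
}
```

Treating an empty map as a global checkpoint is a choice made for this sketch; the proposal does not say whether global checkpoints would carry an empty RegionalCheckpointInfo or none at all.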
>
> 2. Task that fails to acknowledge for a long period (e.g., transient
> network issue)
>
> This is a good question. After further analysis, I believe Regional
> Checkpoint can cover the timeout scenario as well. Here is the reasoning:
>
> - Current behavior (without Regional Checkpoint): when a checkpoint expires,
>   CheckpointCanceller triggers abortPendingCheckpoint, and the entire
>   checkpoint is discarded regardless of how many tasks have already
>   acknowledged.
> - Proposed enhancement (Timeout-Aware Regional Checkpoint): when the
>   checkpoint timer expires and Regional Checkpoint is enabled, we can treat
>   un-acknowledged tasks as failed tasks and apply the same regional logic.
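
The timeout-aware variant could look roughly like the following sketch. It assumes the coordinator knows the task-to-region mapping and which regions have an earlier successful snapshot to fall back to; all class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical decision logic run when a pending checkpoint expires.
final class TimeoutAwareRegionalCheckpoint {

    enum Outcome { COMPLETE_REGIONAL, ABORT }

    // Regions containing at least one task that has not acknowledged.
    static Set<Integer> failedRegions(Map<String, Integer> taskToRegion, Set<String> acknowledged) {
        Set<Integer> failed = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : taskToRegion.entrySet()) {
            if (!acknowledged.contains(entry.getKey())) {
                failed.add(entry.getValue());
            }
        }
        return failed;
    }

    // An expired checkpoint is still pending, so at least one ack is missing.
    static Outcome onExpired(
            Map<String, Integer> taskToRegion,
            Set<String> acknowledged,
            Set<Integer> regionsWithEarlierSnapshot) {
        Set<Integer> failed = failedRegions(taskToRegion, acknowledged);
        Set<Integer> allRegions = new TreeSet<>(taskToRegion.values());
        // Every region timed out: nothing new to keep, fall back to today's abort.
        if (failed.equals(allRegions)) {
            return Outcome.ABORT;
        }
        // Each failed region needs an earlier successful snapshot to fall back to.
        if (!regionsWithEarlierSnapshot.containsAll(failed)) {
            return Outcome.ABORT;
        }
        return Outcome.COMPLETE_REGIONAL;
    }
}
```

Aborting when every region times out keeps the current abortPendingCheckpoint behavior for the case where a regional checkpoint would add nothing new.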
>
> 3. Correctness argument: why merging states from different checkpoints is
> valid
>
> As you mentioned, a snapshot is valid if it matches some instantaneous
> state that could actually occur during execution.
>
> Regional Checkpoint operates at the Region granularity, allowing states
> from different checkpoint IDs to be freely composed while still maintaining
> consistency. This is possible due to the following conditions:
>
> - Data isolation between Pipeline Regions: barriers emitted by source
>   operators and business data do not flow across region boundaries.
>   Therefore, it is sufficient to ensure that all operators within a single
>   region have their state completed under the same checkpoint ID. Upon
>   recovery, consistency within each region is guaranteed independently.
> - State isolation between Pipeline Regions: Flink operator state is
>   typically managed on the TaskManager side. The exception is
>   OperatorCoordinator, whose state is generally used to coordinate across
>   different subtask instances of the same operator. This requires additional
>   state correction during Regional Checkpoint; the logic is described in
>   the FLIP document.
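
The per-region composition rule can be sketched as follows, assuming integer region IDs and a known last successful checkpoint per region (all names are hypothetical). It covers TaskManager-side operator state only; OperatorCoordinator state still needs the separate correction step.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of composing a mixed checkpoint region by region.
final class RegionalComposition {

    // Healthy regions contribute currentId; failed regions contribute
    // their last successful checkpoint ID.
    static Map<Integer, Long> compose(
            long currentId,
            Set<Integer> allRegions,
            Set<Integer> failedRegions,
            Map<Integer, Long> lastSuccessfulPerRegion) {
        Map<Integer, Long> chosen = new TreeMap<>();
        for (Integer region : allRegions) {
            if (!failedRegions.contains(region)) {
                chosen.put(region, currentId);
                continue;
            }
            Long fallback = lastSuccessfulPerRegion.get(region);
            if (fallback == null) {
                throw new IllegalStateException("Region " + region + " has no earlier snapshot");
            }
            chosen.put(region, fallback);
        }
        return chosen;
    }

    // Invariant from the argument above: all tasks of one region are
    // restored from the same checkpoint ID.
    static boolean isRegionConsistent(
            Map<String, Integer> taskToRegion, Map<String, Long> taskToCheckpoint) {
        Map<Integer, Long> idPerRegion = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : taskToRegion.entrySet()) {
            Long id = taskToCheckpoint.get(entry.getKey());
            if (id == null) {
                return false;
            }
            Long previous = idPerRegion.putIfAbsent(entry.getValue(), id);
            if (previous != null && !previous.equals(id)) {
                return false;
            }
        }
        return true;
    }
}
```

Different regions may legitimately end up on different checkpoint IDs; only a split inside one region violates the invariant.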
>
> 4. Should failed Regions receive a checkpointAborted notification for
> checkpoint N?
>
> I believe there is no need to notify failed regions with a
> checkpoint-aborted message. Here's why:
>
> First, we extend the notifyCheckpointComplete interface with a new
> RegionalCheckpointInfo parameter, which includes a global mapping of failed
> operators to their fallback checkpoint IDs. Developers can use this
> information to handle regional checkpoints appropriately.
>
> Second, the failed region tasks may be undergoing failover, in which case
> their execution attempts have already been cancelled. If there is no region
> failover (i.e., the acknowledgment merely expired), it is better to skip
> checkpoint N (say, 100) and wait for the next checkpoint (101) to complete,
> rather than sending a checkpoint-aborted notification.
>
> Third, whether or not a checkpoint-aborted notification is sent has no
> effect on data consistency. If the task undergoes failover, it will recover
> from its last successful checkpoint (99 in this example). Otherwise, it will
> continue running and wait for the next checkpoint to complete or abort (in
> case of a global checkpoint failure).
>
> 5. Ecosystem compatibility: connectors and external OperatorCoordinators
>
> This is a very good question: we should ensure that most Flink connectors
> work well with this feature. For the Kafka connector, I believe it will
> support Regional Checkpoint, for the following reasons:
>
> - It uses the standard FLIP-27 Source API with KafkaSourceEnumerator and
>   KafkaSourceReader.
> - Offsets are tracked in KafkaPartitionSplit (operator state), not in
>   coordinator state.
> - addSplitsBack() correctly moves partitions back to unassignedSplits for
>   re-assignment.
> - In unbounded mode, periodic partition discovery continues normally after
>   recovery.
>
> The only minor observation: if COMMIT_OFFSETS_ON_CHECKPOINT is enabled,
> healthy regions commit their Kafka consumer group offsets while failed
> regions don't. This causes a brief consumer-lag reporting inconsistency in
> external monitoring tools, but has no correctness impact since Flink always
> recovers from its own state, not from Kafka broker offsets.
> The flink-cdc, flink-paimon, and flink-fluss connectors all use the
> standard FLIP-27 Source, so their SourceCoordinator will support Regional
> Checkpoint. In the current latest code, Paimon and Fluss implement custom
> OperatorCoordinators (WriteOperatorCoordinator and
> DataStatisticsCoordinator), but these are used only for operator
> coordination and do not persist any checkpoint state, so they should be
> compatible with Regional Checkpoint.
>
> Of course, other common connectors will be verified and supported in
> subsequent work. The content will be updated in the FLIP document.
>
> Looking forward to your feedback!
>
> Best regards,
>
> Raorao Xiong
>
>
> > On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
> >
> > Hi devs,
> >
> > I would like to start a discussion on FLIP-XXX: Independent Checkpoint
> Based On Pipeline Region.
> >
> > In high-parallelism streaming jobs, a single Task's checkpoint failure
> causes the entire global Checkpoint to abort, leading to degraded
> checkpoint success rates and wasted compute resources (especially for GPU
> operators).
> >
> > We propose Regional Checkpoint: when some Regions fail to checkpoint,
> the framework combines the historical state of the failed Regions with the
> current state of the healthy Regions to produce a logically complete
> Completed Checkpoint — while preserving state consistency. The key changes
> are:
> >
> > 1. Snapshot Collection — Allow partial region failures; combine last
> successful state of failed Regions with current state of normal Regions.
> >
> > 2. State Correction — New checkpointCoordinatorForRegionFallback
> interface for OperatorCoordinators to produce consistent snapshots against
> the mixed view.
> >
> > 3. Checkpoint Store — Track ref_checkpoint_id in metadata to prevent
> premature cleanup of referenced historical checkpoints.
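
One way to picture the Checkpoint Store change (and the retention question raised about SharedStateRegistry) is the sketch below. It assumes each completed checkpoint records the older checkpoint IDs it references (ref_checkpoint_id) and that the store keeps the newest maxRetained checkpoints; RefAwareCheckpointStore and addAndCollectGarbage are hypothetical names, and shared incremental state is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical ref-aware retention: a subsumed checkpoint is discarded only
// once no retained checkpoint refers to it, directly or transitively.
final class RefAwareCheckpointStore {
    private final int maxRetained;
    // checkpoint ID -> older checkpoint IDs it borrows region state from
    private final TreeMap<Long, Set<Long>> notYetDiscarded = new TreeMap<>();

    RefAwareCheckpointStore(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    // Adds a completed checkpoint and returns the IDs whose files can now be deleted.
    List<Long> addAndCollectGarbage(long checkpointId, Set<Long> refCheckpointIds) {
        notYetDiscarded.put(checkpointId, new TreeSet<>(refCheckpointIds));

        // Start from the newest maxRetained checkpoints and follow references.
        Deque<Long> toVisit = new ArrayDeque<>(notYetDiscarded.descendingKeySet().stream()
                .limit(maxRetained)
                .collect(Collectors.toList()));
        Set<Long> live = new HashSet<>();
        while (!toVisit.isEmpty()) {
            Long current = toVisit.pop();
            if (live.add(current)) {
                toVisit.addAll(notYetDiscarded.getOrDefault(current, Set.of()));
            }
        }

        // Everything not reachable from a retained checkpoint can be discarded.
        List<Long> discardable = new ArrayList<>();
        for (Long candidate : new ArrayList<>(notYetDiscarded.keySet())) {
            if (!live.contains(candidate)) {
                discardable.add(candidate);
                notYetDiscarded.remove(candidate);
            }
        }
        return discardable;
    }
}
```

With maxRetained = 1, checkpoint 99 survives while 100 and 101 still reference it, and is released as soon as a checkpoint without that reference completes.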
> >
> > The detailed design is described in the FLIP document:
> >
> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> >
> > Looking forward to your feedback!
> >
> > Best regards,
> >
> > Raorao Xiong
>
>
