Hi Zihao and Gen,

Thank you for the thoughtful questions! Here are my responses.

1. Should CheckpointListener be extended to carry region-level information?

In the current design, when a Regional Checkpoint completes:

Healthy region tasks: receive notifyCheckpointComplete(checkpointId) — they can 
safely commit external transactions.
Failed region tasks: receive nothing — they are undergoing Region Failover 
(already cancelled/restarting) and cannot receive any notification.
OperatorCoordinators: have already been corrected via 
checkpointCoordinatorForRegionFallback before receiving 
notifyCheckpointComplete.
I share the concern about potential misunderstandings, so we should extend the 
interface with a parameter that indicates Regional Checkpoint information. Here 
is the proposed extension:

// In CheckpointListener (default method, backward compatible)
default void notifyCheckpointComplete(long checkpointId, RegionalCheckpointInfo 
info) {
    notifyCheckpointComplete(checkpointId);
}

// Regional checkpoint info — provides global regional checkpoint context to 
the listener
// so it can distinguish between a global checkpoint and a regional checkpoint.
public class RegionalCheckpointInfo {
    // (old) checkpointId -> execution vertices using that old checkpointId
    private Map> regionalExecutionVertexMap;
    // Other properties for future extension.
}
As a result, listeners can distinguish between global checkpoints and regional 
checkpoints.

2. Task that fails to acknowledge for a long period (e.g., transient network 
issue)

This is a good question. After further analysis, I believe Regional Checkpoint 
can cover the timeout scenario as well. Here is the reasoning:

Current behavior (without Regional Checkpoint): When a checkpoint expires, 
CheckpointCanceller triggers abortPendingCheckpoint — the entire checkpoint is 
discarded regardless of how many tasks have already acknowledged.
Proposed enhancement — Timeout-Aware Regional Checkpoint: When the checkpoint 
timer expires and Regional Checkpoint is enabled, we can treat un-acknowledged 
tasks as failed tasks and apply the same regional logic.
3. Correctness Argument — Why merging states from different checkpoints is valid

As you mentioned, a snapshot is valid if it matches some instantaneous state 
that could actually occur during execution.

Regional Checkpoint operates at the Region granularity, allowing states from 
different checkpoint IDs to be freely composed while still maintaining 
consistency. This is possible due to the following conditions:

Data isolation between Pipeline Regions: Barriers emitted by source operators 
and business data do not flow across region boundaries. Therefore, it is 
sufficient to ensure that all operators within a single region have their state 
completed under the same checkpoint ID. Upon recovery, consistency within each 
region is guaranteed independently.
State isolation between Pipeline Regions: Flink operator state is typically 
managed on the TaskManager side. The exception is OperatorCoordinator, whose 
state is generally used to coordinate across different subtask instances of the 
same operator. This requires additional state correction during Regional 
Checkpoint — the logic for which is described in the FLIP document.
4. Should failed Regions receive a checkpointAborted notification for 
checkpoint N?

I believe there is no need to notify failed regions with a checkpoint-aborted 
message. Here's why:

First, we extend the notifyCheckpointComplete interface with a new 
RegionalCheckpointInfo parameter, which includes a global mapping of failed 
operators to their fallback checkpoint IDs. Developers can use this information 
to handle regional checkpoints appropriately.
Second, the failed region tasks may be undergoing failover and their execution 
attempts have been cancelled. If there is no region failover (i.e., just an 
expired checkpoint acknowledgment), it is better to skip checkpoint 100 and 
wait for the next checkpoint 101 to complete, rather than sending a 
checkpoint-aborted notification.
Third, it has no effect on data consistency whether or not a checkpoint-aborted 
notification is sent. If the task undergoes failover, it will recover from 
checkpoint 99. Otherwise, it will continue running and wait for the next 
checkpoint to complete or abort (in case of a global checkpoint failure).
5. Ecosystem Compatibility — Connectors and external OperatorCoordinators

This is a very good question — we should ensure that most Flink connectors work 
well with this feature. For the Kafka connector, I believe it will support 
Regional Checkpoint. Here is the reasoning:

It uses the standard FLIP-27 Source API with KafkaSourceEnumerator and 
KafkaSourceReader.
Offsets are tracked in KafkaPartitionSplit (operator state), not in coordinator 
state.
addSplitsBack() correctly moves partitions back to unassignedSplits for 
re-assignment.
In unbounded mode, periodic partition discovery continues normally after 
recovery.
The only minor observation: if COMMIT_OFFSETS_ON_CHECKPOINT is enabled, healthy 
regions commit their Kafka consumer group offsets while failed regions don't. 
This causes a brief consumer-lag reporting inconsistency in external monitoring 
tools, but has no correctness impact since Flink always recovers from its own 
state, not from Kafka broker offsets.
The flink-cdc, flink-paimon, and flink-fluss connectors all use the standard 
FLIP-27 Source, so their SourceCoordinator will support Regional Checkpoint. In 
the current latest code, Paimon and Fluss implement custom OperatorCoordinators 
(WriteOperatorCoordinator and DataStatisticsCoordinator), but these are used 
only for operator coordination and do not persist any checkpoint state, so they 
should be compatible with Regional Checkpoint.

Of course, other common connectors will be verified and supported in subsequent 
work. The conten will be updated in the FLIP document.

Looking forward to your feedback!

Best regards,

Raorao Xiong


> 2026年5月27日 16:31,熊饶饶 <[email protected]> 写道:
> 
> Hi devs,
> 
> I would like to start a discussion on FLIP-XXX: Independent Checkpoint Based 
> On Pipeline Region.
> 
> In high-parallelism streaming jobs, a single Task's checkpoint failure causes 
> the entire global Checkpoint to abort, leading to degraded checkpoint success 
> rates and wasted compute resources (especially for GPU operators).
> 
> We propose Regional Checkpoint: when some Regions fail to checkpoint, the 
> framework combines the historical state of the failed Regions with the 
> current state of the healthy Regions to produce a logically complete 
> Completed Checkpoint — while preserving state consistency. The key changes 
> are:
> 
> 1. Snapshot Collection — Allow partial region failures; combine last 
> successful state of failed Regions with current state of normal Regions.
> 
> 2. State Correction — New checkpointCoordinatorForRegionFallback interface 
> for OperatorCoordinators to produce consistent snapshots against the mixed 
> view.
> 
> 3. Checkpoint Store — Track ref_checkpoint_id in metadata to prevent 
> premature cleanup of referenced historical checkpoints.
> 
> The detailed design is described in the FLIP document: 
> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> 
> Looking forward to your feedback!
> 
> Best regards,
> 
> Raorao Xiong

Reply via email to