Hi Zihao and Gen,
Thank you for the thoughtful questions! Here are my responses.
1. Should CheckpointListener be extended to carry region-level information?
In the current design, when a Regional Checkpoint completes:
- Healthy region tasks receive notifyCheckpointComplete(checkpointId), so they
  can safely commit external transactions.
- Failed region tasks receive nothing: they are undergoing Region Failover
  (already cancelled or restarting) and cannot receive any notification.
- OperatorCoordinators have already been corrected via
  checkpointCoordinatorForRegionFallback before they receive
  notifyCheckpointComplete.
I share the concern about potential misunderstandings, so we should extend the
interface with a parameter that indicates Regional Checkpoint information. Here
is the proposed extension:
// In CheckpointListener (default method, backward compatible)
default void notifyCheckpointComplete(long checkpointId, RegionalCheckpointInfo info) {
    notifyCheckpointComplete(checkpointId);
}

// Regional checkpoint info: provides global regional checkpoint context to the
// listener so it can distinguish between a global checkpoint and a regional
// checkpoint.
public class RegionalCheckpointInfo {
    // (old) checkpointId -> execution vertices using that old checkpointId
    private Map<Long, Set<ExecutionVertexID>> regionalExecutionVertexMap;
    // Other properties for future extension.
}
As a result, listeners can distinguish between global checkpoints and regional
checkpoints.
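To make the intended usage concrete, here is a minimal, self-contained sketch of a listener that overrides the new two-argument method. It does not use Flink's real classes: the RegionalCheckpointInfo and CheckpointListener types below are simplified stand-ins for the proposal, task names are plain strings, and isGlobal(), fallbackTaskCount(), and LoggingCommitter are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed RegionalCheckpointInfo; String task
// names replace Flink's real vertex identifiers to keep the sketch standalone.
final class RegionalCheckpointInfo {
    // (old) checkpointId -> tasks whose state was taken from that checkpoint
    private final Map<Long, List<String>> fallbackTasksByCheckpoint;

    RegionalCheckpointInfo(Map<Long, List<String>> fallbackTasksByCheckpoint) {
        this.fallbackTasksByCheckpoint = fallbackTasksByCheckpoint;
    }

    // Hypothetical helper: an empty mapping means every region acknowledged.
    boolean isGlobal() {
        return fallbackTasksByCheckpoint.isEmpty();
    }

    // Hypothetical helper: number of tasks that reuse an older checkpoint.
    int fallbackTaskCount() {
        return fallbackTasksByCheckpoint.values().stream().mapToInt(List::size).sum();
    }
}

// Mirrors the shape of the proposed CheckpointListener extension.
interface CheckpointListener {
    void notifyCheckpointComplete(long checkpointId);

    // Default keeps existing listeners working unchanged.
    default void notifyCheckpointComplete(long checkpointId, RegionalCheckpointInfo info) {
        notifyCheckpointComplete(checkpointId);
    }
}

// A listener that commits as before and also records the kind of checkpoint.
final class LoggingCommitter implements CheckpointListener {
    String lastEvent = "";

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        lastEvent = "commit " + checkpointId;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId, RegionalCheckpointInfo info) {
        notifyCheckpointComplete(checkpointId);
        lastEvent += info.isGlobal()
                ? " (global)"
                : " (regional, " + info.fallbackTaskCount() + " fallback tasks)";
    }
}
```

Listeners that do not override the new method keep their current behavior through the default implementation, which is the backward-compatibility property the proposal relies on.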
2. Task that fails to acknowledge for a long period (e.g., transient network
issue)
This is a good question. After further analysis, I believe Regional Checkpoint
can cover the timeout scenario as well. Here is the reasoning:
Current behavior (without Regional Checkpoint): When a checkpoint expires,
CheckpointCanceller triggers abortPendingCheckpoint, and the entire checkpoint
is discarded regardless of how many tasks have already acknowledged.
Proposed enhancement (Timeout-Aware Regional Checkpoint): When the checkpoint
timer expires and Regional Checkpoint is enabled, we can treat unacknowledged
tasks as failed tasks and apply the same regional logic: the regions containing
those tasks fall back to their last successful state, and the checkpoint
completes with the current state of the remaining regions.
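As a rough illustration of the timeout handling, the sketch below classifies regions once the timer fires. It is not Flink code: TimeoutRegionClassifier and regionsToFallBack are hypothetical names, and tasks and regions are represented as plain strings. It assumes a region must fall back as a whole if any of its tasks has not acknowledged, since all tasks in a region need state from the same checkpoint ID.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class TimeoutRegionClassifier {
    // Returns the regions that must fall back: every region that contains at
    // least one task that had not acknowledged when the timer expired.
    static Set<String> regionsToFallBack(
            Map<String, String> regionOfTask, Set<String> acknowledgedTasks) {
        Set<String> failedRegions = new TreeSet<>();
        for (Map.Entry<String, String> e : regionOfTask.entrySet()) {
            if (!acknowledgedTasks.contains(e.getKey())) {
                failedRegions.add(e.getValue());
            }
        }
        return failedRegions;
    }
}
```

If the returned set is empty, every task acknowledged in time and the checkpoint is an ordinary global one.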
3. Correctness Argument — Why merging states from different checkpoints is valid
As you mentioned, a snapshot is valid if it matches some instantaneous state
that could actually occur during execution.
Regional Checkpoint operates at the Region granularity, allowing states from
different checkpoint IDs to be freely composed while still maintaining
consistency. This is possible due to the following conditions:
Data isolation between Pipeline Regions: Barriers emitted by source operators
and business data do not flow across region boundaries. Therefore, it is
sufficient to ensure that all operators within a single region have their state
completed under the same checkpoint ID. Upon recovery, consistency within each
region is guaranteed independently.
State isolation between Pipeline Regions: Flink operator state is typically
managed on the TaskManager side. The exception is OperatorCoordinator, whose
state is generally used to coordinate across different subtask instances of the
same operator. This requires additional state correction during Regional
Checkpoint — the logic for which is described in the FLIP document.
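The composition rule can be sketched as follows. This is only an illustration under simplifying assumptions, not the FLIP's implementation: RegionalSnapshotComposer and compose are hypothetical names, regions are plain strings, and OperatorCoordinator state correction is left out. The point it shows is that each region contributes state from exactly one checkpoint ID, so consistency within a region is never mixed.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class RegionalSnapshotComposer {
    // For each region, pick the checkpoint ID whose state enters the combined
    // snapshot: the current ID for healthy regions, the last completed ID for
    // failed ones.
    static Map<String, Long> compose(
            long currentCheckpointId,
            Set<String> failedRegions,
            Map<String, Long> lastCompletedByRegion) {
        Map<String, Long> chosen = new TreeMap<>();
        for (Map.Entry<String, Long> e : lastCompletedByRegion.entrySet()) {
            boolean failed = failedRegions.contains(e.getKey());
            chosen.put(e.getKey(), failed ? e.getValue() : currentCheckpointId);
        }
        return chosen;
    }
}
```

For example, with current checkpoint 100, failed region r1, and all regions last completed at 99, regions r0 and r2 contribute state from 100 while r1 contributes state from 99.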
4. Should failed Regions receive a checkpointAborted notification for
checkpoint N?
I believe there is no need to notify failed regions with a checkpoint-aborted
message. Here's why:
First, we extend the notifyCheckpointComplete interface with a new
RegionalCheckpointInfo parameter, which maps each fallback checkpoint ID to the
execution vertices that reuse its state. Developers can use this information
to handle regional checkpoints appropriately.
Second, the failed region tasks may be undergoing failover, in which case their
execution attempts have already been cancelled and cannot receive a
notification. If there is no region failover (i.e., a task merely failed to
acknowledge before the checkpoint expired), it is better for that task to skip
the current checkpoint (say, checkpoint 100) and wait for the next one
(checkpoint 101) to complete, rather than receive a checkpoint-aborted
notification.
Third, whether or not a checkpoint-aborted notification is sent has no effect
on data consistency. If the task undergoes failover, it will recover from its
last successful checkpoint (checkpoint 99 in this example). Otherwise, it will
continue running and wait for the next checkpoint to complete or abort (in
case of a global checkpoint failure).
5. Ecosystem Compatibility — Connectors and external OperatorCoordinators
This is a very good question — we should ensure that most Flink connectors work
well with this feature. For the Kafka connector, I believe it will support
Regional Checkpoint. Here is the reasoning:
- It uses the standard FLIP-27 Source API with KafkaSourceEnumerator and
  KafkaSourceReader.
- Offsets are tracked in KafkaPartitionSplit (operator state), not in
  coordinator state.
- addSplitsBack() correctly moves partitions back to unassignedSplits for
  re-assignment.
- In unbounded mode, periodic partition discovery continues normally after
  recovery.
The only minor observation: if COMMIT_OFFSETS_ON_CHECKPOINT is enabled, healthy
regions commit their Kafka consumer group offsets while failed regions don't.
This causes a brief consumer-lag reporting inconsistency in external monitoring
tools, but has no correctness impact since Flink always recovers from its own
state, not from Kafka broker offsets.
The flink-cdc, flink-paimon, and flink-fluss connectors all use the standard
FLIP-27 Source, so their SourceCoordinator will support Regional Checkpoint. In
the latest code, Paimon and Fluss also implement custom OperatorCoordinators
(WriteOperatorCoordinator and DataStatisticsCoordinator), but these are used
only for operator coordination and do not persist any checkpoint state, so they
should be compatible with Regional Checkpoint.
Other common connectors will be verified and supported in follow-up work, and
the results will be added to the FLIP document.
Looking forward to your feedback!
Best regards,
Raorao Xiong
> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
>
> Hi devs,
>
> I would like to start a discussion on FLIP-XXX: Independent Checkpoint Based
> On Pipeline Region.
>
> In high-parallelism streaming jobs, a single Task's checkpoint failure causes
> the entire global Checkpoint to abort, leading to degraded checkpoint success
> rates and wasted compute resources (especially for GPU operators).
>
> We propose Regional Checkpoint: when some Regions fail to checkpoint, the
> framework combines the historical state of the failed Regions with the
> current state of the healthy Regions to produce a logically complete
> Completed Checkpoint — while preserving state consistency. The key changes
> are:
>
> 1. Snapshot Collection — Allow partial region failures; combine last
> successful state of failed Regions with current state of normal Regions.
>
> 2. State Correction — New checkpointCoordinatorForRegionFallback interface
> for OperatorCoordinators to produce consistent snapshots against the mixed
> view.
>
> 3. Checkpoint Store — Track ref_checkpoint_id in metadata to prevent
> premature cleanup of referenced historical checkpoints.
>
> The detailed design is described in the FLIP document:
> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
>
> Looking forward to your feedback!
>
> Best regards,
>
> Raorao Xiong