Hi everyone,

Thank you all for the thorough review and the great discussion on this FLIP, especially Roman, Gen, Rui Fan, Zakelly and Zihao for the detailed feedback and suggestions.

I've updated the FLIP document to incorporate the points raised during the discussion. I believe we've reached consensus on the key issues:

- Scope limited to embarrassingly-parallel regions, opt-in behind the `execution.checkpointing.region.enabled` flag (default false).
- Operator state safety assumptions documented in the Limitations section.
- OperatorCoordinator and CheckpointListener interface extensions.
- Finished Operators handled via forced global checkpoint on job termination.
- FLINK-26803 documented as incompatible; FLIP-306 integration deferred to future work.

The updated FLIP is at:
https://cwiki.apache.org/confluence/spaces/FLINK/pages/438010016/FLIP-600+Independent+Checkpoint+Based+On+Pipeline+Region

If there are no further concerns, I'll start the [VOTE] thread next Tuesday (July 7).

Regards,
Raorao

> On July 3, 2026, at 18:35, Roman Khachatryan <[email protected]> wrote:
>
> Hi Raorao,
>
> Sounds good, thank you!
>
> Regards,
> Roman
>
>
> On Fri, Jul 3, 2026 at 9:06 AM 熊饶饶 <[email protected]> wrote:
>
>> Hi Roman, Gen,
>>
>> Thanks both for the discussion.
>>
>> Roman, I agree with the safety boundary you've defined: each region's state must be a deterministic function of consumed data, with no cross-subtask agreement or cross-element temporal invariant.
>>
>> Since Regional Checkpoint is opt-in (`execution.checkpointing.region.enabled = false` by default), users who know their job has such invariants simply won't enable it. I'll document the safety conditions in the FLIP's Limitations section so the expectation is clear before anyone turns the flag on. No additional API or framework guard needed.
>>
>> Let me know if this sounds right.
>>
>> Regards,
>> Raorao
>>
>>> On July 3, 2026, at 04:57, Roman Khachatryan <[email protected]> wrote:
>>>
>>> Hi Gen,
>>>
>>> Thanks for your reply,
>>>
>>> I don't think the "stalled subtask" equivalence fully covers this case.
>>> A stalled-but-alive subtask still acknowledges checkpoints, so it would contribute its current snapshot, not an old one. The composition only arises because the failed region never acked the current checkpoint, which in normal Flink fails the whole checkpoint (exactly the case this FLIP is introducing). So the "slow subtask" analogy holds only when a subtask that consumed no new data would produce an identical snapshot at the next barrier, i.e. state that's a deterministic function of the consumed data. It doesn't hold for state that mutates in snapshotState()/open(), on timers/wall-clock, or that's meant to be globally agreed, e.g. an epoch or recovery counter kept as union state that all subtasks must agree on. There the composed view can be {100, 101}, which no real execution produces.
>>>
>>> More generally: a Flink checkpoint today is globally atomic; every subtask's state comes from the same barrier, one logical time. This FLIP is the first thing to relax that and mix state from different logical times across subtasks, so an assumption operators can rely on today ("all of my state comes from one consistent snapshot") no longer holds. That applies to even-split state too, not just union: ordinary rescaling only requires operators to tolerate regrouping (which elements land together), never temporal mixing (elements from different times). Self-contained elements like Kafka splits are fine; an operator with any cross-element or cross-subtask temporal invariant is not, and would be broken only by this feature.
>>>
>>> Since the runtime can't inspect operator semantics, I'd suggest:
>>> 1. Make the feature opt-in and document the assumption explicitly: each region's state must be a deterministic function of the data it consumed, with no cross-subtask agreement or cross-element temporal invariant.
>>> 2. As an automatic backstop, guard union and broadcast state (contract: every subtask sees the same/complete view): either disable regional checkpointing when the job uses it, or force a global checkpoint when such a region fails. This covers the common case but isn't a soundness proof, which is why (1) matters; even-split isn't automatically safe either.
>>>
>>> Could we capture this in the Limitations/Assumptions section of the FLIP if you agree?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2026 at 9:19 AM Gen Luo <[email protected]> wrote:
>>>
>>>> Hi Roman and Raorao,
>>>>
>>>>
>>>> Regarding the union operator state, I'd like to share some thoughts.
>>>>
>>>> In my understanding, union operator state is not a separate state type, but a different recovery semantic of OperatorState. The underlying state is the same; the only difference is that regular operator state redistributes state partitions, while union operator state restores the full state list to every subtask.
>>>>
>>>> From this perspective, I think combining newer snapshots from healthy regions with an older snapshot from a failed region is still valid. The snapshots of different regions are independent, so the newer snapshots are unaffected by whether the failed region contributes its latest or previous snapshot.
>>>>
>>>> Another way to see this is to assume the failed subtask simply stopped making progress after the previous checkpoint instead of failing. The resulting checkpoint would naturally contain the latest snapshots from healthy subtasks and the previous snapshot from that subtask, which is exactly the same composition as the proposed regional checkpoint.
>>>>
>>>> What do you think?
>>>>
>>>>
>>>> Best,
>>>>
>>>> Gen
>>>>
>>>> On Mon, Jun 29, 2026 at 4:45 AM Roman Khachatryan <[email protected]> wrote:
>>>>
>>>>> Hi Raorao,
>>>>>
>>>>> Thanks for the walk-through.
>>>>> That partially resolves my concern about re-assigning state: it doesn't have to be recomputed on every partial checkpoint, but the restored (redistributed) assignment has to be retained until the first checkpoint completes at the new parallelism.
>>>>>
>>>>> My other concern still stands: the isolation argument doesn't obviously cover union operator state, which every subtask sees in full across all regions, so a partial checkpoint stores a temporal mix of elements, and rescaling reshuffles them across new region boundaries.
>>>>>
>>>>>> I'll cover all this in the FLIP document.
>>>>>
>>>>> Yes, I think it's better to capture these edge cases in a Rescaling section of the FLIP and discuss in more detail there. Looking forward to it.
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Fri, Jun 26, 2026 at 10:06 AM 熊饶饶 <[email protected]> wrote:
>>>>>
>>>>>> Hi Roman, thanks for the detailed follow-up.
>>>>>> Rescaling and state redistribution
>>>>>> StateAssignmentOperation runs only once per restore; it doesn’t run per referenced checkpoint. The subtask states entering StateAssignmentOperation have already been pre-merged during the Regional Checkpoint completion phase. Each OperatorSubtaskState already physically contains the state handles from whichever checkpoint it originated from. The refCheckpointId is just metadata for the cleaner.
>>>>>> Example: P=4 → P=2 → P=1, two rounds of downscaling
>>>>>> Initial state (P=4, even-split operator state):
>>>>>> ckp100 (T1, GLOBAL): all 4 regions ack
>>>>>>   subtask 0: [A0, A1]   region 0
>>>>>>   subtask 1: [B0, B1]   region 1
>>>>>>   subtask 2: [C0, C1]   region 2
>>>>>>   subtask 3: [D0, D1]   region 3
>>>>>> First downscale: P=4 → P=2 (recover from ckp100 after failover)
>>>>>> Even-split redistribution:
>>>>>>   concatenate: [A0,A1, B0,B1, C0,C1, D0,D1] → 8 elements
>>>>>>   split into 2:
>>>>>>     new subtask 0: [A0, A1, B0, B1]
>>>>>>     new subtask 1: [C0, C1, D0, D1]
>>>>>> Now running at P=2:
>>>>>> ckp101 (T2, REGIONAL):
>>>>>>   new-subtask 0 ack: [E0, E1, E2, E3]
>>>>>>   new-subtask 1 decline, fallback to ckp100 → [C0,C1,D0,D1] (ref=100)
>>>>>>
>>>>>> ckp101 metadata (pre-merged):
>>>>>>   new-subtask 0: stateHandles=[E0,E1,E2,E3], refCheckpointId=null
>>>>>>   new-subtask 1: stateHandles=[C0,C1,D0,D1], refCheckpointId=100
>>>>>> Second downscale: P=2 → P=1 (recover from ckp101)
>>>>>> Even-split redistribution:
>>>>>>   concatenate: [E0,E1,E2,E3, C0,C1,D0,D1] → 8 elements
>>>>>>   split into 1:
>>>>>>     new subtask 0: [E0,E1,E2,E3, C0,C1,D0,D1]
>>>>>> At this point, the single subtask’s state contains elements from two time points; this is safe per the partition isolation argument above. Now the job continues running at P=1:
>>>>>> ckp102 (T3, REGIONAL):
>>>>>>   new subtask 0 decline, fallback to ckp101's redistributed state → ref=101
>>>>>>
>>>>>> ckp102 metadata (pre-merged):
>>>>>>   subtask 0: stateHandles=[E0,E1,E2,E3, C0,C1,D0,D1], refCheckpointId=101
>>>>>>
>>>>>> ref chain: ckp102(ref=101) → ckp101(ref=100) → ckp100
>>>>>> The ref chain is tracked entirely in the CompletedCheckpoint store metadata. Rescaling is a restore operation: it redistributes state handles to TMs but doesn't create new checkpoint metadata.
>>>>>> The next Regional Checkpoint (ckp102) references the previous CompletedCheckpoint (ckp101) from the store, which still has its original ref=100 annotation in the metadata.
>>>>>>
>>>>>> Union state, same scenario:
>>>>>> ckp100 (T1, GLOBAL): P=4, union operator state
>>>>>>   subtask 0: [A0, A1]
>>>>>>   subtask 1: [B0, B1]
>>>>>>   subtask 2: [C0, C1]
>>>>>>   subtask 3: [D0, D1]
>>>>>>
>>>>>> P=4 → P=2 (union):
>>>>>>   new subtask 0 gets FULL concatenated: [A0,A1, B0,B1, C0,C1, D0,D1]
>>>>>>   new subtask 1 gets FULL concatenated: same
>>>>>>
>>>>>> ckp101 (T2, REGIONAL):
>>>>>>   new-subtask 0 ack: [E0, ..., E7]
>>>>>>   new-subtask 1 decline, fallback → [A0,A1,B0,B1,C0,C1,D0,D1] (ref=100)
>>>>>>
>>>>>> P=2 → P=1 (union):
>>>>>>   new subtask 0 gets: [E0,...,E7, A0,...,D1] ← complete set, safe
>>>>>> Union is even simpler: each subtask always gets the complete state, so mixing doesn’t add new complexity.
>>>>>> refCheckpointId chain bounded by maxConsecutiveFailures
>>>>>>
>>>>>> ckp100 (global)
>>>>>>   → ckp101 (regional, ref=100) consecutive=1
>>>>>>   → ckp102 (regional, ref=101) consecutive=2
>>>>>>   → ckp103: consecutive >= maxConsecutiveFailures(2)
>>>>>>   → FORCED GLOBAL → if success, chain resets
>>>>>> The reference chain can only grow as deep as maxConsecutiveFailures. Once a global checkpoint succeeds, it’s a fresh start. No inflation beyond the configured limit.
>>>>>> Channel state redistribution
>>>>>> Each channel state handle carries InputChannelInfo / ResultSubpartitionInfo. TaskStateAssignment maps handles by their channel metadata; time of origin is irrelevant. Per-subtask channel state and operator state come from the same checkpoint snapshot, ensuring internal consistency regardless of which checkpoint other subtasks reference.
>>>>>> Summary
>>>>>> 1. One StateAssignmentOperation per restore; states are pre-merged
>>>>>> 2. POINTWISE partition isolation makes mixing states across time points safe for both even-split and union
>>>>>> 3. maxConsecutiveFailures bounds refCheckpointId chain depth
>>>>>> 4. Channel state redistribution is time-agnostic
>>>>>>
>>>>>> I’ll cover all this in the FLIP document.
>>>>>>
>>>>>> Thanks again for the detailed replies, looking forward to your feedback.
>>>>>>
>>>>>> Regards,
>>>>>> Raorao
>>>>>>
>>>>>>> On June 24, 2026, at 18:38, Roman Khachatryan <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi Raorao, thanks for your answers
>>>>>>>
>>>>>>>>> 2. Rescaling for OperatorSubtaskState
>>>>>>>
>>>>>>>> `refCheckpointId` is checkpoint-level metadata stored in the completed checkpoint; it doesn't participate in state redistribution during rescaling. The `TaskStateAssignment` redistributes only the state handles to new subtasks, while the `CompletedCheckpointStore` retains the original metadata (including `refCheckpointId`) for the cleaner to trace reference chains. So rescaling is inherently compatible.
>>>>>>>
>>>>>>> Do you mean that we'll run StateAssignmentOperation for every referenced checkpoint on every "partial" checkpoint completion?
>>>>>>> (it can be quite heavy)
>>>>>>>
>>>>>>> I don't think that the "redistributes only the state handles to new subtasks" part is true - on rescaling, distribution changes for (potentially) every sub-task.
>>>>>>>
>>>>>>> From the docs [1]:
>>>>>>> - Even-split redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty, or contain one or more elements.
>>>>>>> - Union redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, each operator gets the complete list of state elements. ...
>>>>>>> - (we said earlier that keyed state is not supported)
>>>>>>>
>>>>>>> So I don't see how any of the above can work because we might mix old and new states in a sub-task.
>>>>>>>
>>>>>>> For example:
>>>>>>> - a job is running with parallelism 2 and has even-split state distribution
>>>>>>> - 1st checkpoint is completed by 2/2 sub-tasks
>>>>>>> - 2nd checkpoint is completed by 1/2 sub-tasks: sub-task 2 failed the checkpoint; its state in checkpoint 2 refers to "subtask 2 state in checkpoint 1"
>>>>>>> - job is rescaled to 1
>>>>>>> - sub-task 1 now has state from checkpoint 1 AND 2?
>>>>>>>
>>>>>>> Furthermore, with multiple rounds of downscaling, there can be a single sub-task referring to multiple historical checkpoints.
>>>>>>>
>>>>>>> For Union, it's even more problematic.
>>>>>>>
>>>>>>> Also, what about channel state (Unaligned checkpoint) re-distribution?
>>>>>>>
>>>>>>> Probably it's better to have a FLIP document with the corresponding section first and then discuss it.
>>>>>>>
>>>>>>> [1] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Zakelly,
>>>>>>>>
>>>>>>>> Thanks for the feedback! I completely agree: forcing region-level merging would significantly reduce the effectiveness of file merging, and that's not the right trade-off.
>>>>>>>>
>>>>>>>> I think TM-level merging can work correctly with Regional Checkpoint if we ensure the FileMergingSnapshotManager properly handles the new checkpoint notification semantics. Specifically:
>>>>>>>>
>>>>>>>> - The FileMergingSnapshotManager already uses a notification-based lifecycle (checkpoint complete/abort/subsumed) to manage physical file deletion.
>>>>>>>> - For Regional Checkpoint, we need to introduce the new notification type we discussed earlier (notify partially-completed) so that the merging manager knows: this checkpoint is complete but some segments belong to failed regions, so keep the physical files that contain those segments alive until the referencing checkpoints are subsumed.
>>>>>>>> - This aligns with the existing design and requires only minor adjustments to the merging manager's notification handling.
>>>>>>>>
>>>>>>>> The previous suggestion of region-level merging was overly conservative; glad you pointed out the better approach.
>>>>>>>>
>>>>>>>> Also good to know about the plan to cover channel state in FLIP-306. That will simplify the compatibility matrix significantly.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Raorao
>>>>>>>>
>>>>>>>>> On June 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Raorao,
>>>>>>>>>
>>>>>>>>> Good to see this proposal and +1 for the direction. Sorry for joining the discussion late. I have seen many constructive suggestions that largely align with my thoughts, but I still have one concern:
>>>>>>>>>
>>>>>>>>> I'm one of the authors of FLIP-306 and I'm not in favor of region-level merging. IIUC, region-level merge files severely limit the effectiveness of merging, as merging cannot happen between subtasks.
>>>>>>>>> I think it should still be possible to perform TM-level merge. The only thing we should do is to keep previous checkpoint files alive when a region checkpoint occurs. This does not conflict with the current design. It is only necessary to ensure that the new behavior of checkpoint notifications is compatible with FLIP-306, or that some minor adjustment is made to the merging manager.
>>>>>>>>>
>>>>>>>>> And BTW to @Rui, we still need more work to let FLIP-306 cover channel state and deprecate FLINK-26803, and I will look into this soon.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Zakelly
>>>>>>>>>
>>>>>>>>> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Roman, thanks for the follow-up.
>>>>>>>>>>
>>>>>>>>>> 1. FLIP page
>>>>>>>>>>
>>>>>>>>>> Thanks for the reminder, I'll create the proper FLIP page before starting the [VOTE] thread.
>>>>>>>>>>
>>>>>>>>>> 2. Rescaling for OperatorSubtaskState
>>>>>>>>>>
>>>>>>>>>> `refCheckpointId` is checkpoint-level metadata stored in the completed checkpoint; it doesn't participate in state redistribution during rescaling. The `TaskStateAssignment` redistributes only the state handles to new subtasks, while the `CompletedCheckpointStore` retains the original metadata (including `refCheckpointId`) for the cleaner to trace reference chains. So rescaling is inherently compatible.
>>>>>>>>>>
>>>>>>>>>> 3. Finished operators
>>>>>>>>>>
>>>>>>>>>> You're right, my previous answer missed the real issue.
>>>>>>>>>> For bounded source jobs (FLIP-147), if the final checkpoint is Regional and a failed Region's subtasks miss `notifyCheckpointComplete`, their side effects (e.g., Kafka transactions) are never committed; that's data loss.
>>>>>>>>>>
>>>>>>>>>> Solution: force a global checkpoint when the job is about to terminate. When all sources are exhausted, the JM will mark the next checkpoint as mandatory global, requiring all regions to ack. If it fails, the job retries rather than terminating with a partial snapshot.
>>>>>>>>>>
>>>>>>>>>> 4. Limitations section
>>>>>>>>>>
>>>>>>>>>> I agree with you, and I'll add a section covering all limitations:
>>>>>>>>>>
>>>>>>>>>> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
>>>>>>>>>> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
>>>>>>>>>> NO_CLAIM restore mode -> Warn (require global checkpoint before snapshot deletion)
>>>>>>>>>> Changelog state backend -> Reject job submission
>>>>>>>>>> Finished operators (FLIP-147) -> Force global checkpoint when sources are exhausted
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks again for the review, looking forward to your feedback.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Raorao
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On June 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi, thanks for your replies and sorry for the delay.
>>>>>>>>>>>
>>>>>>>>>>> Most of my questions were answered, but I still have some concerns.
>>>>>>>>>>>
>>>>>>>>>>>> If there are no further concerns by next Monday (June 22), I'll go ahead and start the [VOTE] thread for this FLIP.
>>>>>>>>>>>
>>>>>>>>>>> Isn't the actual FLIP still missing? I only saw a Google Document. Do you mind creating a page according to [1]?
>>>>>>>>>>>
>>>>>>>>>>> ----------------------------------------
>>>>>>>>>>>
>>>>>>>>>>>> 3. Checkpoint metadata layout
>>>>>>>>>>>> Regional Checkpoint recombines state from different checkpoint IDs. To track this, we add a refCheckpointId field to OperatorSubtaskState in the metadata, indicating which historical checkpoint a subtask’s state references.
>>>>>>>>>>>
>>>>>>>>>>> Could you explain how we find the right OperatorSubtaskState, especially in case of rescaling?
>>>>>>>>>>> Does the proposal support rescaling?
>>>>>>>>>>>
>>>>>>>>>>>> 9. Finished operators
>>>>>>>>>>>> The concern is: a finished operator’s final commit notification gets skipped by Regional Checkpoint, and if this checkpoint is the last one, the operator never receives it. Could this cause data loss?
>>>>>>>>>>>> In practice, the impact is limited:
>>>>>>>>>>>> ● Failed Region tasks are already gone: By the time the Regional Checkpoint completes, tasks in the failed Region have already been restarted (decline) or cancelled (timeout). There is no task left to receive the notification anyway.
>>>>>>>>>>> Checkpoint failure doesn't necessarily cause a restart (especially if this is limited to one region). The tasks should still be up and running.
>>>>>>>>>>>
>>>>>>>>>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After reaching the limit, the next checkpoint is forced to be global, ensuring all tasks eventually receive notifyCheckpointComplete. We can’t skip the same Region forever.
>>>>>>>>>>> maxConsecutiveFailures might not be reached for the final checkpoint.
>>>>>>>>>>>
>>>>>>>>>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops the job gracefully, it triggers a full global snapshot, not a Regional Checkpoint. So the final checkpoint is always complete.
>>>>>>>>>>> stop-with-savepoint should be fine, yes.
>>>>>>>>>>>
>>>>>>>>>>> To clarify, my concern is about jobs with bounded sources. In such cases, some subtasks might finish processing but still participate in checkpoints. After a successful checkpoint, they are guaranteed to get a checkpoint completion notification, so that they can make side effects visible in external systems (commit Kafka transactions).
>>>>>>>>>>> See FLIP-147 [2]
>>>>>>>>>>>
>>>>>>>>>>> However, with the current proposal, the job might complete with some subtasks/regions failing the final checkpoint unless I'm missing something.
>>>>>>>>>>> This is essentially data loss.
>>>>>>>>>>> To prevent this, the final checkpoint must always be acked by all subtasks/regions.
>>>>>>>>>>>
>>>>>>>>>>> ----------------------------------------
>>>>>>>>>>>
>>>>>>>>>>> There are quite a few limitations in this proposal.
>>>>>>>>>>> Could you add a section describing how each of them is handled?
>>>>>>>>>>> 1. Reject job submission
>>>>>>>>>>> 2. Force all-region checkpoint
>>>>>>>>>>> 3. Warn in documentation
>>>>>>>>>>>
>>>>>>>>>>>> 1. Region independence: BLOCKING/HYBRID edges
>>>>>>>>>>>> You’re right. Our current scope is limited to embarrassingly parallel regions. In typical ETL scenarios, each parallelism maps to an independent Region with no edges connecting them.
>>>>>>>>>>>
>>>>>>>>>>>> 5. SharedStateRegistry: how are old states kept alive?
>>>>>>>>>>>> Good question.
>>>>>>>>>>>> In the current design, since we only target embarrassingly parallel regions, there is typically no keyed state and no incremental state. As a result, the SharedStateRegistry is generally empty (setting aside File Merging and Changelog State for now, discussed in 8.), so keep-alive of files under the shared directory is not a concern.
>>>>>>>>>>>
>>>>>>>>>>>> 8. FLINK-26803 and FLIP-306 compatibility
>>>>>>>>>>>> This is a very important point. Both features essentially merge small files at the job level. As Rui Fan pointed out, if the merging granularity is reduced to the Region level, compatibility with Regional Checkpoint should be achievable in theory. I think this can be deferred to future work: once FLINK-26803 is consolidated into FLIP-306, we can revisit and enable support.
>>>>>>>>>>>
>>>>>>>>>>>> 10. NO_CLAIM mode warning
>>>>>>>>>>>> You’re absolutely right, this is an important reminder. After restoring from a Regional Checkpoint, only a successful global checkpoint guarantees independence from the old state. We’ll add a clear user warning in the documentation.
>>>>>>>>>>>
>>>>>>>>>>>> 11. Changelog state backend: not supported
>>>>>>>>>>>> As mentioned earlier, our primary target is embarrassingly parallel regions, which typically have no keyed state and therefore no slow incremental state flush issues. I don’t think we need to support Changelog state backend for now.
>>>>>>>>>>>
>>>>>>>>>>> ----------------------------------------
>>>>>>>>>>>
>>>>>>>>>>>> 2. max-consecutive-failures exceeded: what exactly happens?
>>>>>>>>>>>> The current design says “force a global checkpoint.” To clarify the two-tier behavior:
>>>>>>>>>>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the next checkpoint is forced to be global.
>>>>>>>>>>>> ● Tier 2: If that forced global checkpoint also fails (any task declines), the checkpoint is aborted normally (not a job failure). The counter is then reset since a global checkpoint was attempted, and the next checkpoint cycle can try again.
>>>>>>>>>>>> This avoids cascading into job failure while ensuring we don’t drift indefinitely on historical state.
>>>>>>>>>>>
>>>>>>>>>>> My assumption was that we would not allow this particular failed region to fail the checkpoint again.
>>>>>>>>>>> But forcing a global checkpoint works as well.
>>>>>>>>>>>
>>>>>>>>>>>> 6. Checkpoint abort notifications & Local Recovery cleanup: new notification type
>>>>>>>>>>>> This is a very insightful point. Zihao and Gen also raised this in earlier discussions. The current design doesn’t address state cleanup for tasks in failed regions. I agree it’s necessary to introduce a new notification type. For tasks in failed regions, local state cleanup can be deferred until the next checkpoint trigger.
>>>>>>>>>>> Ok, this can be some future work.
>>>>>>>>>>>
>>>>>>>>>>>> 7. Task that never acknowledges nor declines: per-region timeouts
>>>>>>>>>>>> This was discussed in the previous thread. Network issues may cause a task to neither ack nor decline in time.
>>>>>>>>>>>> In such cases, we treat it as a checkpoint timeout: the affected tasks’ region is marked as failed, and the process ultimately falls through to the normal Regional Checkpoint processing logic.
>>>>>>>>>>> Ok, this can be some future work.
>>>>>>>>>>>
>>>>>>>>>>> ----------------------------------------
>>>>>>>>>>>
>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
>>>>>>>>>>>
>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Roman
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks everyone for the valuable feedback. I believe all the points raised above have been addressed (@Roman @Rui Fan). If there are no further concerns by next Monday (June 22), I'll go ahead and start the [VOTE] thread for this FLIP.
>>>>>>>>>>>>
>>>>>>>>>>>> For reference, the earlier related discussion can be found here:
>>>>>>>>>>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Please feel free to share any additional feedback before then.
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>> Raorao
>>>>>>>>>>>>
>>>>>>>>>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi devs,
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint Based On Pipeline Region.
>>>>>>>>>>>>
>>>>>>>>>>>> In high-parallelism streaming jobs, a single Task's checkpoint failure causes the entire global Checkpoint to abort, leading to degraded checkpoint success rates and wasted compute resources (especially for GPU operators).
>>>>>>>>>>>>
>>>>>>>>>>>> We propose Regional Checkpoint: when some Regions fail to checkpoint, the framework combines the historical state of the failed Regions with the current state of the healthy Regions to produce a logically complete Completed Checkpoint, while preserving state consistency. The key changes are:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Snapshot Collection: Allow partial region failures; combine last successful state of failed Regions with current state of normal Regions.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. State Correction: New checkpointCoordinatorForRegionFallback interface for OperatorCoordinators to produce consistent snapshots against the mixed view.
>>>>>>>>>>>>
>>>>>>>>>>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent premature cleanup of referenced historical checkpoints.
>>>>>>>>>>>>
>>>>>>>>>>>> The detailed design is described in the FLIP document:
>>>>>>>>>>>>
>>>>>>>>>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your feedback!
>>>>>>>>>>>>
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Raorao Xiong
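
Illustrative note on the rescaling walk-through quoted in this thread (Raorao's reply of Jun 26): the even-split and union redistribution steps can be sketched in a few lines of Python. This is only a toy model under stated assumptions. The helper names `even_split` and `union` are hypothetical, state elements are plain strings, and the contiguous split mirrors the example in the thread rather than Flink's actual repartitioner, which may order elements differently.

```python
from typing import List

State = List[List[str]]  # one list of state elements per subtask


def even_split(per_subtask: State, new_parallelism: int) -> State:
    """Toy even-split: concatenate all lists, then cut into contiguous, near-equal sublists."""
    merged = [elem for sub in per_subtask for elem in sub]
    base, extra = divmod(len(merged), new_parallelism)
    result, start = [], 0
    for i in range(new_parallelism):
        size = base + (1 if i < extra else 0)  # the first `extra` subtasks get one more element
        result.append(merged[start:start + size])
        start += size
    return result


def union(per_subtask: State, new_parallelism: int) -> State:
    """Toy union: every new subtask receives the complete concatenated list."""
    merged = [elem for sub in per_subtask for elem in sub]
    return [list(merged) for _ in range(new_parallelism)]


# ckp100 (GLOBAL, P=4): all four regions acknowledged.
ckp100 = [["A0", "A1"], ["B0", "B1"], ["C0", "C1"], ["D0", "D1"]]

# First downscale, P=4 -> P=2, restoring from ckp100.
restored_p2 = even_split(ckp100, 2)

# ckp101 (REGIONAL, P=2): subtask 0 acks new state; subtask 1 declines and
# falls back to its restored state (refCheckpointId=100 in the thread's example).
ckp101 = [["E0", "E1", "E2", "E3"], restored_p2[1]]

# Second downscale, P=2 -> P=1, restoring from the pre-merged ckp101.
restored_p1 = even_split(ckp101, 1)
print(restored_p1[0])
```

In this model the single subtask at P=1 ends up holding E0..E3 from ckp101 together with C0..D1 from ckp100, which is the mix of logical times that the safety discussion in the thread is about.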
