Hi Raorao,

Sounds good, thank you!

Regards,
Roman

On Fri, Jul 3, 2026 at 9:06 AM 熊饶饶 <[email protected]> wrote:

> Hi Roman, Gen,
>
> Thanks both for the discussion.
>
> Roman, I agree with the safety boundary you've defined: each region's state must be a deterministic function of consumed data, with no cross-subtask agreement or cross-element temporal invariant.
>
> Since Regional Checkpoint is opt-in (`execution.checkpointing.region.enabled = false` by default), users who know their job has such invariants simply won't enable it. I'll document the safety conditions in the FLIP's Limitations section so the expectation is clear before anyone turns the flag on. No additional API or framework guard needed.
>
> Let me know if this sounds right.
>
> Regards,
> Raorao
>
> > On Jul 3, 2026, at 04:57, Roman Khachatryan <[email protected]> wrote:
> >
> > Hi Gen,
> >
> > Thanks for your reply,
> >
> > I don't think the "stalled subtask" equivalence fully covers this case. A stalled-but-alive subtask still acknowledges checkpoints, so it would contribute its current snapshot, not an old one. The composition only arises because the failed region never acked the current checkpoint, which in normal Flink fails the whole checkpoint (exactly the case this FLIP is introducing). So the "slow subtask" analogy holds only when a subtask that consumed no new data would produce an identical snapshot at the next barrier, i.e. state that's a deterministic function of the consumed data. It doesn't hold for state that mutates in snapshotState()/open(), on timers/wall-clock, or that's meant to be globally agreed, e.g. an epoch or recovery counter kept as union state that all subtasks must agree on. There the composed view can be {100, 101}, which no real execution produces.
> >
> > More generally, a Flink checkpoint today is globally atomic: every subtask's state comes from the same barrier, one logical time. This FLIP is the first thing to relax that and mix state from different logical times across subtasks, so an assumption operators can rely on today, "all of my state comes from one consistent snapshot", no longer holds. That applies to even-split state too, not just union: ordinary rescaling only requires operators to tolerate regrouping (which elements land together), never temporal mixing (elements from different times). Self-contained elements like Kafka splits are fine; an operator with any cross-element or cross-subtask temporal invariant is not, and would be broken only by this feature.
> >
> > Since the runtime can't inspect operator semantics, I'd suggest:
> > 1. Make the feature opt-in and document the assumption explicitly: each region's state must be a deterministic function of the data it consumed, with no cross-subtask agreement or cross-element temporal invariant.
> > 2. As an automatic backstop, guard union and broadcast state (contract: every subtask sees the same/complete view): either disable regional checkpointing when the job uses it, or force a global checkpoint when such a region fails. This covers the common case but isn't a soundness proof, which is why (1) matters: even-split isn't automatically safe either.
> >
> > Could we capture this in the Limitations/Assumptions section of the FLIP if you agree?
> >
> > Regards,
> > Roman
> >
> > On Thu, Jul 2, 2026 at 9:19 AM Gen Luo <[email protected]> wrote:
> >
> >> Hi Roman and Raorao,
> >>
> >> Regarding the union operator state, I'd like to share some thoughts.
> >>
> >> In my understanding, union operator state is not a separate state type, but a different recovery semantic of OperatorState. The underlying state is the same; the only difference is that regular operator state redistributes state partitions, while union operator state restores the full state list to every subtask.
> >>
> >> From this perspective, I think combining newer snapshots from healthy regions with an older snapshot from a failed region is still valid. The snapshots of different regions are independent, so the newer snapshots are unaffected by whether the failed region contributes its latest or previous snapshot.
> >>
> >> Another way to see this is to assume the failed subtask simply stopped making progress after the previous checkpoint instead of failing. The resulting checkpoint would naturally contain the latest snapshots from healthy subtasks and the previous snapshot from that subtask, which is exactly the same composition as the proposed regional checkpoint.
> >>
> >> What do you think?
> >>
> >> Best,
> >> Gen
> >>
> >> On Mon, Jun 29, 2026 at 4:45 AM Roman Khachatryan <[email protected]> wrote:
> >>
> >>> Hi Raorao,
> >>>
> >>> Thanks for the walk-through.
> >>> That partially resolves my concern about re-assigning state: it doesn't have to be recomputed on every partial checkpoint, but the restored (redistributed) assignment has to be retained until the first checkpoint completes at the new parallelism.
> >>>
> >>> My other concern still stands: the isolation argument doesn't obviously cover union operator state, which every subtask sees in full across all regions, so a partial checkpoint stores a temporal mix of elements, and rescaling reshuffles them across new region boundaries.
> >>>
> >>>> I'll cover all this in the FLIP document.
> >>>
> >>> Yes, I think it's better to capture these edge cases in a Rescaling section of the FLIP and discuss in more detail there. Looking forward to it.
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>> On Fri, Jun 26, 2026 at 10:06 AM 熊饶饶 <[email protected]> wrote:
> >>>
> >>>> Hi Roman, thanks for the detailed follow-up.
> >>>>
> >>>> Rescaling and state redistribution
> >>>>
> >>>> StateAssignmentOperation runs only once per restore; it doesn't run per referenced checkpoint. The subtask states entering StateAssignmentOperation have already been pre-merged during the Regional Checkpoint completion phase. Each OperatorSubtaskState already physically contains the state handles from whichever checkpoint it originated from. The refCheckpointId is just metadata for the cleaner.
> >>>>
> >>>> Example: P=4 → P=2 → P=1, two rounds of downscaling
> >>>>
> >>>> Initial state (P=4, even-split operator state):
> >>>>   ckp100 (T1, GLOBAL): all 4 regions ack
> >>>>     subtask 0: [A0, A1]  region 0
> >>>>     subtask 1: [B0, B1]  region 1
> >>>>     subtask 2: [C0, C1]  region 2
> >>>>     subtask 3: [D0, D1]  region 3
> >>>>
> >>>> First downscale: P=4 → P=2 (recover from ckp100 after failover)
> >>>> Even-split redistribution:
> >>>>   concatenate: [A0,A1, B0,B1, C0,C1, D0,D1] → 8 elements
> >>>>   split into 2:
> >>>>     new subtask 0: [A0, A1, B0, B1]
> >>>>     new subtask 1: [C0, C1, D0, D1]
> >>>>
> >>>> Now running at P=2:
> >>>>   ckp101 (T2, REGIONAL):
> >>>>     new-subtask 0 ack: [E0, E1, E2, E3]
> >>>>     new-subtask 1 decline, fallback to ckp100 → [C0,C1,D0,D1] (ref=100)
> >>>>
> >>>>   ckp101 metadata (pre-merged):
> >>>>     new-subtask 0: stateHandles=[E0,E1,E2,E3], refCheckpointId=null
> >>>>     new-subtask 1: stateHandles=[C0,C1,D0,D1], refCheckpointId=100
> >>>>
> >>>> Second downscale: P=2 → P=1 (recover from ckp101)
> >>>> Even-split redistribution:
> >>>>   concatenate: [E0,E1,E2,E3, C0,C1,D0,D1] → 8 elements
> >>>>   split into 1:
> >>>>     new subtask 0: [E0,E1,E2,E3, C0,C1,D0,D1]
> >>>>
> >>>> At this point, the single subtask's state contains elements from two time points; this is safe per the partition isolation argument above. Now the job continues running at P=1:
> >>>>   ckp102 (T3, REGIONAL):
> >>>>     new subtask 0 decline, fallback to ckp101's redistributed state → ref=101
> >>>>
> >>>>   ckp102 metadata (pre-merged):
> >>>>     subtask 0: stateHandles=[E0,E1,E2,E3, C0,C1,D0,D1], refCheckpointId=101
> >>>>
> >>>>   ref chain: ckp102(ref=101) → ckp101(ref=100) → ckp100
> >>>>
> >>>> The ref chain is tracked entirely in the CompletedCheckpoint store metadata. Rescaling is a restore operation: it redistributes state handles to TMs but doesn't create new checkpoint metadata. The next Regional Checkpoint (ckp102) references the previous CompletedCheckpoint (ckp101) from the store, which still has its original ref=100 annotation in the metadata.
> >>>>
> >>>> Union state, same scenario:
> >>>>   ckp100 (T1, GLOBAL): P=4, union operator state
> >>>>     subtask 0: [A0, A1]
> >>>>     subtask 1: [B0, B1]
> >>>>     subtask 2: [C0, C1]
> >>>>     subtask 3: [D0, D1]
> >>>>
> >>>>   P=4 → P=2 (union):
> >>>>     new subtask 0 gets FULL concatenated: [A0,A1, B0,B1, C0,C1, D0,D1]
> >>>>     new subtask 1 gets FULL concatenated: same
> >>>>
> >>>>   ckp101 (T2, REGIONAL):
> >>>>     new-subtask 0 ack: [E0, ..., E7]
> >>>>     new-subtask 1 decline, fallback → [A0,A1,B0,B1,C0,C1,D0,D1] (ref=100)
> >>>>
> >>>>   P=2 → P=1 (union):
> >>>>     new subtask 0 gets: [E0,...,E7, A0,...,D1] ← complete set, safe
> >>>>
> >>>> Union is even simpler: each subtask always gets the complete state, so mixing doesn't add new complexity.
> >>>>
> >>>> refCheckpointId chain bounded by maxConsecutiveFailures
> >>>>
> >>>>   ckp100 (global)
> >>>>     → ckp101 (regional, ref=100) consecutive=1
> >>>>     → ckp102 (regional, ref=101) consecutive=2
> >>>>     → ckp103: consecutive >= maxConsecutiveFailures(2)
> >>>>       → FORCED GLOBAL → if success, chain resets
> >>>>
> >>>> The reference chain can only grow as deep as maxConsecutiveFailures. Once a global checkpoint succeeds, it's a fresh start. No inflation beyond the configured limit.
> >>>>
> >>>> Channel state redistribution
> >>>>
> >>>> Each channel state handle carries InputChannelInfo / ResultSubpartitionInfo. TaskStateAssignment maps handles by their channel metadata; time of origin is irrelevant. Per-subtask channel state and operator state come from the same checkpoint snapshot, ensuring internal consistency regardless of which checkpoint other subtasks reference.
> >>>>
> >>>> Summary
> >>>> 1. One StateAssignmentOperation per restore; states are pre-merged
> >>>> 2. POINTWISE partition isolation makes mixing states across time points safe for both even-split and union
> >>>> 3. maxConsecutiveFailures bounds refCheckpointId chain depth
> >>>> 4. Channel state redistribution is time-agnostic
> >>>>
> >>>> I'll cover all this in the FLIP document.
> >>>>
> >>>> Thanks again for the detailed replies, looking forward to your feedback.
> >>>>
> >>>> Regards,
> >>>> Raorao
> >>>>
> >>>>> On Jun 24, 2026, at 18:38, Roman Khachatryan <[email protected]> wrote:
> >>>>>
> >>>>> Hi Raorao, thanks for your answers
> >>>>>
> >>>>>>> 2. Rescaling for OperatorSubtaskState
> >>>>>
> >>>>>> `refCheckpointId` is checkpoint-level metadata stored in the completed checkpoint; it doesn't participate in state redistribution during rescaling. The `TaskStateAssignment` redistributes only the state handles to new subtasks, while the `CompletedCheckpointStore` retains the original metadata (including `refCheckpointId`) for the cleaner to trace reference chains. So rescaling is inherently compatible.
> >>>>>
> >>>>> Do you mean that we'll run StateAssignmentOperation for every referenced checkpoint on every "partial" checkpoint completion? (It can be quite heavy.)
> >>>>>
> >>>>> I don't think the "redistributes only the state handles to new subtasks" part is true: on rescaling, distribution changes for (potentially) every sub-task.
> >>>>>
> >>>>> From the docs [1]:
> >>>>> - Even-split redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty, or contain one or more elements.
> >>>>> - Union redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, each operator gets the complete list of state elements. ...
> >>>>> - (we said earlier that keyed state is not supported)
> >>>>>
> >>>>> So I don't see how any of the above can work because we might mix old and new states in a sub-task.
> >>>>>
> >>>>> For example:
> >>>>> - a job is running with parallelism 2 and has even-split state distribution
> >>>>> - 1st checkpoint is completed by 2/2 sub-tasks
> >>>>> - 2nd checkpoint is completed by 1/2 sub-tasks: sub-task 2 failed the checkpoint; its state in checkpoint 2 refers to "subtask 2 state in checkpoint 1"
> >>>>> - job is rescaled to 1
> >>>>> - sub-task 1 now has state from checkpoint 1 AND 2?
> >>>>>
> >>>>> Furthermore, with multiple rounds of downscaling, there can be a single sub-task referring to multiple historical checkpoints.
> >>>>>
> >>>>> For Union, it's even more problematic.
> >>>>>
> >>>>> Also, what about channel state (Unaligned checkpoint) re-distribution?
> >>>>>
> >>>>> Probably it's better to have a FLIP document with the corresponding section first and then discuss it.
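The parallelism-2 example above can be modelled in a few lines. This is only a sketch of the two redistribution modes as the quoted docs describe them, not Flink code: `even_split`, `union`, `checkpoint_ids` and the `name@checkpoint` element labels are made up for illustration, and the way a remainder is spread across sublists is an assumption.

```python
from typing import List, Set


def even_split(per_subtask: List[List[str]], new_parallelism: int) -> List[List[str]]:
    """Concatenate all per-subtask lists, then divide them into new_parallelism sublists."""
    merged = [element for sub in per_subtask for element in sub]
    base, extra = divmod(len(merged), new_parallelism)
    result, pos = [], 0
    for i in range(new_parallelism):
        # Assumption: the first `extra` sublists receive one additional element.
        size = base + (1 if i < extra else 0)
        result.append(merged[pos:pos + size])
        pos += size
    return result


def union(per_subtask: List[List[str]], new_parallelism: int) -> List[List[str]]:
    """Every new subtask receives the complete concatenated list."""
    merged = [element for sub in per_subtask for element in sub]
    return [list(merged) for _ in range(new_parallelism)]


def checkpoint_ids(state: List[str]) -> Set[str]:
    """Checkpoints that the elements of one subtask's state were taken in."""
    return {element.split("@")[1] for element in state}


# Checkpoint 2 as composed: sub-task 1 acked, sub-task 2 falls back to its checkpoint-1 state.
composed_chk2 = [["a@chk2", "b@chk2"], ["c@chk1", "d@chk1"]]

after_rescale = even_split(composed_chk2, 1)
mixed = checkpoint_ids(after_rescale[0])
union_view = union(composed_chk2, 2)
```

After rescaling to 1, `mixed` contains both `chk1` and `chk2`, which is the temporal mix the example asks about. With `union`, every subtask sees that same mixed list even without rescaling.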
> >>>>>
> >>>>> [1] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state
> >>>>>
> >>>>> Regards,
> >>>>> Roman
> >>>>>
> >>>>> On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:
> >>>>>
> >>>>>> Hi Zakelly,
> >>>>>>
> >>>>>> Thanks for the feedback! I completely agree: forcing region-level merging would significantly reduce the effectiveness of file merging, and that's not the right trade-off.
> >>>>>>
> >>>>>> I think TM-level merging can work correctly with Regional Checkpoint if we ensure the FileMergingSnapshotManager properly handles the new checkpoint notification semantics. Specifically:
> >>>>>>
> >>>>>> - The FileMergingSnapshotManager already uses a notification-based lifecycle (checkpoint complete/abort/subsumed) to manage physical file deletion.
> >>>>>> - For Regional Checkpoint, we need to introduce the new notification type we discussed earlier (notify partially-completed) so that the merging manager knows that this checkpoint is complete but some segments belong to failed regions, and that the physical files containing those segments must be kept alive until the referencing checkpoints are subsumed.
> >>>>>> - This aligns with the existing design and requires only minor adjustments to the merging manager's notification handling.
> >>>>>>
> >>>>>> The previous suggestion of region-level merging was overly conservative; glad you pointed out the better approach.
> >>>>>>
> >>>>>> Also good to know about the plan to cover channel state in FLIP-306. That will simplify the compatibility matrix significantly.
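The keep-alive rule in the bullets above could look roughly like the following. This is a toy model under stated assumptions, not the FileMergingSnapshotManager API: the class `PhysicalFileTracker`, its three callbacks and the file names are all hypothetical, and the only idea taken from the thread is that a merged physical file may be deleted once no retained checkpoint references it.

```python
from typing import Dict, Iterable, Set


class PhysicalFileTracker:
    """Hypothetical model of TM-level merged files shared across checkpoints."""

    def __init__(self) -> None:
        # checkpoint id -> physical files holding at least one of its segments
        self.files_by_checkpoint: Dict[int, Set[str]] = {}

    def on_complete(self, checkpoint_id: int, files: Iterable[str]) -> None:
        """Ordinary completion: the checkpoint references only files it wrote."""
        self.files_by_checkpoint[checkpoint_id] = set(files)

    def on_partially_complete(self, checkpoint_id: int, new_files: Iterable[str],
                              ref_checkpoint_id: int, referenced_files: Iterable[str]) -> None:
        """Assumed new notification: failed regions reuse segments of an older checkpoint."""
        referenced = set(referenced_files)
        if not referenced <= self.files_by_checkpoint.get(ref_checkpoint_id, set()):
            raise ValueError("referenced files are not part of the referenced checkpoint")
        # Recording the old files under the new checkpoint keeps them alive.
        self.files_by_checkpoint[checkpoint_id] = set(new_files) | referenced

    def on_subsumed(self, checkpoint_id: int) -> Set[str]:
        """Drop a checkpoint and return the files nothing else references."""
        dropped = self.files_by_checkpoint.pop(checkpoint_id, set())
        still_used = set().union(*self.files_by_checkpoint.values())
        return dropped - still_used


tracker = PhysicalFileTracker()
tracker.on_complete(100, {"tm1-file1", "tm1-file2"})
# Checkpoint 101 is regional: a failed region's segments live in tm1-file2 of checkpoint 100.
tracker.on_partially_complete(101, {"tm1-file3"}, 100, {"tm1-file2"})
deletable_after_100 = tracker.on_subsumed(100)
deletable_after_101 = tracker.on_subsumed(101)
```

In this model, subsuming checkpoint 100 frees only `tm1-file1`. `tm1-file2` survives until checkpoint 101 is subsumed as well.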
> >>>>>>
> >>>>>> Best Regards,
> >>>>>> Raorao
> >>>>>>
> >>>>>>> On Jun 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
> >>>>>>>
> >>>>>>> Hi Raorao,
> >>>>>>>
> >>>>>>> Good to see this proposal and +1 for the direction. Sorry for joining the discussion late. I have seen many constructive suggestions that largely align with my thoughts, but I still have one concern:
> >>>>>>>
> >>>>>>> I'm one of the authors of FLIP-306 and I'm not in favor of region-level merging. IIUC, region-level merge files severely limit the effectiveness of merging, as merging cannot happen between subtasks. I think it should still be possible to perform TM-level merge. The only thing we should do is to keep previous checkpoint files alive when a region checkpoint occurs. This does not conflict with the current design. It is only necessary to ensure that the new behavior of checkpoint notifications is compatible with FLIP-306, or to make some minor adjustments to the merging manager.
> >>>>>>>
> >>>>>>> And BTW to @Rui, we still need more work to let FLIP-306 cover channel state and deprecate FLINK-26803, and I will look into this soon.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Zakelly
> >>>>>>>
> >>>>>>> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hi Roman, thanks for the follow-up.
> >>>>>>>>
> >>>>>>>> 1. FLIP page
> >>>>>>>>
> >>>>>>>> Thanks for the reminder, I'll create the proper FLIP page before starting the [VOTE] thread.
> >>>>>>>>
> >>>>>>>> 2. Rescaling for OperatorSubtaskState
> >>>>>>>>
> >>>>>>>> `refCheckpointId` is checkpoint-level metadata stored in the completed checkpoint; it doesn't participate in state redistribution during rescaling. The `TaskStateAssignment` redistributes only the state handles to new subtasks, while the `CompletedCheckpointStore` retains the original metadata (including `refCheckpointId`) for the cleaner to trace reference chains. So rescaling is inherently compatible.
> >>>>>>>>
> >>>>>>>> 3. Finished operators
> >>>>>>>>
> >>>>>>>> You're right, my previous answer missed the real issue. For bounded source jobs (FLIP-147), if the final checkpoint is Regional and a failed Region's subtasks miss `notifyCheckpointComplete`, their side effects (e.g., Kafka transactions) are never committed, and that's data loss.
> >>>>>>>>
> >>>>>>>> Solution: force a global checkpoint when the job is about to terminate. When all sources are exhausted, the JM will mark the next checkpoint as mandatory global, requiring all regions to ack. If it fails, the job retries rather than terminating with a partial snapshot.
> >>>>>>>>
> >>>>>>>> 4. Limitations section
> >>>>>>>>
> >>>>>>>> I agree with you, and I'll add a section covering all limitations:
> >>>>>>>>
> >>>>>>>> - BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> >>>>>>>> - FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> >>>>>>>> - NO_CLAIM restore mode -> Warn (require global checkpoint before snapshot deletion)
> >>>>>>>> - Changelog state backend -> Reject job submission
> >>>>>>>> - Finished operators (FLIP-147) -> Force global checkpoint when sources are exhausted
> >>>>>>>>
> >>>>>>>> Thanks again for the review, looking forward to your feedback.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Raorao
> >>>>>>>>
> >>>>>>>>> On Jun 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi, thanks for your replies and sorry for the delay.
> >>>>>>>>>
> >>>>>>>>> Most of my questions were answered, but I still have some concerns.
> >>>>>>>>>
> >>>>>>>>>> If there are no further concerns by next Monday (June 22), I'll go ahead and start the [VOTE] thread for this FLIP.
> >>>>>>>>>
> >>>>>>>>> Isn't the actual FLIP still missing? I only saw a Google Document. Do you mind creating a page according to [1]?
> >>>>>>>>>
> >>>>>>>>> ----------------------------------------
> >>>>>>>>>
> >>>>>>>>>> 3. Checkpoint metadata layout
> >>>>>>>>>> Regional Checkpoint recombines state from different checkpoint IDs. To track this, we add a refCheckpointId field to OperatorSubtaskState in the metadata, indicating which historical checkpoint a subtask's state references.
> >>>>>>>>>
> >>>>>>>>> Could you explain how we find the right OperatorSubtaskState, especially in case of rescaling?
> >>>>>>>>> Does the proposal support rescaling?
> >>>>>>>>>
> >>>>>>>>>> 9. Finished operators
> >>>>>>>>>> The concern is: a finished operator's final commit notification gets skipped by Regional Checkpoint, and if this checkpoint is the last one, the operator never receives it. Could this cause data loss?
> >>>>>>>>>> In practice, the impact is limited:
> >>>>>>>>>> ● Failed Region tasks are already gone: By the time the Regional Checkpoint completes, tasks in the failed Region have already been restarted (decline) or cancelled (timeout). There is no task left to receive the notification anyway.
> >>>>>>>>>
> >>>>>>>>> Checkpoint failure doesn't necessarily cause a restart (especially if this is limited to one region). The tasks should still be up and running.
> >>>>>>>>>
> >>>>>>>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After reaching the limit, the next checkpoint is forced to be global, ensuring all tasks eventually receive notifyCheckpointComplete. We can't skip the same Region forever.
> >>>>>>>>>
> >>>>>>>>> maxConsecutiveFailures might not be reached for the final checkpoint.
> >>>>>>>>>
> >>>>>>>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops the job gracefully, it triggers a full global snapshot, not a Regional Checkpoint. So the final checkpoint is always complete.
> >>>>>>>>>
> >>>>>>>>> stop-with-savepoint should be fine, yes.
> >>>>>>>>>
> >>>>>>>>> To clarify, my concern is about jobs with bounded sources. In such cases, some subtasks might finish processing but still participate in checkpoints. After a successful checkpoint, they are guaranteed to get the checkpoint completion notification, so that they can make side effects visible in external systems (commit Kafka transactions).
> >>>>>>>>> See FLIP-147 [2]
> >>>>>>>>>
> >>>>>>>>> However, with the current proposal, the job might complete with some subtasks/regions failing the final checkpoint unless I'm missing something.
> >>>>>>>>> This is essentially data loss.
> >>>>>>>>> To prevent this, the final checkpoint must always be acked by all subtasks/regions.
> >>>>>>>>>
> >>>>>>>>> ----------------------------------------
> >>>>>>>>>
> >>>>>>>>> There are quite a few limitations in this proposal.
> >>>>>>>>> Could you add a section describing how each of them is handled?
> >>>>>>>>> 1. Reject job submission
> >>>>>>>>> 2. Force all-region checkpoint
> >>>>>>>>> 3. Warn in documentation
> >>>>>>>>>
> >>>>>>>>>> 1. Region independence: BLOCKING/HYBRID edges
> >>>>>>>>>> You're right. Our current scope is limited to embarrassingly parallel regions. In typical ETL scenarios, each parallelism maps to an independent Region with no edges connecting them.
> >>>>>>>>>
> >>>>>>>>>> 5. SharedStateRegistry: how are old states kept alive?
> >>>>>>>>>> Good question. In the current design, since we only target embarrassingly parallel regions, there is typically no keyed state and no incremental state. As a result, the SharedStateRegistry is generally empty (setting aside File Merging and Changelog State for now, discussed in 8.), so keep-alive of files under the shared directory is not a concern.
> >>>>>>>>>
> >>>>>>>>>> 8. FLINK-26803 and FLIP-306 compatibility
> >>>>>>>>>> This is a very important point. Both features essentially merge small files at the job level. As Rui Fan pointed out, if the merging granularity is reduced to the Region level, compatibility with Regional Checkpoint should be achievable in theory. I think this can be deferred to future work: once FLINK-26803 is consolidated into FLIP-306, we can revisit and enable support.
> >>>>>>>>>
> >>>>>>>>>> 10. NO_CLAIM mode warning
> >>>>>>>>>> You're absolutely right, this is an important reminder. After restoring from a Regional Checkpoint, only a successful global checkpoint guarantees independence from the old state. We'll add a clear user warning in the documentation.
> >>>>>>>>>
> >>>>>>>>>> 11. Changelog state backend: not supported
> >>>>>>>>>> As mentioned earlier, our primary target is embarrassingly parallel regions, which typically have no keyed state and therefore no slow incremental state flush issues. I don't think we need to support Changelog state backend for now.
> >>>>>>>>>
> >>>>>>>>> ----------------------------------------
> >>>>>>>>>
> >>>>>>>>>> 2. max-consecutive-failures exceeded: what exactly happens?
> >>>>>>>>>> The current design says "force a global checkpoint." To clarify the two-tier behavior:
> >>>>>>>>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the next checkpoint is forced to be global.
> >>>>>>>>>> ● Tier 2: If that forced global checkpoint also fails (any task declines), the checkpoint is aborted normally (not a job failure). The counter is then reset since a global checkpoint was attempted, and the next checkpoint cycle can try again.
> >>>>>>>>>> This avoids cascading into job failure while ensuring we don't drift indefinitely on historical state.
> >>>>>>>>>
> >>>>>>>>> My assumption was that we would not allow this particular failed region to fail the checkpoint again.
> >>>>>>>>> But forcing a global checkpoint works as well.
> >>>>>>>>>
> >>>>>>>>>> 6. Checkpoint abort notifications & Local Recovery cleanup: new notification type
> >>>>>>>>>> This is a very insightful point. Zihao and Gen also raised this in earlier discussions. The current design doesn't address state cleanup for tasks in failed regions. I agree it's necessary to introduce a new notification type. For tasks in failed regions, local state cleanup can be deferred until the next checkpoint trigger.
> >>>>>>>>>
> >>>>>>>>> Ok, this can be some future work.
> >>>>>>>>>
> >>>>>>>>>> 7. Task that never acknowledges nor declines: per-region timeouts
> >>>>>>>>>> This was discussed in the previous thread. Network issues may cause a task to neither ack nor decline in time. In such cases, we treat it as a checkpoint timeout: the affected tasks' region is marked as failed, and the process ultimately falls through to the normal Regional Checkpoint processing logic.
> >>>>>>>>>
> >>>>>>>>> Ok, this can be some future work.
> >>>>>>>>>
> >>>>>>>>> ----------------------------------------
> >>>>>>>>>
> >>>>>>>>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> >>>>>>>>>
> >>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Roman
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> Thanks everyone for the valuable feedback. I believe all the points raised above have been addressed (@Roman @Rui Fan). If there are no further concerns by next Monday (June 22), I'll go ahead and start the [VOTE] thread for this FLIP.
> >>>>>>>>>>
> >>>>>>>>>> For reference, the earlier related discussion can be found here:
> >>>>>>>>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
> >>>>>>>>>>
> >>>>>>>>>> Please feel free to share any additional feedback before then.
> >>>>>>>>>>
> >>>>>>>>>> Best Regards,
> >>>>>>>>>> Raorao
> >>>>>>>>>>
> >>>>>>>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi devs,
> >>>>>>>>>>
> >>>>>>>>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint Based On Pipeline Region.
> >>>>>>>>>>
> >>>>>>>>>> In high-parallelism streaming jobs, a single Task's checkpoint failure causes the entire global Checkpoint to abort, leading to degraded checkpoint success rates and wasted compute resources (especially for GPU operators).
> >>>>>>>>>>
> >>>>>>>>>> We propose Regional Checkpoint: when some Regions fail to checkpoint, the framework combines the historical state of the failed Regions with the current state of the healthy Regions to produce a logically complete Completed Checkpoint, while preserving state consistency. The key changes are:
> >>>>>>>>>>
> >>>>>>>>>> 1. Snapshot Collection: Allow partial region failures; combine last successful state of failed Regions with current state of normal Regions.
> >>>>>>>>>>
> >>>>>>>>>> 2. State Correction: New checkpointCoordinatorForRegionFallback interface for OperatorCoordinators to produce consistent snapshots against the mixed view.
> >>>>>>>>>>
> >>>>>>>>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent premature cleanup of referenced historical checkpoints.
> >>>>>>>>>>
> >>>>>>>>>> The detailed design is described in the FLIP document:
> >>>>>>>>>>
> >>>>>>>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> >>>>>>>>>>
> >>>>>>>>>> Looking forward to your feedback!
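Changes 1 and 3 above can be pictured with a small model. It is a sketch only, with no claim about the FLIP's actual data structures: `RegionState`, `compose` and `must_retain` are hypothetical names, state handles are plain tuples, and references are followed at whole-checkpoint granularity, which may retain more than a finer-grained cleaner would.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set, Tuple


@dataclass(frozen=True)
class RegionState:
    handles: Tuple[str, ...]
    # None means the state was snapshotted in this very checkpoint.
    ref_checkpoint_id: Optional[int] = None


def compose(acked: Dict[int, Tuple[str, ...]],
            previous: Dict[int, RegionState],
            previous_id: int) -> Dict[int, RegionState]:
    """Healthy regions contribute fresh state; failed ones reuse the previous checkpoint."""
    composed = {}
    for region, prev in previous.items():
        if region in acked:
            composed[region] = RegionState(acked[region])
        else:
            composed[region] = RegionState(prev.handles, previous_id)
    return composed


def must_retain(store: Dict[int, Dict[int, RegionState]], latest_id: int) -> Set[int]:
    """Checkpoints reachable from latest_id through ref_checkpoint_id links."""
    retained: Set[int] = set()
    pending = [latest_id]
    while pending:
        checkpoint_id = pending.pop()
        if checkpoint_id in retained:
            continue
        retained.add(checkpoint_id)
        pending.extend(state.ref_checkpoint_id
                       for state in store[checkpoint_id].values()
                       if state.ref_checkpoint_id is not None)
    return retained


# Checkpoint 100 is global; region 1 then fails checkpoints 101 and 102.
store = {100: {0: RegionState(("A0", "A1")), 1: RegionState(("B0", "B1"))}}
store[101] = compose({0: ("E0", "E1")}, store[100], previous_id=100)
store[102] = compose({0: ("F0", "F1")}, store[101], previous_id=101)
retained = must_retain(store, 102)
```

Here region 1 of checkpoint 102 still carries `("B0", "B1")` with `ref_checkpoint_id=101`, and `retained` covers checkpoints 100 through 102, so none of them would be cleaned up in this model.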
> >>>>>>>>>>
> >>>>>>>>>> Best regards,
> >>>>>>>>>>
> >>>>>>>>>> Raorao Xiong
