Hi Raorao,

Thanks for the walk-through. That partially resolves my concern about re-assigning state: it doesn't have to be recomputed on every partial checkpoint, but the restored (redistributed) assignment has to be retained until the first checkpoint completes at the new parallelism.

My other concern still stands: the isolation argument doesn't obviously cover union operator state, which every subtask sees in full across all regions, so a partial checkpoint stores a temporal mix of elements, and rescaling reshuffles them across new region boundaries.

> I'll cover all this in the FLIP document.

Yes, I think it's better to capture these edge cases in a Rescaling section of the FLIP and discuss in more detail there. Looking forward to it.

Regards,
Roman

On Fri, Jun 26, 2026 at 10:06 AM 熊饶饶 <[email protected]> wrote:

> Hi Roman, thanks for the detailed follow-up.
>
> Rescaling and state redistribution
>
> StateAssignmentOperation runs only once per restore; it doesn't run per referenced checkpoint. The subtask states entering StateAssignmentOperation have already been pre-merged during the Regional Checkpoint completion phase. Each OperatorSubtaskState already physically contains the state handles from whichever checkpoint it originated from. The refCheckpointId is just metadata for the cleaner.
>
> Example: P=4 → P=2 → P=1, two rounds of downscaling
>
> Initial state (P=4, even-split operator state):
>
>   ckp100 (T1, GLOBAL): all 4 regions ack
>     subtask 0: [A0, A1]   region 0
>     subtask 1: [B0, B1]   region 1
>     subtask 2: [C0, C1]   region 2
>     subtask 3: [D0, D1]   region 3
>
> First downscale: P=4 → P=2 (recover from ckp100 after failover)
>
>   Even-split redistribution:
>     concatenate: [A0,A1, B0,B1, C0,C1, D0,D1] → 8 elements
>     split into 2:
>       new subtask 0: [A0, A1, B0, B1]
>       new subtask 1: [C0, C1, D0, D1]
>
> Now running at P=2:
>
>   ckp101 (T2, REGIONAL):
>     new-subtask 0 ack: [E0, E1, E2, E3]
>     new-subtask 1 decline, fallback to ckp100 → [C0,C1,D0,D1] (ref=100)
>
>   ckp101 metadata (pre-merged):
>     new-subtask 0: stateHandles=[E0,E1,E2,E3], refCheckpointId=null
>     new-subtask 1: stateHandles=[C0,C1,D0,D1], refCheckpointId=100
>
> Second downscale: P=2 → P=1 (recover from ckp101)
>
>   Even-split redistribution:
>     concatenate: [E0,E1,E2,E3, C0,C1,D0,D1] → 8 elements
>     split into 1:
>       new subtask 0: [E0,E1,E2,E3, C0,C1,D0,D1]
>
> At this point, the single subtask's state contains elements from two time points; this is safe per the partition isolation argument above. Now the job continues running at P=1:
>
>   ckp102 (T3, REGIONAL):
>     new subtask 0 decline, fallback to ckp101's redistributed state → ref=101
>
>   ckp102 metadata (pre-merged):
>     subtask 0: stateHandles=[E0,E1,E2,E3, C0,C1,D0,D1], refCheckpointId=101
>
>   ref chain: ckp102(ref=101) → ckp101(ref=100) → ckp100
>
> The ref chain is tracked entirely in the CompletedCheckpoint store metadata. Rescaling is a restore operation: it redistributes state handles to TMs but doesn't create new checkpoint metadata. The next Regional Checkpoint (ckp102) references the previous CompletedCheckpoint (ckp101) from the store, which still has its original ref=100 annotation in the metadata.
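
For readers following along, the even-split walk-through quoted above can be reproduced with a small Python sketch. This is only an illustration under stated assumptions: `SubtaskState`, `pre_merge` and `even_split` are hypothetical names, not Flink classes or methods, and the contiguous split used here is a simplification that is not claimed to match Flink's actual repartitioner.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class SubtaskState:
    # Hypothetical stand-in for OperatorSubtaskState in the proposal.
    handles: List[str]
    ref_checkpoint_id: Optional[int] = None  # None: state taken in this checkpoint


def pre_merge(acked: Dict[int, List[str]],
              previous: List[SubtaskState],
              previous_id: int) -> List[SubtaskState]:
    """Build the metadata of a regional checkpoint: subtasks that acked
    contribute fresh handles, the others fall back to the previous state
    and are annotated with the id of the checkpoint they reference."""
    merged = []
    for index, prev in enumerate(previous):
        if index in acked:
            merged.append(SubtaskState(list(acked[index])))
        else:
            merged.append(SubtaskState(list(prev.handles), previous_id))
    return merged


def even_split(states: List[SubtaskState], new_parallelism: int) -> List[List[str]]:
    """Concatenate all lists, then cut them into contiguous, near-equal parts."""
    elements = [h for state in states for h in state.handles]
    base, extra = divmod(len(elements), new_parallelism)
    result, start = [], 0
    for i in range(new_parallelism):
        size = base + (1 if i < extra else 0)
        result.append(elements[start:start + size])
        start += size
    return result


# ckp100 (GLOBAL) at P=4, then restore at P=2.
ckp100 = [SubtaskState(["A0", "A1"]), SubtaskState(["B0", "B1"]),
          SubtaskState(["C0", "C1"]), SubtaskState(["D0", "D1"])]
at_p2 = even_split(ckp100, 2)

# ckp101 (REGIONAL): subtask 0 acks, subtask 1 falls back to ckp100.
ckp101 = pre_merge({0: ["E0", "E1", "E2", "E3"]},
                   [SubtaskState(h) for h in at_p2], previous_id=100)

# Restore from ckp101 at P=1.
at_p1 = even_split(ckp101, 1)
print(at_p1)
```

The sketch only shows which elements end up where; it says nothing about whether the resulting mix is semantically safe for a given operator.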
>
> Union state, same scenario:
>
>   ckp100 (T1, GLOBAL): P=4, union operator state
>     subtask 0: [A0, A1]
>     subtask 1: [B0, B1]
>     subtask 2: [C0, C1]
>     subtask 3: [D0, D1]
>
>   P=4 → P=2 (union):
>     new subtask 0 gets FULL concatenated: [A0,A1, B0,B1, C0,C1, D0,D1]
>     new subtask 1 gets FULL concatenated: same
>
>   ckp101 (T2, REGIONAL):
>     new-subtask 0 ack: [E0, ..., E7]
>     new-subtask 1 decline, fallback → [A0,A1,B0,B1,C0,C1,D0,D1] (ref=100)
>
>   P=2 → P=1 (union):
>     new subtask 0 gets: [E0,...,E7, A0,...,D1] ← complete set, safe
>
> Union is even simpler: each subtask always gets the complete state, so mixing doesn't add new complexity.
>
> refCheckpointId chain bounded by maxConsecutiveFailures
>
>   ckp100 (global)
>     → ckp101 (regional, ref=100) consecutive=1
>     → ckp102 (regional, ref=101) consecutive=2
>     → ckp103: consecutive >= maxConsecutiveFailures(2)
>     → FORCED GLOBAL → if success, chain resets
>
> The reference chain can only grow as deep as maxConsecutiveFailures. Once a global checkpoint succeeds, it's a fresh start. No inflation beyond the configured limit.
>
> Channel state redistribution
>
> Each channel state handle carries InputChannelInfo / ResultSubpartitionInfo. TaskStateAssignment maps handles by their channel metadata; time of origin is irrelevant. Per-subtask channel state and operator state come from the same checkpoint snapshot, ensuring internal consistency regardless of which checkpoint other subtasks reference.
>
> Summary
>
> 1. One StateAssignmentOperation per restore: states are pre-merged
> 2. POINTWISE partition isolation makes mixing states across time points safe for both even-split and union
> 3. maxConsecutiveFailures bounds refCheckpointId chain depth
> 4. Channel state redistribution is time-agnostic
>
> I'll cover all this in the FLIP document.
>
> Thanks again for the detailed replies, looking forward to your feedback.
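
The union scenario quoted above can be traced the same way. The following Python sketch is an illustration only: `union_redistribute` is a hypothetical helper that follows the documented union rule (every subtask receives the concatenation of all lists), not Flink code. It shows mechanically what a subtask receives after the two downscales, without judging whether that content is safe for the operator.

```python
from typing import List


def union_redistribute(per_subtask: List[List[str]],
                       new_parallelism: int) -> List[List[str]]:
    """Union redistribution: every new subtask gets the full concatenation."""
    full = [element for sublist in per_subtask for element in sublist]
    return [list(full) for _ in range(new_parallelism)]


# ckp100 (GLOBAL) at P=4, restored at P=2: both subtasks see all 8 elements.
ckp100 = [["A0", "A1"], ["B0", "B1"], ["C0", "C1"], ["D0", "D1"]]
at_p2 = union_redistribute(ckp100, 2)

# ckp101 (REGIONAL): subtask 0 acks fresh state, subtask 1 falls back to
# the state it was restored with (ref=100 in the thread's notation).
fresh = [f"E{i}" for i in range(8)]
ckp101 = [fresh, at_p2[1]]

# Restore from ckp101 at P=1: the single subtask gets both lists.
at_p1 = union_redistribute(ckp101, 1)
print(len(at_p1[0]), at_p1[0])
```

In this trace the single subtask ends up with the eight new elements followed by the eight elements that date from ckp100, which is exactly the temporal mix the discussion is about.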
>
> Regards,
> Raorao
>
> On Jun 24, 2026, at 18:38, Roman Khachatryan <[email protected]> wrote:
> >
> > Hi Raorao, thanks for your answers
> >
> >>> 2. Rescaling for OperatorSubtaskState
> >
> >> `refCheckpointId` is checkpoint-level metadata stored in the completed checkpoint; it doesn't participate in state redistribution during rescaling. The `TaskStateAssignment` redistributes only the state handles to new subtasks, while the `CompletedCheckpointStore` retains the original metadata (including `refCheckpointId`) for the cleaner to trace reference chains. So rescaling is inherently compatible.
> >
> > Do you mean that we'll run StateAssignmentOperation for every referenced checkpoint on every "partial" checkpoint completion? (it can be quite heavy)
> >
> > I don't think that the "redistributes only the state handles to new subtasks" part is true - on rescaling, distribution changes for (potentially) every sub-task.
> >
> > From the docs [1]:
> > - Even-split redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty, or contain one or more elements.
> > - Union redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, each operator gets the complete list of state elements. ...
> > - (we said earlier that keyed state is not supported)
> >
> > So I don't see how any of the above can work because we might mix old and new states in a sub-task.
> >
> > For example:
> > - a job is running with parallelism 2 and has even-split state distribution
> > - 1st checkpoint is completed by 2/2 sub-tasks
> > - 2nd checkpoint is completed by 1/2 sub-tasks: sub-task 2 failed the checkpoint; its state in checkpoint 2 refers to "subtask 2 state in checkpoint 1"
> > - job is rescaled to 1
> > - sub-task 1 now has state from checkpoint 1 AND 2?
> >
> > Furthermore, with multiple rounds of downscaling, there can be a single sub-task referring to multiple historical checkpoints.
> >
> > For Union, it's even more problematic.
> >
> > Also, what about channel state (Unaligned checkpoint) re-distribution?
> >
> > Probably it's better to have a FLIP document with the corresponding section first and then discuss it.
> >
> > [1] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state
> >
> > Regards,
> > Roman
> >
> > On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:
> >
> >> Hi Zakelly,
> >>
> >> Thanks for the feedback! I completely agree: forcing region-level merging would significantly reduce the effectiveness of file merging, and that's not the right trade-off.
> >>
> >> I think TM-level merging can work correctly with Regional Checkpoint if we ensure the FileMergingSnapshotManager properly handles the new checkpoint notification semantics. Specifically:
> >>
> >> - The FileMergingSnapshotManager already uses a notification-based lifecycle (checkpoint complete/abort/subsumed) to manage physical file deletion.
> >> - For Regional Checkpoint, we need to introduce the new notification type we discussed earlier (notify partially-completed) so that the merging manager knows: this checkpoint is complete but some segments belong to failed regions, so keep the physical files that contain those segments alive until the referencing checkpoints are subsumed.
> >> - This aligns with the existing design and requires only minor adjustments to the merging manager's notification handling.
> >>
> >> The previous suggestion of region-level merging was overly conservative; glad you pointed out the better approach.
> >>
> >> Also good to know about the plan to cover channel state in FLIP-306. That will simplify the compatibility matrix significantly.
> >>
> >> Best Regards,
> >> Raorao
> >>
> >>> On Jun 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
> >>>
> >>> Hi Raorao,
> >>>
> >>> Good to see this proposal and +1 for the direction. Sorry for joining the discussion late. I have seen many constructive suggestions that largely align with my thoughts, but I still have one concern:
> >>>
> >>> I'm one of the authors of FLIP-306 and I'm not in favor of region-level merging. IIUC, region-level merge files severely limit the effectiveness of merging, as merging cannot happen between subtasks. I think it should still be possible to perform TM-level merge. The only thing we should do is to keep previous checkpoint files alive when a region checkpoint occurs. This does not conflict with the current design. It is only necessary to ensure that the new behavior of checkpoint notifications is compatible with FLIP-306, or to make some minor adjustment to the merging manager.
> >>>
> >>> And BTW to @Rui, we still need more work to let FLIP-306 cover channel state and deprecate FLINK-26803, and I will look into this soon.
> >>>
> >>> Best,
> >>> Zakelly
> >>>
> >>> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
> >>>
> >>>> Hi Roman, thanks for the follow-up.
> >>>>
> >>>> 1. FLIP page
> >>>>
> >>>> Thanks for your reminder, I'll create the proper FLIP page before starting the [VOTE] thread.
> >>>>
> >>>> 2. Rescaling for OperatorSubtaskState
> >>>>
> >>>> `refCheckpointId` is checkpoint-level metadata stored in the completed checkpoint; it doesn't participate in state redistribution during rescaling. The `TaskStateAssignment` redistributes only the state handles to new subtasks, while the `CompletedCheckpointStore` retains the original metadata (including `refCheckpointId`) for the cleaner to trace reference chains. So rescaling is inherently compatible.
> >>>>
> >>>> 3. Finished operators
> >>>>
> >>>> You're right, my previous answer missed the real issue. For bounded source jobs (FLIP-147), if the final checkpoint is Regional and a failed Region's subtasks miss `notifyCheckpointComplete`, their side effects (e.g., Kafka transactions) are never committed, and that's data loss.
> >>>>
> >>>> Solution: force a global checkpoint when the job is about to terminate. When all sources are exhausted, the JM will mark the next checkpoint as mandatory global, requiring all regions to ack. If it fails, the job retries rather than terminating with a partial snapshot.
> >>>>
> >>>> 4. Limitations section
> >>>>
> >>>> I agree with you, and I'll add a section covering all limitations:
> >>>>
> >>>> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> >>>> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> >>>> NO_CLAIM restore mode -> Warn (require global checkpoint before snapshot deletion)
> >>>> Changelog state backend -> Reject job submission
> >>>> Finished operators (FLIP-147) -> Force global checkpoint when sources are exhausted
> >>>>
> >>>> Thanks again for the review, looking forward to your feedback.
> >>>>
> >>>> Regards,
> >>>> Raorao
> >>>>
> >>>>> On Jun 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
> >>>>>
> >>>>> Hi, thanks for your replies and sorry for the delay.
> >>>>>
> >>>>> Most of my questions were answered, but I still have some concerns.
> >>>>>
> >>>>>> If there are no further concerns by next Monday (June 22), I'll go ahead and start the [VOTE] thread for this FLIP.
> >>>>>
> >>>>> Isn't the actual FLIP still missing? I only saw a Google Document. Do you mind creating a page according to [1]?
> >>>>>
> >>>>> ----------------------------------------
> >>>>>
> >>>>>> 3. Checkpoint metadata layout
> >>>>>> Regional Checkpoint recombines state from different checkpoint IDs. To track this, we add a refCheckpointId field to OperatorSubtaskState in the metadata, indicating which historical checkpoint a subtask's state references.
> >>>>>
> >>>>> Could you explain how we find the right OperatorSubtaskState - especially in case of rescaling?
> >>>>> Does the proposal support rescaling?
> >>>>>
> >>>>>> 9. Finished operators
> >>>>>> The concern is: a finished operator's final commit notification gets skipped by Regional Checkpoint, and if this checkpoint is the last one, the operator never receives it. Could this cause data loss?
> >>>>>> In practice, the impact is limited:
> >>>>>> ● Failed Region tasks are already gone: By the time the Regional Checkpoint completes, tasks in the failed Region have already been restarted (decline) or cancelled (timeout). There is no task left to receive the notification anyway.
> >>>>> Checkpoint failure doesn't necessarily cause a restart (especially if this is limited to one region). The tasks should still be up and running.
> >>>>>
> >>>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After reaching the limit, the next checkpoint is forced to be global, ensuring all tasks eventually receive notifyCheckpointComplete. We can't skip the same Region forever.
> >>>>> maxConsecutiveFailures might not be reached for the final checkpoint.
> >>>>>
> >>>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops the job gracefully, it triggers a full global snapshot, not a Regional Checkpoint. So the final checkpoint is always complete.
> >>>>> stop-with-savepoint should be fine, yes.
> >>>>>
> >>>>> To clarify, my concern is about jobs with bounded sources. In such cases, some subtasks might finish processing but still participate in checkpoints. After a successful checkpoint, they are guaranteed to get a checkpoint completion notification - so that they can make side effects visible in external systems (commit Kafka transactions).
> >>>>> See FLIP-147 [2]
> >>>>>
> >>>>> However, with the current proposal, the job might complete with some subtasks/regions failing the final checkpoint unless I'm missing something.
> >>>>> This is essentially data loss.
> >>>>> To prevent this, the final checkpoint must always be acked by all subtasks/regions.
> >>>>>
> >>>>> ----------------------------------------
> >>>>>
> >>>>> There are quite a few limitations in this proposal.
> >>>>> Could you add a section describing how each of them is handled?
> >>>>> 1. Reject job submission
> >>>>> 2. Force all-region checkpoint
> >>>>> 3. Warn in documentation
> >>>>>
> >>>>>> 1. Region independence: BLOCKING/HYBRID edges
> >>>>>> You're right. Our current scope is limited to embarrassingly parallel regions. In typical ETL scenarios, each parallelism maps to an independent Region with no edges connecting them.
> >>>>>
> >>>>>> 5. SharedStateRegistry: how are old states kept alive?
> >>>>>> Good question. In the current design, since we only target embarrassingly parallel regions, there is typically no keyed state and no incremental state. As a result, the SharedStateRegistry is generally empty (setting aside File Merging and Changelog State for now, discussed in 8.), so keep-alive of files under the shared directory is not a concern.
> >>>>>
> >>>>>> 8. FLINK-26803 and FLIP-306 compatibility
> >>>>>> This is a very important point. Both features essentially merge small files at the job level. As Rui Fan pointed out, if the merging granularity is reduced to the Region level, compatibility with Regional Checkpoint should be achievable in theory. I think this can be deferred to future work: once FLINK-26803 is consolidated into FLIP-306, we can revisit and enable support.
> >>>>>
> >>>>>> 10. NO_CLAIM mode warning
> >>>>>> You're absolutely right, this is an important reminder. After restoring from a Regional Checkpoint, only a successful global checkpoint guarantees independence from the old state. We'll add a clear user warning in the documentation.
> >>>>>
> >>>>>> 11. Changelog state backend: not supported
> >>>>>> As mentioned earlier, our primary target is embarrassingly parallel regions, which typically have no keyed state and therefore no slow incremental state flush issues. I don't think we need to support Changelog state backend for now.
> >>>>>
> >>>>> ----------------------------------------
> >>>>>
> >>>>>> 2. max-consecutive-failures exceeded: what exactly happens?
> >>>>>> The current design says "force a global checkpoint." To clarify the two-tier behavior:
> >>>>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the next checkpoint is forced to be global.
> >>>>>> ● Tier 2: If that forced global checkpoint also fails (any task declines), the checkpoint is aborted normally (not a job failure). The counter is then reset since a global checkpoint was attempted, and the next checkpoint cycle can try again.
> >>>>>> This avoids cascading into job failure while ensuring we don't drift indefinitely on historical state.
> >>>>>
> >>>>> My assumption was that we would not allow this particular failed region to fail the checkpoint again.
> >>>>> But forcing a global checkpoint works as well.
> >>>>>
> >>>>>> 6. Checkpoint abort notifications & Local Recovery cleanup: new notification type
> >>>>>> This is a very insightful point. Zihao and Gen also raised this in earlier discussions. The current design doesn't address state cleanup for tasks in failed regions. I agree it's necessary to introduce a new notification type. For tasks in failed regions, local state cleanup can be deferred until the next checkpoint trigger.
> >>>>> Ok, this can be some future work.
> >>>>>
> >>>>>> 7. Task that never acknowledges nor declines: per-region timeouts
> >>>>>> This was discussed in the previous thread. Network issues may cause a task to neither ack nor decline in time. In such cases, we treat it as a checkpoint timeout: the affected tasks' region is marked as failed, and the process ultimately falls through to the normal Regional Checkpoint processing logic.
> >>>>> Ok, this can be some future work.
> >>>>>
> >>>>> ----------------------------------------
> >>>>>
> >>>>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> >>>>>
> >>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> >>>>>
> >>>>> Regards,
> >>>>> Roman
> >>>>>
> >>>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Thanks everyone for the valuable feedback. I believe all the points raised above have been addressed (@Roman @Rui Fan). If there are no further concerns by next Monday (June 22), I'll go ahead and start the [VOTE] thread for this FLIP.
> >>>>>>
> >>>>>> For reference, the earlier related discussion can be found here:
> >>>>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
> >>>>>>
> >>>>>> Please feel free to share any additional feedback before then.
> >>>>>>
> >>>>>> Best Regards,
> >>>>>> Raorao
> >>>>>>
> >>>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
> >>>>>>
> >>>>>> Hi devs,
> >>>>>>
> >>>>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint Based On Pipeline Region.
> >>>>>>
> >>>>>> In high-parallelism streaming jobs, a single Task's checkpoint failure causes the entire global Checkpoint to abort, leading to degraded checkpoint success rates and wasted compute resources (especially for GPU operators).
> >>>>>>
> >>>>>> We propose Regional Checkpoint: when some Regions fail to checkpoint, the framework combines the historical state of the failed Regions with the current state of the healthy Regions to produce a logically complete Completed Checkpoint, while preserving state consistency. The key changes are:
> >>>>>>
> >>>>>> 1. Snapshot Collection: Allow partial region failures; combine last successful state of failed Regions with current state of normal Regions.
> >>>>>>
> >>>>>> 2. State Correction: New checkpointCoordinatorForRegionFallback interface for OperatorCoordinators to produce consistent snapshots against the mixed view.
> >>>>>>
> >>>>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent premature cleanup of referenced historical checkpoints.
> >>>>>>
> >>>>>> The detailed design is described in the FLIP document:
> >>>>>>
> >>>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> >>>>>>
> >>>>>> Looking forward to your feedback!
> >>>>>>
> >>>>>> Best regards,
> >>>>>>
> >>>>>> Raorao Xiong
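
As a footnote to the Checkpoint Store point in the original proposal (tracking ref_checkpoint_id so that referenced historical checkpoints are not cleaned up too early), the retention rule can be pictured as a reachability walk over the ref chain. The Python sketch below is only an assumption about how such a walk might look: `referenced_closure` and the `refs` mapping are hypothetical and do not correspond to any existing Flink API.

```python
from typing import Dict, Set


def referenced_closure(refs: Dict[int, Set[int]], checkpoint_id: int) -> Set[int]:
    """Return checkpoint_id plus every checkpoint reachable through its refs.

    refs maps a checkpoint id to the set of older checkpoint ids that its
    subtask states reference (empty for a global checkpoint)."""
    seen: Set[int] = set()
    stack = [checkpoint_id]
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        stack.extend(refs.get(current, set()))
    return seen


# Ref chain from the thread: ckp102(ref=101) -> ckp101(ref=100) -> ckp100.
refs = {100: set(), 101: {100}, 102: {101}}
print(sorted(referenced_closure(refs, 102)))
```

Under this reading, a cleaner would keep every checkpoint in the closure of the retained ones, and a successful global checkpoint (empty ref set) would let everything older be released.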
