Hi Raorao,

Thanks for the walk-through.
That partially resolves my concern about re-assigning state: it doesn't
have to be recomputed on every partial checkpoint — but the restored
(redistributed) assignment has to be retained until the first checkpoint
completes at the new parallelism.

My other concern still stands: the isolation argument doesn't obviously
cover union operator state, which every subtask sees in full across all
regions — so a partial checkpoint stores a temporal mix of elements, and
rescaling reshuffles them across new region boundaries.

> I'll cover all this in the FLIP document.

Yes — I think it's better to capture these edge cases in a Rescaling
section of the FLIP and discuss in more detail there. Looking forward to it.

Regards,
Roman


On Fri, Jun 26, 2026 at 10:06 AM 熊饶饶 <[email protected]> wrote:

> Hi Roman, thanks for the detailed follow-up.
> Rescaling and state redistribution
> StateAssignmentOperation runs only once per restore — it doesn’t run per
> referenced checkpoint. The subtask states entering StateAssignmentOperation
> have already been pre-merged during the Regional Checkpoint completion
> phase. Each OperatorSubtaskState already physically contains the state
> handles from whichever checkpoint it originated from. The refCheckpointId
> is just metadata for the cleaner.
> Example: P=4 → P=2 → P=1, two rounds of downscaling
> Initial state (P=4, even-split operator state):
> ckp100 (T1, GLOBAL): all 4 regions ack
>   subtask 0: [A0, A1]    region 0
>   subtask 1: [B0, B1]    region 1
>   subtask 2: [C0, C1]    region 2
>   subtask 3: [D0, D1]    region 3
> First downscale: P=4 → P=2 (recover from ckp100 after failover)
> Even-split redistribution:
> concatenate: [A0,A1, B0,B1, C0,C1, D0,D1]  → 8 elements
> split into 2:
>   new subtask 0: [A0, A1, B0, B1]
>   new subtask 1: [C0, C1, D0, D1]
> Now running at P=2:
> ckp101 (T2, REGIONAL):
>   new-subtask 0 ack: [E0, E1, E2, E3]
>   new-subtask 1 decline, fallback to ckp100 → [C0,C1,D0,D1]  (ref=100)
>
> ckp101 metadata (pre-merged):
>   new-subtask 0: stateHandles=[E0,E1,E2,E3], refCheckpointId=null
>   new-subtask 1: stateHandles=[C0,C1,D0,D1], refCheckpointId=100
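This is a minimal sketch of the pre-merge step described above. It assumes a simplified model where a subtask's state is just a tuple of handle names. `SubtaskSnapshot` and `pre_merge` are hypothetical names, not Flink classes. The sketch treats the P=2 redistribution of ckp100 as the "previous" checkpoint and reproduces the ckp101 metadata from the example.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class SubtaskSnapshot:
    """Hypothetical stand-in for OperatorSubtaskState: handle names plus a ref."""
    state_handles: Tuple[str, ...]
    ref_checkpoint_id: Optional[int] = None  # None: handles written by this checkpoint


def pre_merge(
    acks: Dict[int, Tuple[str, ...]],
    previous: Dict[int, SubtaskSnapshot],
    previous_id: int,
    parallelism: int,
) -> Dict[int, SubtaskSnapshot]:
    """Combine fresh acks with fallback state into one logically complete checkpoint."""
    merged: Dict[int, SubtaskSnapshot] = {}
    for idx in range(parallelism):
        if idx in acks:
            # Healthy region: use the handles it just reported.
            merged[idx] = SubtaskSnapshot(acks[idx])
        else:
            # Declined region: reuse the handles it holds in the previous
            # completed checkpoint and record that checkpoint for the cleaner.
            merged[idx] = SubtaskSnapshot(previous[idx].state_handles, previous_id)
    return merged


# ckp100 as redistributed to P=2 by the even-split step above.
restored_from_ckp100 = {
    0: SubtaskSnapshot(("A0", "A1", "B0", "B1")),
    1: SubtaskSnapshot(("C0", "C1", "D0", "D1")),
}
ckp101 = pre_merge({0: ("E0", "E1", "E2", "E3")}, restored_from_ckp100, 100, 2)
for idx, snap in sorted(ckp101.items()):
    print(idx, snap.state_handles, snap.ref_checkpoint_id)
```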
> Second downscale: P=2 → P=1 (recover from ckp101)
> Even-split redistribution:
> concatenate: [E0,E1,E2,E3, C0,C1,D0,D1]  → 8 elements
> split into 1:
>   new subtask 0: [E0,E1,E2,E3, C0,C1,D0,D1]
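Both even-split steps can be replayed with a simplified repartitioner. It concatenates all per-subtask lists and then cuts the result into contiguous chunks, with earlier chunks taking one extra element when the total does not divide evenly. This illustrative sketch follows the documented even-split semantics. It is not Flink's actual repartitioner, and `even_split` is a hypothetical helper.

```python
from typing import List, Sequence


def even_split(subtask_states: Sequence[Sequence[str]], new_parallelism: int) -> List[List[str]]:
    """Concatenate all lists, then cut into new_parallelism contiguous chunks."""
    flat = [elem for state in subtask_states for elem in state]
    base, extra = divmod(len(flat), new_parallelism)
    result, start = [], 0
    for i in range(new_parallelism):
        size = base + (1 if i < extra else 0)  # earlier chunks absorb the remainder
        result.append(flat[start:start + size])
        start += size
    return result


ckp100 = [["A0", "A1"], ["B0", "B1"], ["C0", "C1"], ["D0", "D1"]]
p2 = even_split(ckp100, 2)

# ckp101: subtask 0 acks fresh elements; subtask 1 falls back to its ckp100 share.
ckp101 = [["E0", "E1", "E2", "E3"], p2[1]]
p1 = even_split(ckp101, 1)
print(p2)
print(p1)
```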
> At this point, the single subtask’s state contains elements from two time
> points — this is safe per the partition isolation argument above. Now the
> job continues running at P=1:
> ckp102 (T3, REGIONAL):
>   new subtask 0 decline, fallback to ckp101's redistributed state
>   → ref=101
>
> ckp102 metadata (pre-merged):
>   subtask 0: stateHandles=[E0,E1,E2,E3, C0,C1,D0,D1], refCheckpointId=101
>
> ref chain: ckp102(ref=101) → ckp101(ref=100) → ckp100
> The ref chain is tracked entirely in the CompletedCheckpoint store
> metadata. Rescaling is a restore operation — it redistributes state handles
> to TMs but doesn't create new checkpoint metadata. The next Regional
> Checkpoint (ckp102) references the previous CompletedCheckpoint (ckp101)
> from the store, which still has its original ref=100 annotation in the
> metadata.
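Here is a sketch of how a cleaner could use the refCheckpointId annotations to decide what must stay alive. It assumes each completed checkpoint exposes the set of ids its subtasks reference. `retained_checkpoints` is hypothetical. The walk has to be transitive: ckp102 names only ckp101, but its handles for C0..D1 physically live in ckp100.

```python
from typing import Dict, Set


def retained_checkpoints(latest: int, refs: Dict[int, Set[int]]) -> Set[int]:
    """All checkpoints whose files must survive while `latest` is retained.

    `refs` maps a checkpoint id to the refCheckpointIds recorded on its
    subtasks (empty for a self-contained checkpoint). Different subtasks may
    reference different checkpoints, hence a set.
    """
    keep: Set[int] = set()
    stack = [latest]
    while stack:
        ckp = stack.pop()
        if ckp in keep:
            continue
        keep.add(ckp)
        stack.extend(refs.get(ckp, set()))  # follow refs transitively
    return keep


refs = {100: set(), 101: {100}, 102: {101}}
print(sorted(retained_checkpoints(102, refs)))  # [100, 101, 102]
```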
>
> Union state, same scenario:
> ckp100 (T1, GLOBAL): P=4, union operator state
>   subtask 0: [A0, A1]
>   subtask 1: [B0, B1]
>   subtask 2: [C0, C1]
>   subtask 3: [D0, D1]
>
> P=4 → P=2 (union):
>   new subtask 0 gets FULL concatenated: [A0,A1, B0,B1, C0,C1, D0,D1]
>   new subtask 1 gets FULL concatenated: same
>
> ckp101 (T2, REGIONAL):
>   new-subtask 0 ack: [E0, ..., E7]
>   new-subtask 1 decline, fallback → [A0,A1,B0,B1,C0,C1,D0,D1]  (ref=100)
>
> P=2 → P=1 (union):
>   new subtask 0 gets: [E0,...,E7, A0,...,D1]  ← complete set, safe
> Union is even simpler — each subtask always gets the complete state, so
> mixing doesn’t add new complexity.
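This is the union case under the same simplified model. `union_redistribute` is a hypothetical helper that follows the documented union semantics. It only shows what the restored list contains: T1 elements (A0..D1) sit next to T2 elements (E0..E7). It says nothing about whether an operator can consume that mix safely.

```python
from typing import List, Sequence


def union_redistribute(subtask_states: Sequence[Sequence[str]], new_parallelism: int) -> List[List[str]]:
    """Every new subtask receives the full concatenation of all old lists."""
    full = [elem for state in subtask_states for elem in state]
    return [list(full) for _ in range(new_parallelism)]


ckp100 = [["A0", "A1"], ["B0", "B1"], ["C0", "C1"], ["D0", "D1"]]
p2 = union_redistribute(ckp100, 2)

# ckp101 (regional): subtask 0 acks fresh elements, subtask 1 falls back to
# the list it was restored with from ckp100.
ckp101 = [[f"E{i}" for i in range(8)], p2[1]]
p1 = union_redistribute(ckp101, 1)
print(len(p1[0]))  # 16
```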
> refCheckpointId chain bounded by maxConsecutiveFailures
>
> ckp100 (global)
>   → ckp101 (regional, ref=100)  consecutive=1
>   → ckp102 (regional, ref=101)  consecutive=2
>   → ckp103: consecutive >= maxConsecutiveFailures(2)
>      → FORCED GLOBAL → if success, chain resets
> The reference chain can only grow as deep as maxConsecutiveFailures. Once
> a global checkpoint succeeds, it’s a fresh start. No inflation beyond the
> configured limit.
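The counter logic can be replayed in a few lines. The sketch makes two assumptions. First, the job starts right after a completed global checkpoint. Second, if some region declines a forced global checkpoint, it is simply retried on the next trigger (Tier 1 only). `simulate` is a hypothetical helper. Its depth counts the refCheckpointId hops back to a self-contained checkpoint, in the worst case where the chain grows by one hop per regional checkpoint.

```python
from typing import List, Optional, Tuple


def simulate(
    all_regions_acked: List[bool], max_consecutive_failures: int
) -> List[Tuple[str, Optional[int]]]:
    """Replay checkpoint triggers under the proposed counter rule.

    all_regions_acked[i] is True if every region acknowledged trigger i.
    Returns (outcome, depth) per trigger; depth is None for aborted ones.
    """
    consecutive, depth = 0, 0  # start right after a completed global checkpoint
    history: List[Tuple[str, Optional[int]]] = []
    for acked in all_regions_acked:
        forced = consecutive >= max_consecutive_failures
        if acked:
            # Every region acked: the checkpoint is self-contained, chain resets.
            consecutive, depth = 0, 0
            history.append(("FORCED_GLOBAL" if forced else "GLOBAL", depth))
        elif forced:
            # Forced global declined by some region: aborted, next trigger stays forced.
            history.append(("ABORTED", None))
        else:
            # Regional: worst case, the chain gets one hop deeper.
            consecutive, depth = consecutive + 1, depth + 1
            history.append(("REGIONAL", depth))
    return history


print(simulate([False, False, False, True], max_consecutive_failures=2))
```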
> Channel state redistribution
> Each channel state handle carries InputChannelInfo /
> ResultSubpartitionInfo. TaskStateAssignment maps handles by their channel
> metadata — time of origin is irrelevant. Per-subtask channel state and
> operator state come from the same checkpoint snapshot, ensuring internal
> consistency regardless of which checkpoint other subtasks reference.
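The next sketch shows time-agnostic assignment. `ChannelInfo` is loosely modeled on InputChannelInfo, and a made-up rescale rule (`owner`) stands in for the real channel mapping. All names are hypothetical. The sketch only shows that the target subtask is computed from channel metadata alone, so the checkpoint a handle came from cannot change where it lands.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List


@dataclass(frozen=True)
class ChannelInfo:
    """Hypothetical stand-in for InputChannelInfo: (gate index, channel index)."""
    gate_idx: int
    channel_idx: int


@dataclass(frozen=True)
class ChannelStateHandle:
    info: ChannelInfo
    origin_checkpoint: int  # checkpoint that physically wrote these bytes


def assign_channel_state(
    handles: Iterable[ChannelStateHandle],
    owner_of: Callable[[ChannelInfo], int],
) -> Dict[int, List[ChannelStateHandle]]:
    """Group handles by the new subtask that owns their channel."""
    assignment: Dict[int, List[ChannelStateHandle]] = defaultdict(list)
    for handle in handles:
        # Only the channel metadata is consulted; origin_checkpoint is
        # carried along unchanged and never affects the target subtask.
        assignment[owner_of(handle.info)].append(handle)
    return dict(assignment)


def owner(info: ChannelInfo) -> int:
    # Hypothetical rescale rule: channels 0-1 -> subtask 0, channels 2-3 -> subtask 1.
    return info.channel_idx // 2


handles = [
    ChannelStateHandle(ChannelInfo(0, 0), origin_checkpoint=101),  # fresh
    ChannelStateHandle(ChannelInfo(0, 3), origin_checkpoint=100),  # fallback
]
result = assign_channel_state(handles, owner)
```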
> Summary
> 1.  One StateAssignmentOperation per restore — states are pre-merged
> 2.  POINTWISE partition isolation makes mixing states across time points
> safe for both even-split and union
> 3.  maxConsecutiveFailures bounds refCheckpointId chain depth
> 4.  Channel state redistribution is time-agnostic
>
> I’ll cover all this in the FLIP document.
>
> Thanks again for the detailed replies, looking forward to your feedback.
>
> Regards,
> Raorao
>
> > On Jun 24, 2026, at 18:38, Roman Khachatryan <[email protected]> wrote:
> >
> > Hi Raorao, thanks for your answers
> >
> >>> 2. Rescaling for OperatorSubtaskState
> >
> >> `refCheckpointId` is checkpoint-level metadata stored in the completed
> >> checkpoint — it doesn't participate in state redistribution during
> >> rescaling. The `TaskStateAssignment` redistributes only the state handles
> >> to new subtasks, while the `CompletedCheckpointStore` retains the
> >> original metadata (including `refCheckpointId`) for the cleaner to trace
> >> reference chains. So rescaling is inherently compatible.
> >
> > Do you mean that we'll run StateAssignmentOperation for every referenced
> > checkpoint on every "partial" checkpoint completion?
> > (it can be quite heavy)
> >
> > I don't think that "redistributes only the state handles to new subtasks"
> > part is true - on rescaling, distribution changes for (potentially) every
> > sub-task.
> >
> > From the docs [1]:
> > - Even-split redistribution: Each operator returns a List of state
> > elements. The whole state is logically a concatenation of all lists. On
> > restore/redistribution, the list is evenly divided into as many sublists as
> > there are parallel operators. Each operator gets a sublist, which can be
> > empty, or contain one or more elements.
> > - Union redistribution: Each operator returns a List of state elements. The
> > whole state is logically a concatenation of all lists. On
> > restore/redistribution, each operator gets the complete list of state
> > elements. ...
> > - (we said earlier that keyed state is not supported)
> >
> > So I don't see how any of the above can work, because we might mix old and
> > new states in a sub-task.
> >
> > For example:
> > - a job is running with parallelism 2 and has even-split state distribution
> > - the 1st checkpoint is completed by 2/2 sub-tasks
> > - the 2nd checkpoint is completed by 1/2 sub-tasks: sub-task 2 failed the
> > checkpoint; its state in checkpoint 2 refers to "subtask 2 state in
> > checkpoint 1"
> > - the job is rescaled to 1
> > - sub-task 1 now has state from checkpoint 1 AND 2?
> >
> > Furthermore, with multiple rounds of downscaling, there can be a single
> > sub-task referring to multiple historical checkpoints.
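Roman's scenario, extended by one more round of downscaling, can be traced by tagging every state element with the checkpoint that wrote it. The split helper is a simplified even-split (contiguous chunks), and all names are hypothetical. In the output, a single restored sub-task depends on three historical checkpoints.

```python
from typing import List, Set, Tuple

Element = Tuple[str, int]  # (state element, id of the checkpoint that wrote it)


def even_split(subtasks: List[List[Element]], new_p: int) -> List[List[Element]]:
    """Simplified even-split: concatenate, then cut into new_p contiguous chunks."""
    flat = [e for s in subtasks for e in s]
    n = len(flat)
    return [flat[i * n // new_p:(i + 1) * n // new_p] for i in range(new_p)]


def origins(subtask: List[Element]) -> Set[int]:
    """Historical checkpoints a restored sub-task's state comes from."""
    return {ckp for _, ckp in subtask}


# P=4: checkpoint 2 acked by sub-tasks 0 and 2; sub-tasks 1 and 3 fell back to checkpoint 1.
ckp2 = [[("a", 2)], [("b", 1)], [("c", 2)], [("d", 1)]]
p2 = even_split(ckp2, 2)      # [[a@2, b@1], [c@2, d@1]]
# P=2: checkpoint 3 acked by sub-task 0; sub-task 1 fell back to its restored state.
ckp3 = [[("e", 3)], p2[1]]
p1 = even_split(ckp3, 1)
print(sorted(origins(p1[0])))  # [1, 2, 3]
```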
> >
> > For Union, it's even more problematic.
> >
> > Also, what about channel state (Unaligned checkpoint) re-distribution?
> >
> > Probably it's better to have a FLIP document with the corresponding section
> > first and then discuss it.
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state
> >
> > Regards,
> > Roman
> >
> >
> > On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:
> >
> >> Hi Zakelly,
> >>
> >> Thanks for the feedback! I completely agree — forcing region-level merging
> >> would significantly reduce the effectiveness of file merging, and that's
> >> not the right trade-off.
> >>
> >> I think TM-level merging can work correctly with Regional Checkpoint if we
> >> ensure the FileMergingSnapshotManager properly handles the new checkpoint
> >> notification semantics. Specifically:
> >>
> >> - The FileMergingSnapshotManager already uses a notification-based
> >> lifecycle (checkpoint complete/abort/subsumed) to manage physical file
> >> deletion.
> >> - For Regional Checkpoint, we need to introduce the new notification type
> >> we discussed earlier (notify partially-completed) so that the merging
> >> manager knows: this checkpoint is complete but some segments belong to
> >> failed regions — keep the physical files that contain those segments alive
> >> until the referencing checkpoints are subsumed.
> >> - This aligns with the existing design and requires only minor adjustments
> >> to the merging manager's notification handling.
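Here is a minimal bookkeeping sketch for the keep-alive rule. It assumes a physical file may be deleted only when no retained checkpoint uses any segment inside it. `PhysicalFileTracker` and its methods are hypothetical, not the FileMergingSnapshotManager API. `on_partially_completed` models the new notification by attaching the files that failed regions fall back to onto the new checkpoint.

```python
from collections import defaultdict
from typing import Dict, Set


class PhysicalFileTracker:
    """Hypothetical: tracks which checkpoints still use segments of each physical file."""

    def __init__(self) -> None:
        self._users: Dict[str, Set[int]] = defaultdict(set)

    def register(self, checkpoint_id: int, physical_file: str) -> None:
        # A checkpoint wrote at least one segment into this file.
        self._users[physical_file].add(checkpoint_id)

    def on_partially_completed(self, checkpoint_id: int, reused_files: Set[str]) -> None:
        # Failed regions fall back to older segments: the new checkpoint now uses them too.
        for physical_file in reused_files:
            self._users[physical_file].add(checkpoint_id)

    def on_subsumed(self, checkpoint_id: int) -> Set[str]:
        """Drop checkpoint_id; return the files that no checkpoint uses any more."""
        deletable = set()
        for physical_file, users in list(self._users.items()):
            users.discard(checkpoint_id)
            if not users:
                deletable.add(physical_file)
                del self._users[physical_file]
        return deletable


tracker = PhysicalFileTracker()
tracker.register(100, "f1")
tracker.register(100, "f2")                 # region 1's segments at ckp100
tracker.register(101, "f3")                 # region 0's fresh segments at ckp101
tracker.on_partially_completed(101, {"f2"})  # region 1 declined ckp101
freed_after_100 = sorted(tracker.on_subsumed(100))  # ['f1']: f2 still used by ckp101
freed_after_101 = sorted(tracker.on_subsumed(101))  # ['f2', 'f3']
```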
> >>
> >> The previous suggestion of region-level merging was overly conservative —
> >> glad you pointed out the better approach.
> >>
> >> Also good to know about the plan to cover channel state in FLIP-306. That
> >> will simplify the compatibility matrix significantly.
> >>
> >> Best Regards,
> >> Raorao
> >>
> >>> On Jun 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
> >>>
> >>> Hi Raorao,
> >>>
> >>> Good to see this proposal and +1 for the direction. Sorry for joining the
> >>> discussion late. I have seen many constructive suggestions that largely
> >>> align with my thoughts, but I still have one concern:
> >>>
> >>> I'm one of the authors of FLIP-306 and I'm not in favor of region-level
> >>> merging. IIUC, region-level merge files severely limit the effectiveness of
> >>> merging, as merging cannot happen between subtasks. I think it should still
> >>> be possible to perform TM-level merge. The only thing we should do is to
> >>> keep previous checkpoint files alive when a region checkpoint occurs. This
> >>> does not conflict with the current design. It is only necessary to ensure
> >>> that the new behavior of checkpoint notifications is compatible with
> >>> FLIP-306, or to make some minor adjustments to the merging manager.
> >>>
> >>> And BTW to @Rui, we still need more work to let FLIP-306 cover channel
> >>> state and deprecate FLINK-26803, and I will look into this soon.
> >>>
> >>>
> >>> Best,
> >>> Zakelly
> >>>
> >>> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
> >>>
> >>>> Hi Roman, thanks for the follow-up.
> >>>>
> >>>> 1. FLIP page
> >>>>
> >>>> Thanks for your reminder, I'll create the proper FLIP page before starting
> >>>> the [VOTE] thread.
> >>>>
> >>>> 2. Rescaling for OperatorSubtaskState
> >>>>
> >>>> `refCheckpointId` is checkpoint-level metadata stored in the completed
> >>>> checkpoint — it doesn't participate in state redistribution during
> >>>> rescaling. The `TaskStateAssignment` redistributes only the state handles
> >>>> to new subtasks, while the `CompletedCheckpointStore` retains the original
> >>>> metadata (including `refCheckpointId`) for the cleaner to trace reference
> >>>> chains. So rescaling is inherently compatible.
> >>>>
> >>>> 3. Finished operators
> >>>>
> >>>> You're right, my previous answer missed the real issue. For bounded source
> >>>> jobs (FLIP-147), if the final checkpoint is Regional and a failed Region's
> >>>> subtask misses `notifyCheckpointComplete`, its side effects (e.g., Kafka
> >>>> transactions) are never committed — that's data loss.
> >>>>
> >>>> Solution: force a global checkpoint when the job is about to terminate.
> >>>> When all sources are exhausted, the JM will mark the next checkpoint as
> >>>> mandatory global, requiring all regions to ack. If it fails, the job
> >>>> retries rather than terminating with a partial snapshot.
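The termination rule fits in a small decision function. The sketch assumes the JM knows when all sources are exhausted and also tracks the consecutive-regional counter discussed elsewhere in the thread. `next_checkpoint` and `after_final_checkpoint` are hypothetical names.

```python
from enum import Enum


class NextCheckpoint(Enum):
    REGIONAL_ALLOWED = "regional-allowed"  # some regions may fall back
    MANDATORY_GLOBAL = "mandatory-global"  # every region must ack


def next_checkpoint(all_sources_finished: bool, consecutive_regional: int,
                    max_consecutive_failures: int) -> NextCheckpoint:
    """Decide how the JM should treat the next checkpoint trigger."""
    if all_sources_finished:
        # Final checkpoint: every subtask must get notifyCheckpointComplete.
        return NextCheckpoint.MANDATORY_GLOBAL
    if consecutive_regional >= max_consecutive_failures:
        return NextCheckpoint.MANDATORY_GLOBAL
    return NextCheckpoint.REGIONAL_ALLOWED


def after_final_checkpoint(all_regions_acked: bool) -> str:
    """Only a fully acknowledged final checkpoint lets the job terminate."""
    return "FINISH" if all_regions_acked else "RETRY_GLOBAL_CHECKPOINT"
```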
> >>>>
> >>>> 4. Limitations section
> >>>>
> >>>> I agree with you, and I'll add a section covering all limitations:
> >>>>
> >>>> - BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> >>>> - FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> >>>> - NO_CLAIM restore mode -> Warn (require a global checkpoint before
> >>>>   snapshot deletion)
> >>>> - Changelog state backend -> Reject job submission
> >>>> - Finished operators (FLIP-147) -> Force a global checkpoint when sources
> >>>>   are exhausted
> >>>>
> >>>>
> >>>> Thanks again for the review, looking forward to your feedback.
> >>>>
> >>>> Regards,
> >>>> Raorao
> >>>>
> >>>>
> >>>>> On Jun 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
> >>>>>
> >>>>> Hi, thanks for your replies and sorry for the delay.
> >>>>>
> >>>>> Most of my questions were answered, but I still have some concerns.
> >>>>>
> >>>>>> If there are no further concerns by next Monday (June 22), I'll go ahead
> >>>>>> and start the [VOTE] thread for this FLIP.
> >>>>>
> >>>>> Isn't the actual FLIP still missing? I only saw a Google Doc. Do you
> >>>>> mind creating a page according to [1]?
> >>>>>
> >>>>> ----------------------------------------
> >>>>>
> >>>>>> 3. Checkpoint metadata layout
> >>>>>> Regional Checkpoint recombines state from different checkpoint IDs. To
> >>>>>> track this, we add a refCheckpointId field to OperatorSubtaskState in the
> >>>>>> metadata, indicating which historical checkpoint a subtask’s state
> >>>>>> references.
> >>>>>
> >>>>> Could you explain how we find the right OperatorSubtaskState, especially
> >>>>> in case of rescaling?
> >>>>> Does the proposal support rescaling?
> >>>>>
> >>>>>> 9. Finished operators
> >>>>>> The concern is: a finished operator’s final commit notification gets
> >>>>>> skipped by Regional Checkpoint, and if this checkpoint is the last one,
> >>>>>> the operator never receives it — could this cause data loss?
> >>>>>> In practice, the impact is limited:
> >>>>>> ● Failed Region tasks are already gone: By the time the Regional
> >>>>>> Checkpoint completes, tasks in the failed Region have already been
> >>>>>> restarted (decline) or cancelled (timeout). There is no task left to
> >>>>>> receive the notification anyway.
> >>>>> Checkpoint failure doesn't necessarily cause a restart (especially if this
> >>>>> is limited to one region). The tasks should still be up and running.
> >>>>>
> >>>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After reaching
> >>>>>> the limit, the next checkpoint is forced to be global, ensuring all tasks
> >>>>>> eventually receive notifyCheckpointComplete. We can’t skip the same
> >>>>>> Region forever.
> >>>>> maxConsecutiveFailures might not be reached for the final checkpoint.
> >>>>>
> >>>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops
> >>>>>> the job gracefully, it triggers a full global snapshot, not a Regional
> >>>>>> Checkpoint. So the final checkpoint is always complete.
> >>>>> stop-with-savepoint should be fine, yes.
> >>>>>
> >>>>> To clarify, my concern is about jobs with bounded sources. In such cases,
> >>>>> some subtasks might finish processing but still participate in
> >>>>> checkpoints. After a successful checkpoint, they are guaranteed to get a
> >>>>> checkpoint completion notification, so that they can make side effects
> >>>>> visible in external systems (commit Kafka transactions).
> >>>>> See FLIP-147 [2]
> >>>>>
> >>>>> However, with the current proposal, the job might complete with some
> >>>>> subtasks/regions failing the final checkpoint, unless I'm missing
> >>>>> something. This is essentially data loss.
> >>>>> To prevent this, the final checkpoint must always be acked by all
> >>>>> subtasks/regions.
> >>>>>
> >>>>> ----------------------------------------
> >>>>>
> >>>>> There are quite a few limitations in this proposal.
> >>>>> Could you add a section describing how each of them is handled, e.g.:
> >>>>> 1. Reject job submission
> >>>>> 2. Force all-region checkpoint
> >>>>> 3. Warn in documentation
> >>>>>
> >>>>>> 1. Region independence — BLOCKING/HYBRID edges
> >>>>>> You’re right. Our current scope is limited to embarrassingly parallel
> >>>>>> regions. In typical ETL scenarios, each parallel instance maps to an
> >>>>>> independent Region with no edges connecting them.
> >>>>>
> >>>>>> 5. SharedStateRegistry — how are old states kept alive?
> >>>>>> Good question. In the current design, since we only target
> >>>>>> embarrassingly parallel regions, there is typically no keyed state and no
> >>>>>> incremental state. As a result, the SharedStateRegistry is generally
> >>>>>> empty (setting aside File Merging and Changelog State for now, discussed
> >>>>>> in 8.), so keep-alive of files under the shared directory is not a
> >>>>>> concern.
> >>>>>
> >>>>>> 8. FLINK-26803 and FLIP-306 compatibility
> >>>>>> This is a very important point. Both features essentially merge small
> >>>>>> files at the job level. As Rui Fan pointed out, if the merging
> >>>>>> granularity is reduced to the Region level, compatibility with Regional
> >>>>>> Checkpoint should be achievable in theory. I think this can be deferred
> >>>>>> to future work — once FLINK-26803 is consolidated into FLIP-306, we can
> >>>>>> revisit and enable support.
> >>>>>
> >>>>>> 10. NO_CLAIM mode warning
> >>>>>> You’re absolutely right — this is an important reminder. After
> >>>>>> restoring from a Regional Checkpoint, only a successful global
> >>>>>> checkpoint guarantees independence from the old state. We’ll add a clear
> >>>>>> user warning in the documentation.
> >>>>>
> >>>>>> 11. Changelog state backend — not supported
> >>>>>> As mentioned earlier, our primary target is embarrassingly parallel
> >>>>>> regions, which typically have no keyed state and therefore no slow
> >>>>>> incremental state flush issues. I don’t think we need to support the
> >>>>>> Changelog state backend for now.
> >>>>>
> >>>>> ----------------------------------------
> >>>>>
> >>>>>> 2. max-consecutive-failures exceeded — what exactly happens?
> >>>>>> The current design says “force a global checkpoint.” To clarify the
> >>>>>> two-tier behavior:
> >>>>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the
> >>>>>> next checkpoint is forced to be global.
> >>>>>> ● Tier 2: If that forced global checkpoint also fails (any task
> >>>>>> declines), the checkpoint is aborted normally (not a job failure). The
> >>>>>> counter is then reset since a global checkpoint was attempted, and the
> >>>>>> next checkpoint cycle can try again.
> >>>>>> This avoids cascading into job failure while ensuring we don’t drift
> >>>>>> indefinitely on historical state.
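The two tiers can be sketched as a small state machine. `RegionalCheckpointPolicy` is a hypothetical name. The sketch assumes `on_result` is called once per trigger, and that whether the trigger was forced is read from the counter before the counter is updated.

```python
class RegionalCheckpointPolicy:
    """Hypothetical sketch of the two-tier maxConsecutiveFailures rule."""

    def __init__(self, max_consecutive_failures: int) -> None:
        self.max_consecutive_failures = max_consecutive_failures
        self.consecutive_regional = 0

    def next_is_forced_global(self) -> bool:
        # Tier 1: too many regional checkpoints in a row -> force a global one.
        return self.consecutive_regional >= self.max_consecutive_failures

    def on_result(self, all_regions_acked: bool) -> str:
        forced = self.next_is_forced_global()  # was this trigger forced?
        if all_regions_acked:
            self.consecutive_regional = 0
            return "COMPLETED_GLOBAL"
        if forced:
            # Tier 2: the forced global checkpoint is aborted (no job failure)
            # and the counter is reset so the next cycle can try again.
            self.consecutive_regional = 0
            return "ABORTED"
        self.consecutive_regional += 1
        return "COMPLETED_REGIONAL"


policy = RegionalCheckpointPolicy(max_consecutive_failures=2)
results = [policy.on_result(acked) for acked in (False, False, False, False)]
```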
> >>>>>
> >>>>> My assumption was that we would not allow this particular failed region to
> >>>>> fail the checkpoint again.
> >>>>> But forcing a global checkpoint works as well.
> >>>>>
> >>>>>> 6. Checkpoint abort notifications & Local Recovery cleanup — new
> >>>>>> notification type
> >>>>>> This is a very insightful point. Zihao and Gen also raised this in
> >>>>>> earlier discussions. The current design doesn’t address state cleanup
> >>>>>> for tasks in failed regions. I agree it’s necessary to introduce a new
> >>>>>> notification type. For tasks in failed regions, local state cleanup can
> >>>>>> be deferred until the next checkpoint trigger.
> >>>>> Ok, this can be some future work.
> >>>>>
> >>>>>> 7. Task that never acknowledges nor declines — per-region timeouts
> >>>>>> This was discussed in the previous thread. Network issues may cause a
> >>>>>> task to neither ack nor decline in time. In such cases, we treat it as a
> >>>>>> checkpoint timeout: the affected tasks’ region is marked as failed, and
> >>>>>> the process ultimately falls through to the normal Regional Checkpoint
> >>>>>> processing logic.
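Here is a sketch of the timeout handling. It assumes the coordinator knows which tasks belong to which region and collects explicit acks and declines until the timeout fires. Names are hypothetical. A task with no response is treated exactly like a decline, so its whole region falls back.

```python
from enum import Enum
from typing import Dict, Set, Tuple


class Response(Enum):
    ACK = "ack"
    DECLINE = "decline"


def classify_regions(
    region_tasks: Dict[str, Set[str]],
    responses: Dict[str, Response],
) -> Tuple[Set[str], Set[str]]:
    """Return (healthy, failed) regions once the per-region timeout fires."""
    healthy, failed = set(), set()
    for region, tasks in region_tasks.items():
        # A task missing from `responses` never answered: treated like a decline.
        if all(responses.get(task) is Response.ACK for task in tasks):
            healthy.add(region)
        else:
            failed.add(region)
    return healthy, failed


regions = {"r0": {"t0a", "t0b"}, "r1": {"t1"}, "r2": {"t2"}}
answers = {"t0a": Response.ACK, "t0b": Response.ACK, "t1": Response.DECLINE}  # t2 timed out
healthy, failed = classify_regions(regions, answers)
```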
> >>>>> Ok, this can be some future work.
> >>>>>
> >>>>> ----------------------------------------
> >>>>>
> >>>>> [1]
> >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> >>>>>
> >>>>> [2]
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> >>>>>
> >>>>> Regards,
> >>>>> Roman
> >>>>>
> >>>>>
> >>>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Thanks everyone for the valuable feedback. I believe all the points raised
> >>>>>> above have been addressed (@Roman @Rui Fan). If there are no further
> >>>>>> concerns by next Monday (June 22), I'll go ahead and start the [VOTE]
> >>>>>> thread for this FLIP.
> >>>>>>
> >>>>>> For reference, the earlier related discussion can be found here:
> >>>>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
> >>>>>>
> >>>>>>
> >>>>>> Please feel free to share any additional feedback before then.
> >>>>>>
> >>>>>> Best Regards,
> >>>>>> Raorao
> >>>>>>
> >>>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
> >>>>>>
> >>>>>> Hi devs,
> >>>>>>
> >>>>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint
> >>>>>> Based On Pipeline Region.
> >>>>>>
> >>>>>> In high-parallelism streaming jobs, a single Task's checkpoint failure
> >>>>>> causes the entire global Checkpoint to abort, leading to degraded
> >>>>>> checkpoint success rates and wasted compute resources (especially for GPU
> >>>>>> operators).
> >>>>>>
> >>>>>> We propose Regional Checkpoint: when some Regions fail to checkpoint, the
> >>>>>> framework combines the historical state of the failed Regions with the
> >>>>>> current state of the healthy Regions to produce a logically complete
> >>>>>> Completed Checkpoint — while preserving state consistency. The key changes
> >>>>>> are:
> >>>>>>
> >>>>>> 1. Snapshot Collection — Allow partial region failures; combine the last
> >>>>>> successful state of failed Regions with the current state of normal
> >>>>>> Regions.
> >>>>>>
> >>>>>> 2. State Correction — New checkpointCoordinatorForRegionFallback interface
> >>>>>> for OperatorCoordinators to produce consistent snapshots against the mixed
> >>>>>> view.
> >>>>>>
> >>>>>> 3. Checkpoint Store — Track ref_checkpoint_id in metadata to prevent
> >>>>>> premature cleanup of referenced historical checkpoints.
> >>>>>>
> >>>>>> The detailed design is described in the FLIP document:
> >>>>>>
> >>>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> >>>>>>
> >>>>>> Looking forward to your feedback!
> >>>>>>
> >>>>>> Best regards,
> >>>>>>
> >>>>>> Raorao Xiong
> >>>>>>
