Hi Raorao,

Sounds good, thank you!

Regards,
Roman


On Fri, Jul 3, 2026 at 9:06 AM 熊饶饶 <[email protected]> wrote:

> Hi Roman, Gen,
>
> Thanks both for the discussion.
>
> Roman, I agree with the safety boundary you've defined: each region's
> state must be a deterministic function of consumed data, with no
> cross-subtask agreement or cross-element temporal invariant.
>
> Since Regional Checkpoint is opt-in
> (`execution.checkpointing.region.enabled = false` by default), users who
> know their job has such invariants simply won't enable it. I'll document
> the safety conditions in the FLIP's Limitations section so the expectation
> is clear before anyone turns the flag on. No additional API or framework
> guard needed.
>
> Let me know if this sounds right.
>
> Regards,
> Raorao
>
> > 2026年7月3日 04:57,Roman Khachatryan <[email protected]> 写道:
> >
> > Hi Gen,
> >
> > Thanks for your reply,
> >
> > I don't think the "stalled subtask" equivalence fully covers this case. A
> > stalled-but-alive subtask still acknowledges checkpoints, so it would
> > contribute its current snapshot, not an old one. The composition only
> > arises because the failed region
> > never acked the current checkpoint — which in normal Flink fails the
> whole
> > checkpoint (exactly the case this FLIP is introducing). So the "slow
> > subtask" analogy holds only when a subtask that consumed no new data
> would
> > produce an identical snapshot
> > at the next barrier — i.e. state that's a deterministic function of the
> > consumed data. It doesn't hold for state that mutates in
> > snapshotState()/open(), on timers/wall-clock, or that's meant to be
> > globally agreed — e.g. an epoch or recovery
> > counter kept as union state that all subtasks must agree on. There the
> > composed view can be {100, 101}, which no real execution produces.
> >
> > More generally: a Flink checkpoint today is globally atomic — every
> > subtask's state comes from the same barrier, one logical time. This FLIP
> is
> > the first thing to relax that and mix state from different logical times
> > across subtasks, so an
> > assumption operators can rely on today — "all of my state comes from one
> > consistent snapshot" — no longer holds. That applies to even-split state
> > too, not just union: ordinary rescaling only requires operators to
> tolerate
> > regrouping (which elements
> > land together), never temporal mixing (elements from different times).
> > Self-contained elements like Kafka splits are fine; an operator with any
> > cross-element or cross-subtask temporal invariant is not, and would be
> > broken only by this feature.
> >
> > Since the runtime can't inspect operator semantics, I'd suggest:
> > 1. Make the feature opt-in and document the assumption explicitly — each
> > region's state must be a deterministic function of the data it consumed,
> > with no cross-subtask agreement or cross-element temporal invariant.
> > 2. As an automatic backstop, guard union and broadcast state (contract:
> > every subtask sees the same/complete view) — either disable regional
> > checkpointing when the job uses it, or force a global checkpoint when
> such
> > a region fails. This covers the
> > common case but isn't a soundness proof, which is why (1) matters —
> > even-split isn't automatically safe either.
> >
> > Could we capture this in the Limitations/Assumptions section of the FLIP
> if
> > you agree?
> >
> > Regards,
> > Roman
> >
> >
> > On Thu, Jul 2, 2026 at 9:19 AM Gen Luo <[email protected]> wrote:
> >
> >> Hi Roman and Raorao,
> >>
> >>
> >> Regarding the union operator state, I'd like to share some thoughts.
> >>
> >> In my understanding, union operator state is not a separate state type,
> but
> >> a different recovery semantic of OperatorState. The underlying state is
> the
> >> same; the only difference is that regular operator state redistributes
> >> state partitions, while union operator state restores the full state
> list
> >> to every subtask.
> >>
> >> From this perspective, I think combining newer snapshots from healthy
> >> regions with an older snapshot from a failed region is still valid. The
> >> snapshots of different regions are independent, so the newer snapshots
> are
> >> unaffected by whether the failed region contributes its latest or
> previous
> >> snapshot.
> >>
> >> Another way to see this is to assume the failed subtask simply stopped
> >> making progress after the previous checkpoint instead of failing. The
> >> resulting checkpoint would naturally contain the latest snapshots from
> >> healthy subtasks and the previous snapshot from that subtask, which is
> >> exactly the same composition as the proposed regional checkpoint.
> >>
> >> What do you think?
> >>
> >>
> >> Best,
> >>
> >> Gen
> >>
> >> On Mon, Jun 29, 2026 at 4:45 AM Roman Khachatryan <[email protected]>
> >> wrote:
> >>
> >>> Hi Raorao,
> >>>
> >>> Thanks for the walk-through.
> >>> That partially resolves my concern about re-assigning state: it doesn't
> >>> have to be recomputed on every partial checkpoint — but the restored
> >>> (redistributed) assignment has to be retained until the first
> checkpoint
> >>> completes at the new parallelism.
> >>>
> >>> My other concern still stands: the isolation argument doesn't obviously
> >>> cover union operator state, which every subtask sees in full across all
> >>> regions — so a partial checkpoint stores a temporal mix of elements,
> and
> >>> rescaling reshuffles them across new region boundaries.
> >>>
> >>>> I'll cover all this in the FLIP document.
> >>>
> >>> Yes — I think it's better to capture these edge cases in a Rescaling
> >>> section of the FLIP and discuss in more detail there. Looking forward
> to
> >>> it.
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>>
> >>> On Fri, Jun 26, 2026 at 10:06 AM 熊饶饶 <[email protected]> wrote:
> >>>
> >>>> Hi Roman, thanks for the detailed follow-up.
> >>>> Rescaling and state redistribution
> >>>> StateAssignmentOperation runs only once per restore — it doesn’t run
> >> per
> >>>> referenced checkpoint. The subtask states entering
> >>> StateAssignmentOperation
> >>>> have already been pre-merged during the Regional Checkpoint completion
> >>>> phase. Each OperatorSubtaskState already physically contains the state
> >>>> handles from whichever checkpoint it originated from. The
> >> refCheckpointId
> >>>> is just metadata for the cleaner.
> >>>> Example: P=4 → P=2 → P=1, two rounds of downscaling
> >>>> Initial state (P=4, even-split operator state):
> >>>> ckp100 (T1, GLOBAL): all 4 regions ack
> >>>>  subtask 0: [A0, A1]    region 0
> >>>>  subtask 1: [B0, B1]    region 1
> >>>>  subtask 2: [C0, C1]    region 2
> >>>>  subtask 3: [D0, D1]    region 3
> >>>> First downscale: P=4 → P=2 (recover from ckp100 after failover)
> >>>> Even-split redistribution:
> >>>> concatenate: [A0,A1, B0,B1, C0,C1, D0,D1]  → 8 elements
> >>>> split into 2:
> >>>>  new subtask 0: [A0, A1, B0, B1]
> >>>>  new subtask 1: [C0, C1, D0, D1]
> >>>> Now running at P=2:
> >>>> ckp101 (T2, REGIONAL):
> >>>>  new-subtask 0 ack: [E0, E1, E2, E3]
> >>>>  new-subtask 1 decline, fallback to ckp100 → [C0,C1,D0,D1]  (ref=100)
> >>>>
> >>>> ckp101 metadata (pre-merged):
> >>>>  new-subtask 0: stateHandles=[E0,E1,E2,E3], refCheckpointId=null
> >>>>  new-subtask 1: stateHandles=[C0,C1,D0,D1], refCheckpointId=100
> >>>> Second downscale: P=2 → P=1 (recover from ckp101)
> >>>> Even-split redistribution:
> >>>> concatenate: [E0,E1,E2,E3, C0,C1,D0,D1]  → 8 elements
> >>>> split into 1:
> >>>>  new subtask 0: [E0,E1,E2,E3, C0,C1,D0,D1]
> >>>> At this point, the single subtask’s state contains elements from two
> >> time
> >>>> points — this is safe per the partition isolation argument above. Now
> >> the
> >>>> job continues running at P=1:
> >>>> ckp102 (T3, REGIONAL):
> >>>>  new subtask 0 decline, fallback to ckp101's redistributed state
> >>>>  → ref=101
> >>>>
> >>>> ckp102 metadata (pre-merged):
> >>>>  subtask 0: stateHandles=[E0,E1,E2,E3, C0,C1,D0,D1],
> >> refCheckpointId=101
> >>>>
> >>>> ref chain: ckp102(ref=101) → ckp101(ref=100) → ckp100
> >>>> The ref chain is tracked entirely in the CompletedCheckpoint store
> >>>> metadata. Rescaling is a restore operation — it redistributes state
> >>> handles
> >>>> to TMs but doesn't create new checkpoint metadata. The next Regional
> >>>> Checkpoint (ckp102) references the previous CompletedCheckpoint
> >> (ckp101)
> >>>> from the store, which still has its original ref=100 annotation in the
> >>>> metadata.
> >>>>
> >>>> Union state, same scenario:
> >>>> ckp100 (T1, GLOBAL): P=4, union operator state
> >>>>  subtask 0: [A0, A1]
> >>>>  subtask 1: [B0, B1]
> >>>>  subtask 2: [C0, C1]
> >>>>  subtask 3: [D0, D1]
> >>>>
> >>>> P=4 → P=2 (union):
> >>>>  new subtask 0 gets FULL concatenated: [A0,A1, B0,B1, C0,C1, D0,D1]
> >>>>  new subtask 1 gets FULL concatenated: same
> >>>>
> >>>> ckp101 (T2, REGIONAL):
> >>>>  new-subtask 0 ack: [E0, ..., E7]
> >>>>  new-subtask 1 decline, fallback → [A0,A1,B0,B1,C0,C1,D0,D1]
> >> (ref=100)
> >>>>
> >>>> P=2 → P=1 (union):
> >>>>  new subtask 0 gets: [E0,...,E7, A0,...,D1]  ← complete set, safe
> >>>> Union is even simpler — each subtask always gets the complete state,
> so
> >>>> mixing doesn’t add new complexity.
> >>>> refCheckpointId chain bounded by maxConsecutiveFailures
> >>>>
> >>>> ckp100 (global)
> >>>>  → ckp101 (regional, ref=100)  consecutive=1
> >>>>  → ckp102 (regional, ref=101)  consecutive=2
> >>>>  → ckp103: consecutive >= maxConsecutiveFailures(2)
> >>>>     → FORCED GLOBAL → if success, chain resets
> >>>> The reference chain can only grow as deep as maxConsecutiveFailures.
> >> Once
> >>>> a global checkpoint succeeds, it’s a fresh start. No inflation beyond
> >> the
> >>>> configured limit.
> >>>> Channel state redistribution
> >>>> Each channel state handle carries InputChannelInfo /
> >>>> ResultSubpartitionInfo. TaskStateAssignment maps handles by their
> >> channel
> >>>> metadata — time of origin is irrelevant. Per-subtask channel state and
> >>>> operator state come from the same checkpoint snapshot, ensuring
> >> internal
> >>>> consistency regardless of which checkpoint other subtasks reference.
> >>>> Summary
> >>>> 1.  One StateAssignmentOperation per restore — states are pre-merged
> >>>> 2.  POINTWISE partition isolation makes mixing states across time
> >> points
> >>>> safe for both even-split and union
> >>>> 3.  maxConsecutiveFailures bounds refCheckpointId chain depth
> >>>> 4.  Channel state redistribution is time-agnostic
> >>>>
> >>>> I’ll cover all this in the FLIP document.
> >>>>
> >>>> Thanks again for the detailed replies, looking forward to your
> >> feedback.
> >>>>
> >>>> Regards,
> >>>> Raorao
> >>>>
> >>>>> 2026年6月24日 18:38,Roman Khachatryan <[email protected]> 写道:
> >>>>>
> >>>>> Hi Raorao, thanks for your answers
> >>>>>
> >>>>>>> 2. Rescaling for OperatorSubtaskState
> >>>>>
> >>>>>> `refCheckpointId` is checkpoint-level metadata stored in the
> >> completed
> >>>>> checkpoint —
> >>>>>> it doesn't participate in state redistribution during rescaling. The
> >>>>> `TaskStateAssignment`
> >>>>>> redistributes only the state handles to new subtasks, while the
> >>>>> `CompletedCheckpointStore`
> >>>>>> retains the original metadata (including `refCheckpointId`) for the
> >>>>> cleaner to trace
> >>>>>> reference chains. So rescaling is inherently compatible.
> >>>>>
> >>>>> Do you mean that we'll run StateAssignmentOperation per every
> >>> referenced
> >>>>> checkpoint on every "partial" checkpoint completion?
> >>>>> (it can be quite heavy)
> >>>>>
> >>>>> I don't think that "redistributes only the state handles to new
> >>> subtasks"
> >>>>> part is true - on rescaling, distribution changes for (potentially)
> >>> every
> >>>>> sub-task.
> >>>>>
> >>>>> From the docs [1]:
> >>>>> - Even-split redistribution: Each operator returns a List of state
> >>>>> elements. The whole state is logically a concatenation of all lists.
> >> On
> >>>>> restore/redistribution, the list is evenly divided into as many
> >>> sublists
> >>>> as
> >>>>> there are parallel operators. Each operator gets a sublist, which can
> >>> be
> >>>>> empty, or contain one or more elements.
> >>>>> - Union redistribution: Each operator returns a List of state
> >> elements.
> >>>> The
> >>>>> whole state is logically a concatenation of all lists. On
> >>>>> restore/redistribution, each operator gets the complete list of state
> >>>>> elements. ...
> >>>>> - (we said earlier that keyed stated is not supported)
> >>>>>
> >>>>> So I don't see how any of the above can work because we might mix old
> >>> and
> >>>>> new states in a sub-task.
> >>>>>
> >>>>> For example:
> >>>>> - a job is running with parallelism 2 and has even-split state
> >>>> distribution
> >>>>> - 1st checkpoint is completes by 2/2 sub-tasks
> >>>>> - 2nd checkpoint is completes by 1/2 sub-tasks: sub-task 2 failed the
> >>>>> checkpoint; its state in checkpoint 2 refers to "subtask 2 state in
> >>>>> checkpoint 1"
> >>>>> - job is rescaled to 1
> >>>>> - sub-task 1 now has state from checkpoint 1 AND 2?
> >>>>>
> >>>>> Furthermore, with multiple rounds of downscaling, there can be a
> >> single
> >>>>> sub-task referring to multiple historical checkpoints.
> >>>>>
> >>>>> For Union, it's even more problematic.
> >>>>>
> >>>>> Also, what about channel state (Unaligned checkpoint)
> >> re-distribution?
> >>>>>
> >>>>> Probably it's better to have a FLIP document with the corresponding
> >>>> section
> >>>>> first and then discuss it.
> >>>>>
> >>>>> [1]
> >>>>>
> >>>>
> >>>
> >>
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state
> >>>>>
> >>>>> Regards,
> >>>>> Roman
> >>>>>
> >>>>>
> >>>>> On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:
> >>>>>
> >>>>>> Hi Zakelly,
> >>>>>>
> >>>>>> Thanks for the feedback! I completely agree — forcing region-level
> >>>> merging
> >>>>>> would significantly reduce the effectiveness of file merging, and
> >>> that's
> >>>>>> not the right trade-off.
> >>>>>>
> >>>>>> I think TM-level merging can work correctly with Regional Checkpoint
> >>> if
> >>>> we
> >>>>>> ensure the FileMergingSnapshotManager properly handles the new
> >>>> checkpoint
> >>>>>> notification semantics. Specifically:
> >>>>>>
> >>>>>> - The FileMergingSnapshotManager already uses a notification-based
> >>>>>> lifecycle (checkpoint complete/abort/subsumed) to manage physical
> >> file
> >>>>>> deletion.
> >>>>>> - For Regional Checkpoint, we need to introduce the new notification
> >>>> type
> >>>>>> we discussed earlier (notify partially-completed) so that the
> >> merging
> >>>>>> manager knows: this checkpoint is complete but some segments belong
> >> to
> >>>>>> failed regions — keep the physical files that contain those segments
> >>>> alive
> >>>>>> until the referencing checkpoints are subsumed.
> >>>>>> - This aligns with the existing design and requires only minor
> >>>> adjustments
> >>>>>> to the merging manager's notification handling.
> >>>>>>
> >>>>>> The previous suggestion of region-level merging was overly
> >>> conservative
> >>>> —
> >>>>>> glad you pointed out the better approach.
> >>>>>>
> >>>>>> Also good to know about the plan to cover channel state in FLIP-306.
> >>>> That
> >>>>>> will simplify the compatibility matrix significantly.
> >>>>>>
> >>>>>> Best Regards,
> >>>>>> Raorao
> >>>>>>
> >>>>>>> 2026年6月23日 23:33,Zakelly Lan <[email protected]> 写道:
> >>>>>>>
> >>>>>>> Hi Raorao,
> >>>>>>>
> >>>>>>> Good to see this proposal and +1 for the direction. Sorry for
> >> joining
> >>>> the
> >>>>>>> discussion late. I have seen many constructive suggestions that
> >>> largely
> >>>>>>> align with my thoughts, but I still have one concern:
> >>>>>>>
> >>>>>>> I'm one of the authors of FLIP-306 and I'm not in favor of
> >>> region-level
> >>>>>>> merging. IIUC, region-level merge files severely limit the
> >>>> effectiveness
> >>>>>> of
> >>>>>>> merging, as merging cannot happen between subtasks. I think it
> >> should
> >>>>>> still
> >>>>>>> be possible to perform TM-level merge. The only thing we should do
> >> is
> >>>> to
> >>>>>>> keep previous checkpoint files alive when a region checkpoint
> >> occurs.
> >>>>>> This
> >>>>>>> does not conflict with the current design. It is only necessary to
> >>>> ensure
> >>>>>>> that the new behavior of checkpoint notifications is compatible
> >> with
> >>>>>>> FLIP-306, or some minor adjustment needed towards the merging
> >>> manager.
> >>>>>>>
> >>>>>>> And BTW to @Rui, we still need more work to let FLIP-306 cover
> >>> channel
> >>>>>>> state and deprecate FLINK-26803, and I will look into this soon.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Zakelly
> >>>>>>>
> >>>>>>> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hi Roman, thanks for the follow-up.
> >>>>>>>>
> >>>>>>>> 1. FLIP page
> >>>>>>>>
> >>>>>>>> Thanks for your reminder, I'll create the proper FLIP page before
> >>>>>> starting
> >>>>>>>> [VOTE] thread.
> >>>>>>>>
> >>>>>>>> 2. Rescaling for OperatorSubtaskState
> >>>>>>>>
> >>>>>>>> `refCheckpointId` is checkpoint-level metadata stored in the
> >>> completed
> >>>>>>>> checkpoint — it doesn't participate in state redistribution during
> >>>>>>>> rescaling. The `TaskStateAssignment` redistributes only the state
> >>>>>> handles
> >>>>>>>> to new subtasks, while the `CompletedCheckpointStore` retains the
> >>>>>> original
> >>>>>>>> metadata (including `refCheckpointId`) for the cleaner to trace
> >>>>>> reference
> >>>>>>>> chains. So rescaling is inherently compatible.
> >>>>>>>>
> >>>>>>>> 3. Finished operators
> >>>>>>>>
> >>>>>>>> You're right, my previous answer missed the real issue. For
> >> bounded
> >>>>>> source
> >>>>>>>> jobs (FLIP-147), if the final checkpoint is Regional and a failed
> >>>>>> Region's
> >>>>>>>> subtask misses `notifyCheckpointComplete`, their side effects
> >> (e.g.,
> >>>>>> Kafka
> >>>>>>>> transactions) are never committed — that's data loss.
> >>>>>>>>
> >>>>>>>> Solution: force a global checkpoint when the job is about to
> >>>> terminate.
> >>>>>>>> When all sources are exhausted, the JM will mark the next
> >> checkpoint
> >>>> as
> >>>>>>>> mandatory global, requiring all regions to ack. If it fails, the
> >> job
> >>>>>>>> retries rather than terminating with a partial snapshot.
> >>>>>>>>
> >>>>>>>> 4. Limitations section
> >>>>>>>>
> >>>>>>>> I agree with you, and I'll add a section covering all limitations:
> >>>>>>>>
> >>>>>>>> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> >>>>>>>> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> >>>>>>>> NO_CLAIM restore mode -> Warn (require global checkpoint before
> >>>> snapshot
> >>>>>>>> deletion)
> >>>>>>>> Changelog state backend ->  Reject job submission
> >>>>>>>> Finished operators (FLIP-147) -> Force global checkpoint when
> >>> sources
> >>>>>> are
> >>>>>>>> exhausted
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks again for the review, looking forward to your feedback.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Raorao
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> 2026年6月19日 22:51,Roman Khachatryan <[email protected]> 写道:
> >>>>>>>>>
> >>>>>>>>> Hi, thanks for your replies and sorry for the delay.
> >>>>>>>>>
> >>>>>>>>> Most of my questions were answered, but I still have some
> >> concerns.
> >>>>>>>>>
> >>>>>>>>>> If there are no further concerns by next Monday (June 22), I'll
> >> go
> >>>>>> ahead
> >>>>>>>>> and start the [VOTE] thread for this FLIP.
> >>>>>>>>>
> >>>>>>>>> Isn't the actual FLIP still missing? I only saw Google Document.
> >> Do
> >>>> you
> >>>>>>>>> mind creating a page according to [1]?
> >>>>>>>>>
> >>>>>>>>> ----------------------------------------
> >>>>>>>>>
> >>>>>>>>>> 3. Checkpoint metadata layout
> >>>>>>>>>> Regional Checkpoint recombines state from different checkpoint
> >>> IDs.
> >>>> To
> >>>>>>>>> track this, we add a refCheckpointId field to
> >> OperatorSubtaskState
> >>> in
> >>>>>> the
> >>>>>>>>> metadata, indicating which historical checkpoint a subtask’s
> >> state
> >>>>>>>>> references.
> >>>>>>>>>
> >>>>>>>>> Could you explain how do we find the right OperatorSubtaskState -
> >>>>>>>>> especially in case of rescaling?
> >>>>>>>>> Does the proposal support rescaling?
> >>>>>>>>>
> >>>>>>>>>> 9. Finished operators
> >>>>>>>>>> The concern is: a finished operator’s final commit notification
> >>> gets
> >>>>>>>>> skipped by Regional Checkpoint, and if this checkpoint is the
> >> last
> >>>> one,
> >>>>>>>> the
> >>>>>>>>> operator never receives it — could this cause data loss?
> >>>>>>>>>> In practice, the impact is limited:
> >>>>>>>>>> ● Failed Region tasks are already gone: By the time the Regional
> >>>>>>>>> Checkpoint completes, tasks in the failed Region have already
> >> been
> >>>>>>>>> restarted (decline) or cancelled (timeout). There is no task left
> >>> to
> >>>>>>>>> receive the notification anyway.
> >>>>>>>>> Checkpoint failure doesn't necessarily cause a restart
> >> (especially
> >>> if
> >>>>>>>> this
> >>>>>>>>> is limited to one region). The tasks should still be up and
> >>> running.
> >>>>>>>>>
> >>>>>>>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After
> >>>>>> reaching
> >>>>>>>>> the limit, the next checkpoint is forced to be global, ensuring
> >> all
> >>>>>> tasks
> >>>>>>>>> eventually receive notifyCheckpointComplete. We can’t skip the
> >> same
> >>>>>>>> Region
> >>>>>>>>> forever.
> >>>>>>>>> maxConsecutiveFailures might not be reached for the final
> >>> checkpoint.
> >>>>>>>>>
> >>>>>>>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the
> >> user
> >>>>>> stops
> >>>>>>>>> the job gracefully, it triggers a full global snapshot, not a
> >>>> Regional
> >>>>>>>>> Checkpoint. So the final checkpoint is always complete.
> >>>>>>>>> stop-with-savepoint should be fine, yes.
> >>>>>>>>>
> >>>>>>>>> To clarify, my concern is about jobs with bounded sources. In
> >> such
> >>>>>> cases,
> >>>>>>>>> some subtasks might finish processing but still participate in
> >>>>>>>> checkpoints.
> >>>>>>>>> After a successful checkpoint, they are guaranteed to get
> >>> checkpoint
> >>>>>>>>> completion notification - so that they can make side effects
> >>> visible
> >>>> in
> >>>>>>>>> external systems (commit Kafka transactions).
> >>>>>>>>> See FLIP-147 [2]
> >>>>>>>>>
> >>>>>>>>> However, with the current proposal, the job might complete with
> >>> some
> >>>>>>>>> subtasks/regions failing the final checkpoint unless I'm missing
> >>>>>>>> something.
> >>>>>>>>> This is essentially data loss.
> >>>>>>>>> To prevent this, the final checkpoint must always be acked by all
> >>>>>>>>> subtasks/regions.
> >>>>>>>>>
> >>>>>>>>> ----------------------------------------
> >>>>>>>>>
> >>>>>>>>> There are quite some limitations in this proposal.
> >>>>>>>>> Could you add a section describing how each of them is handled?
> >>>>>>>>> 1. Reject job submission
> >>>>>>>>> 2. Force all-region checkpoint
> >>>>>>>>> 3. Warn in documentation
> >>>>>>>>>
> >>>>>>>>>> 1. Region independence — BLOCKING/HYBRID edges
> >>>>>>>>>> You’re right. Our current scope is limited to embarrassingly
> >>>> parallel
> >>>>>>>>> regions. In typical ETL scenarios, each parallelism maps to an
> >>>>>>>> independent
> >>>>>>>>> Region with no edges connecting them.
> >>>>>>>>>
> >>>>>>>>>> 5. SharedStateRegistry — how are old states kept alive?
> >>>>>>>>>> Good question. In the current design, since we only target
> >>>>>>>> embarrassingly
> >>>>>>>>> parallel regions, there is typically no keyed state and no
> >>>> incremental
> >>>>>>>>> state. As a result, the SharedStateRegistry is generally empty
> >>>> (setting
> >>>>>>>>> aside File Merging and Changelog State for now, discussed on 8.),
> >>> so
> >>>>>>>>> keep-alive of files under the shared directory is not a concern.
> >>>>>>>>>
> >>>>>>>>>> 8. FLINK-26803 and FLIP-306 compatibility
> >>>>>>>>>> This is a very important point. Both features essentially merge
> >>>> small
> >>>>>>>>> files at the job level. As Rui Fan pointed out, if the merging
> >>>>>>>> granularity
> >>>>>>>>> is reduced to the Region level, compatibility with Regional
> >>>> Checkpoint
> >>>>>>>>> should be achievable in theory. I think this can be deferred to
> >>>> future
> >>>>>>>> work
> >>>>>>>>> — once FLINK-26803 is consolidated into FLIP-306, we can revisit
> >>> and
> >>>>>>>> enable
> >>>>>>>>> support.
> >>>>>>>>>
> >>>>>>>>>> 10. NO_CLAIM mode warning
> >>>>>>>>>> You’re absolutely right — this is an important reminder. After
> >>>>>> restoring
> >>>>>>>>> from a Regional Checkpoint, only a successful global checkpoint
> >>>>>>>> guarantees
> >>>>>>>>> independence from the old state. We’ll add a clear user warning
> >> in
> >>>> the
> >>>>>>>>> documentation.
> >>>>>>>>>
> >>>>>>>>>> 11. Changelog state backend — not supported
> >>>>>>>>>> As mentioned earlier, our primary target is embarrassingly
> >>> parallel
> >>>>>>>>> regions, which typically have no keyed state and therefore no
> >> slow
> >>>>>>>>> incremental state flush issues. I don’t think we need to support
> >>>>>>>> Changelog
> >>>>>>>>> state backend for now.
> >>>>>>>>>
> >>>>>>>>> ----------------------------------------
> >>>>>>>>>
> >>>>>>>>>> 2. max-consecutive-failures exceeded — what exactly happens?
> >>>>>>>>>> The current design says “force a global checkpoint.” To clarify
> >>> the
> >>>>>>>>> two-tier behavior:
> >>>>>>>>>> ● Tier 1: When consecutiveRegionalCount >=
> >> maxConsecutiveFailures,
> >>>> the
> >>>>>>>>> next checkpoint is forced to be global.
> >>>>>>>>>> ● Tier 2: If that forced global checkpoint also fails (any task
> >>>>>>>>> declines), the checkpoint is aborted normally (not a job
> >> failure).
> >>>> The
> >>>>>>>>> counter is then reset since a global checkpoint was attempted,
> >> and
> >>>> the
> >>>>>>>> next
> >>>>>>>>> checkpoint cycle can try again.
> >>>>>>>>>> This avoids cascading into job failure while ensuring we don’t
> >>> drift
> >>>>>>>>> indefinitely on historical state.
> >>>>>>>>>
> >>>>>>>>> My assumption was that we would not allow this particular failed
> >>>> region
> >>>>>>>> to
> >>>>>>>>> fail the checkpoint again.
> >>>>>>>>> But forcing a global checkpoint works as well.
> >>>>>>>>>
> >>>>>>>>>> 6. Checkpoint abort notifications & Local Recovery cleanup — new
> >>>>>>>>> notification type
> >>>>>>>>>> This is a very insightful point. Zihao and Gen also raised this
> >> in
> >>>>>>>>> earlier discussions. The current design doesn’t address state
> >>> cleanup
> >>>>>> for
> >>>>>>>>> tasks in failed regions. I agree it’s necessary to introduce a
> >> new
> >>>>>>>>> notification type. For tasks in failed regions, local state
> >> cleanup
> >>>> can
> >>>>>>>> be
> >>>>>>>>> deferred until the next checkpoint trigger.
> >>>>>>>>> Ok, this can be some future work.
> >>>>>>>>>
> >>>>>>>>>> 7. Task that never acknowledges nor declines — per-region
> >> timeouts
> >>>>>>>>>> This was discussed in the previous thread. Network issues may
> >>> cause
> >>>> a
> >>>>>>>>> task to neither ack nor decline in time. In such cases, we treat
> >> it
> >>>> as
> >>>>>> a
> >>>>>>>>> checkpoint timeout: the affected tasks’ region is marked as
> >> failed,
> >>>> and
> >>>>>>>> the
> >>>>>>>>> process ultimately falls through to the normalRegional Checkpoint
> >>>>>>>>> processing logic.
> >>>>>>>>> Ok, this can be some future work.
> >>>>>>>>>
> >>>>>>>>> ----------------------------------------
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> >>>>>>>>>
> >>>>>>>>> [2]
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Roman
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Roman
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]>
> >> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> Thanks everyone for the valuable feedback. I believe all the
> >>> points
> >>>>>>>> raised
> >>>>>>>>>> above have been addressed (@Roman @Rui Fan). If there are no
> >>> further
> >>>>>>>>>> concerns  by next Monday (June 22), I'll go ahead and start the
> >>>> [VOTE]
> >>>>>>>>>> thread for this FLIP.
> >>>>>>>>>>
> >>>>>>>>>> For reference, the earlier related discussion can be found here:
> >>>>>>>>>>
> >> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Please feel free to share any additional feedback before then.
> >>>>>>>>>>
> >>>>>>>>>> Best Regards,
> >>>>>>>>>> Raorao
> >>>>>>>>>>
> >>>>>>>>>> 2026年5月27日 16:31,熊饶饶 <[email protected]> 写道:
> >>>>>>>>>>
> >>>>>>>>>> Hi devs,
> >>>>>>>>>>
> >>>>>>>>>> I would like to start a discussion on FLIP-XXX: Independent
> >>>> Checkpoint
> >>>>>>>>>> Based On Pipeline Region.
> >>>>>>>>>>
> >>>>>>>>>> In high-parallelism streaming jobs, a single Task's checkpoint
> >>>> failure
> >>>>>>>>>> causes the entire global Checkpoint to abort, leading to
> >> degraded
> >>>>>>>>>> checkpoint success rates and wasted compute resources
> >> (especially
> >>>> for
> >>>>>>>> GPU
> >>>>>>>>>> operators).
> >>>>>>>>>>
> >>>>>>>>>> We propose Regional Checkpoint: when some Regions fail to
> >>>> checkpoint,
> >>>>>>>> the
> >>>>>>>>>> framework combines the historical state of the failed Regions
> >> with
> >>>> the
> >>>>>>>>>> current state of the healthy Regions to produce a logically
> >>> complete
> >>>>>>>>>> Completed Checkpoint — while preserving state consistency. The
> >> key
> >>>>>>>> changes
> >>>>>>>>>> are:
> >>>>>>>>>>
> >>>>>>>>>> 1. Snapshot Collection — Allow partial region failures; combine
> >>> last
> >>>>>>>>>> successful state of failed Regions with current state of normal
> >>>>>> Regions.
> >>>>>>>>>>
> >>>>>>>>>> 2. State Correction — New checkpointCoordinatorForRegionFallback
> >>>>>>>> interface
> >>>>>>>>>> for OperatorCoordinators to produce consistent snapshots against
> >>> the
> >>>>>>>> mixed
> >>>>>>>>>> view.
> >>>>>>>>>>
> >>>>>>>>>> 3. Checkpoint Store — Track ref_checkpoint_id in metadata to
> >>> prevent
> >>>>>>>>>> premature cleanup of referenced historical checkpoints.
> >>>>>>>>>>
> >>>>>>>>>> The detailed design is described in the FLIP document:
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> >>>>>>>>>>
> >>>>>>>>>> Looking forward to your feedback!
> >>>>>>>>>>
> >>>>>>>>>> Best regards,
> >>>>>>>>>>
> >>>>>>>>>> Raorao Xiong
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>>
> >>
>
>

Reply via email to