Hi everyone,

Thank you all for the thorough review and the great discussion on this FLIP — 
especially Roman, Gen, Rui Fan, Zakelly and Zihao for the detailed feedback and 
suggestions.

I've updated the FLIP document to incorporate the points raised during the 
discussion. I believe we've reached consensus on the key issues:

- Scope limited to embarrassingly-parallel regions, opt-in behind the 
`execution.checkpointing.region.enabled` flag (default false).
- Operator state safety assumptions documented in the Limitations section.
- OperatorCoordinator and CheckpointListener interface extensions.
- Finished Operators handled via forced global checkpoint on job termination.
- FLINK-26803 documented as incompatible; FLIP-306 integration deferred to 
future work.

The updated FLIP is at:
https://cwiki.apache.org/confluence/spaces/FLINK/pages/438010016/FLIP-600+Independent+Checkpoint+Based+On+Pipeline+Region

If there are no further concerns, I'll start the [VOTE] thread next Tuesday 
(July 7).

Regards,
Raorao

> On Jul 3, 2026, at 18:35, Roman Khachatryan <[email protected]> wrote:
> 
> Hi Raorao,
> 
> Sounds good, thank you!
> 
> Regards,
> Roman
> 
> 
> On Fri, Jul 3, 2026 at 9:06 AM 熊饶饶 <[email protected]> wrote:
> 
>> Hi Roman, Gen,
>> 
>> Thanks both for the discussion.
>> 
>> Roman, I agree with the safety boundary you've defined: each region's
>> state must be a deterministic function of consumed data, with no
>> cross-subtask agreement or cross-element temporal invariant.
>> 
>> Since Regional Checkpoint is opt-in
>> (`execution.checkpointing.region.enabled = false` by default), users who
>> know their job has such invariants simply won't enable it. I'll document
>> the safety conditions in the FLIP's Limitations section so the expectation
>> is clear before anyone turns the flag on. No additional API or framework
>> guard needed.
>> 
>> Let me know if this sounds right.
>> 
>> Regards,
>> Raorao
>> 
> >>> On Jul 3, 2026, at 04:57, Roman Khachatryan <[email protected]> wrote:
>>> 
>>> Hi Gen,
>>> 
>>> Thanks for your reply,
>>> 
> >>> I don't think the "stalled subtask" equivalence fully covers this case. A
> >>> stalled-but-alive subtask still acknowledges checkpoints, so it would
> >>> contribute its current snapshot, not an old one. The composition only
> >>> arises because the failed region never acked the current checkpoint,
> >>> which in normal Flink fails the whole checkpoint (exactly the case this
> >>> FLIP is introducing). So the "slow subtask" analogy holds only when a
> >>> subtask that consumed no new data would produce an identical snapshot at
> >>> the next barrier, i.e. state that's a deterministic function of the
> >>> consumed data. It doesn't hold for state that mutates in
> >>> snapshotState()/open(), on timers/wall-clock, or that's meant to be
> >>> globally agreed, e.g. an epoch or recovery counter kept as union state
> >>> that all subtasks must agree on. There the composed view can be
> >>> {100, 101}, which no real execution produces.
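To make the {100, 101} case concrete, here is a minimal Python sketch. It is not Flink code: `compose_regional_checkpoint` and `restore_union` are hypothetical helpers, and the state layout (one agreed epoch value per subtask, kept as union state) is an assumption chosen only for illustration.

```python
def compose_regional_checkpoint(current, previous, failed):
    """Use each subtask's current snapshot, or its previous one if its region failed."""
    return {
        subtask: (previous[subtask] if subtask in failed else snapshot)
        for subtask, snapshot in current.items()
    }


def restore_union(checkpoint):
    """Union redistribution: every subtask receives the concatenation of all lists."""
    full = [elem for subtask in sorted(checkpoint) for elem in checkpoint[subtask]]
    return {subtask: list(full) for subtask in checkpoint}


# Assumed layout: each subtask stores the globally agreed epoch as one element.
ckp100 = {0: [100], 1: [100]}        # global checkpoint, everyone agrees on 100
ckp101_acked = {0: [101], 1: [101]}  # what a fully acked ckp101 would contain

# Subtask 1's region fails ckp101, so its entry falls back to ckp100.
ckp101_regional = compose_regional_checkpoint(ckp101_acked, ckp100, failed={1})

restored = restore_union(ckp101_regional)
epochs_seen = sorted(set(restored[0]))
print(epochs_seen)  # two different epochs in one restored view
```

A fully acked checkpoint would restore a single epoch value; the composed one restores two, which is the invariant violation described above.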
>>> 
> >>> More generally: a Flink checkpoint today is globally atomic. Every
> >>> subtask's state comes from the same barrier, one logical time. This FLIP
> >>> is the first thing to relax that and mix state from different logical
> >>> times across subtasks, so an assumption operators can rely on today ("all
> >>> of my state comes from one consistent snapshot") no longer holds. That
> >>> applies to even-split state too, not just union: ordinary rescaling only
> >>> requires operators to tolerate regrouping (which elements land together),
> >>> never temporal mixing (elements from different times). Self-contained
> >>> elements like Kafka splits are fine; an operator with any cross-element
> >>> or cross-subtask temporal invariant is not, and would be broken only by
> >>> this feature.
>>> 
> >>> Since the runtime can't inspect operator semantics, I'd suggest:
> >>> 1. Make the feature opt-in and document the assumption explicitly: each
> >>> region's state must be a deterministic function of the data it consumed,
> >>> with no cross-subtask agreement or cross-element temporal invariant.
> >>> 2. As an automatic backstop, guard union and broadcast state (contract:
> >>> every subtask sees the same/complete view): either disable regional
> >>> checkpointing when the job uses it, or force a global checkpoint when
> >>> such a region fails. This covers the common case but isn't a soundness
> >>> proof, which is why (1) matters: even-split isn't automatically safe
> >>> either.
> >>> 
> >>> Could we capture this in the Limitations/Assumptions section of the FLIP
> >>> if you agree?
>>> 
>>> Regards,
>>> Roman
>>> 
>>> 
>>> On Thu, Jul 2, 2026 at 9:19 AM Gen Luo <[email protected]> wrote:
>>> 
>>>> Hi Roman and Raorao,
>>>> 
>>>> 
>>>> Regarding the union operator state, I'd like to share some thoughts.
>>>> 
> >>>> In my understanding, union operator state is not a separate state type,
> >>>> but a different recovery semantic of OperatorState. The underlying state
> >>>> is the same; the only difference is that regular operator state
> >>>> redistributes state partitions, while union operator state restores the
> >>>> full state list to every subtask.
> >>>> 
> >>>> From this perspective, I think combining newer snapshots from healthy
> >>>> regions with an older snapshot from a failed region is still valid. The
> >>>> snapshots of different regions are independent, so the newer snapshots
> >>>> are unaffected by whether the failed region contributes its latest or
> >>>> previous snapshot.
>>>> 
>>>> Another way to see this is to assume the failed subtask simply stopped
>>>> making progress after the previous checkpoint instead of failing. The
>>>> resulting checkpoint would naturally contain the latest snapshots from
>>>> healthy subtasks and the previous snapshot from that subtask, which is
>>>> exactly the same composition as the proposed regional checkpoint.
>>>> 
>>>> What do you think?
>>>> 
>>>> 
>>>> Best,
>>>> 
>>>> Gen
>>>> 
>>>> On Mon, Jun 29, 2026 at 4:45 AM Roman Khachatryan <[email protected]>
>>>> wrote:
>>>> 
>>>>> Hi Raorao,
>>>>> 
>>>>> Thanks for the walk-through.
> >>>>> That partially resolves my concern about re-assigning state: it doesn't
> >>>>> have to be recomputed on every partial checkpoint, but the restored
> >>>>> (redistributed) assignment has to be retained until the first
> >>>>> checkpoint completes at the new parallelism.
> >>>>> 
> >>>>> My other concern still stands: the isolation argument doesn't obviously
> >>>>> cover union operator state, which every subtask sees in full across all
> >>>>> regions, so a partial checkpoint stores a temporal mix of elements, and
> >>>>> rescaling reshuffles them across new region boundaries.
>>>>> 
>>>>>> I'll cover all this in the FLIP document.
>>>>> 
> >>>>> Yes, I think it's better to capture these edge cases in a Rescaling
> >>>>> section of the FLIP and discuss in more detail there. Looking forward
> >>>>> to it.
>>>>> 
>>>>> Regards,
>>>>> Roman
>>>>> 
>>>>> 
>>>>> On Fri, Jun 26, 2026 at 10:06 AM 熊饶饶 <[email protected]> wrote:
>>>>> 
>>>>>> Hi Roman, thanks for the detailed follow-up.
> >>>>>> 
> >>>>>> Rescaling and state redistribution
> >>>>>> 
> >>>>>> StateAssignmentOperation runs only once per restore; it doesn’t run per
> >>>>>> referenced checkpoint. The subtask states entering
> >>>>>> StateAssignmentOperation have already been pre-merged during the
> >>>>>> Regional Checkpoint completion phase. Each OperatorSubtaskState already
> >>>>>> physically contains the state handles from whichever checkpoint it
> >>>>>> originated from. The refCheckpointId is just metadata for the cleaner.
> >>>>>> 
> >>>>>> Example: P=4 → P=2 → P=1, two rounds of downscaling
> >>>>>> 
> >>>>>> Initial state (P=4, even-split operator state):
> >>>>>>   ckp100 (T1, GLOBAL): all 4 regions ack
> >>>>>>     subtask 0: [A0, A1]    region 0
> >>>>>>     subtask 1: [B0, B1]    region 1
> >>>>>>     subtask 2: [C0, C1]    region 2
> >>>>>>     subtask 3: [D0, D1]    region 3
> >>>>>> 
> >>>>>> First downscale: P=4 → P=2 (recover from ckp100 after failover)
> >>>>>>   Even-split redistribution:
> >>>>>>     concatenate: [A0,A1, B0,B1, C0,C1, D0,D1]  → 8 elements
> >>>>>>     split into 2:
> >>>>>>       new subtask 0: [A0, A1, B0, B1]
> >>>>>>       new subtask 1: [C0, C1, D0, D1]
> >>>>>> 
> >>>>>> Now running at P=2:
> >>>>>>   ckp101 (T2, REGIONAL):
> >>>>>>     new-subtask 0 ack: [E0, E1, E2, E3]
> >>>>>>     new-subtask 1 decline, fallback to ckp100 → [C0,C1,D0,D1]  (ref=100)
> >>>>>> 
> >>>>>>   ckp101 metadata (pre-merged):
> >>>>>>     new-subtask 0: stateHandles=[E0,E1,E2,E3], refCheckpointId=null
> >>>>>>     new-subtask 1: stateHandles=[C0,C1,D0,D1], refCheckpointId=100
> >>>>>> 
> >>>>>> Second downscale: P=2 → P=1 (recover from ckp101)
> >>>>>>   Even-split redistribution:
> >>>>>>     concatenate: [E0,E1,E2,E3, C0,C1,D0,D1]  → 8 elements
> >>>>>>     split into 1:
> >>>>>>       new subtask 0: [E0,E1,E2,E3, C0,C1,D0,D1]
> >>>>>> 
> >>>>>> At this point, the single subtask’s state contains elements from two
> >>>>>> time points; this is safe per the partition isolation argument above.
> >>>>>> Now the job continues running at P=1:
> >>>>>>   ckp102 (T3, REGIONAL):
> >>>>>>     new subtask 0 decline, fallback to ckp101's redistributed state
> >>>>>>     → ref=101
> >>>>>> 
> >>>>>>   ckp102 metadata (pre-merged):
> >>>>>>     subtask 0: stateHandles=[E0,E1,E2,E3, C0,C1,D0,D1],
> >>>>>>     refCheckpointId=101
> >>>>>> 
> >>>>>>   ref chain: ckp102(ref=101) → ckp101(ref=100) → ckp100
> >>>>>> 
> >>>>>> The ref chain is tracked entirely in the CompletedCheckpoint store
> >>>>>> metadata. Rescaling is a restore operation: it redistributes state
> >>>>>> handles to TMs but doesn't create new checkpoint metadata. The next
> >>>>>> Regional Checkpoint (ckp102) references the previous
> >>>>>> CompletedCheckpoint (ckp101) from the store, which still has its
> >>>>>> original ref=100 annotation in the metadata.
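The two downscaling rounds in this walk-through can be replayed with a small Python sketch. `even_split` is a hypothetical helper that cuts the concatenated list into contiguous, near-equal sublists so that it reproduces the numbers in the example; it is an assumption for illustration, and Flink's actual repartitioner may place elements differently.

```python
def even_split(subtask_states, new_parallelism):
    """Concatenate all per-subtask lists, then cut into contiguous near-equal sublists."""
    elements = [elem for state in subtask_states for elem in state]
    base, extra = divmod(len(elements), new_parallelism)
    result, start = [], 0
    for index in range(new_parallelism):
        size = base + (1 if index < extra else 0)
        result.append(elements[start:start + size])
        start += size
    return result


# ckp100 (GLOBAL) at P=4.
ckp100 = [["A0", "A1"], ["B0", "B1"], ["C0", "C1"], ["D0", "D1"]]

# First downscale, P=4 -> P=2, restoring from ckp100.
restored_p2 = even_split(ckp100, 2)

# ckp101 (REGIONAL) at P=2: subtask 0 acks new state, subtask 1 falls back
# to the state it was restored with, annotated with ref=100.
ckp101 = [
    {"handles": ["E0", "E1", "E2", "E3"], "ref": None},
    {"handles": restored_p2[1], "ref": 100},
]

# Second downscale, P=2 -> P=1, restoring from the pre-merged ckp101.
restored_p1 = even_split([entry["handles"] for entry in ckp101], 1)
print(restored_p1[0])
```

Note that redistribution only looks at the handles; the `ref` annotation plays no part in it, which is the point made about `refCheckpointId` being metadata for the cleaner.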
>>>>>> 
> >>>>>> Union state, same scenario:
> >>>>>> 
> >>>>>> ckp100 (T1, GLOBAL): P=4, union operator state
> >>>>>>   subtask 0: [A0, A1]
> >>>>>>   subtask 1: [B0, B1]
> >>>>>>   subtask 2: [C0, C1]
> >>>>>>   subtask 3: [D0, D1]
> >>>>>> 
> >>>>>> P=4 → P=2 (union):
> >>>>>>   new subtask 0 gets FULL concatenated: [A0,A1, B0,B1, C0,C1, D0,D1]
> >>>>>>   new subtask 1 gets FULL concatenated: same
> >>>>>> 
> >>>>>> ckp101 (T2, REGIONAL):
> >>>>>>   new-subtask 0 ack: [E0, ..., E7]
> >>>>>>   new-subtask 1 decline, fallback → [A0,A1,B0,B1,C0,C1,D0,D1] (ref=100)
> >>>>>> 
> >>>>>> P=2 → P=1 (union):
> >>>>>>   new subtask 0 gets: [E0,...,E7, A0,...,D1]  ← complete set, safe
> >>>>>> 
> >>>>>> Union is even simpler: each subtask always gets the complete state, so
> >>>>>> mixing doesn’t add new complexity.
> >>>>>> 
> >>>>>> refCheckpointId chain bounded by maxConsecutiveFailures
> >>>>>> 
> >>>>>> ckp100 (global)
> >>>>>> → ckp101 (regional, ref=100)  consecutive=1
> >>>>>> → ckp102 (regional, ref=101)  consecutive=2
> >>>>>> → ckp103: consecutive >= maxConsecutiveFailures(2)
> >>>>>>    → FORCED GLOBAL → if success, chain resets
> >>>>>> 
> >>>>>> The reference chain can only grow as deep as maxConsecutiveFailures.
> >>>>>> Once a global checkpoint succeeds, it’s a fresh start. No inflation
> >>>>>> beyond the configured limit.
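The bound on the reference chain can be checked with a short simulation. This is a Python sketch under stated assumptions, not the coordinator's logic: `simulate` and `max_chain_depth` are hypothetical names, every forced global checkpoint is assumed to succeed, and each regional checkpoint is assumed to reference the checkpoint immediately before it.

```python
def simulate(first_id, region_failures, max_consecutive_failures):
    """Return (checkpoint_id, kind, ref) tuples for a run of checkpoints.

    region_failures[i] tells whether some region declines the i-th checkpoint
    after first_id. Forced global checkpoints are assumed to succeed.
    """
    log = [(first_id, "GLOBAL", None)]
    consecutive = 0
    for offset, region_failed in enumerate(region_failures, start=1):
        ckp_id = first_id + offset
        if consecutive >= max_consecutive_failures:
            log.append((ckp_id, "FORCED_GLOBAL", None))
            consecutive = 0  # chain resets after a successful global checkpoint
        elif region_failed:
            log.append((ckp_id, "REGIONAL", ckp_id - 1))
            consecutive += 1
        else:
            log.append((ckp_id, "GLOBAL", None))
            consecutive = 0
    return log


def max_chain_depth(log):
    """Length of the longest run of back-to-back regional checkpoints."""
    depth = longest = 0
    for _, kind, _ in log:
        depth = depth + 1 if kind == "REGIONAL" else 0
        longest = max(longest, depth)
    return longest


# A region that declines every checkpoint, with maxConsecutiveFailures = 2.
log = simulate(100, [True, True, True, True], max_consecutive_failures=2)
for entry in log:
    print(entry)
print("max chain depth:", max_chain_depth(log))
```

Even with a region that fails every time, the longest run of regional checkpoints in this model never exceeds the configured limit.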
> >>>>>> 
> >>>>>> Channel state redistribution
> >>>>>> 
> >>>>>> Each channel state handle carries InputChannelInfo /
> >>>>>> ResultSubpartitionInfo. TaskStateAssignment maps handles by their
> >>>>>> channel metadata; time of origin is irrelevant. Per-subtask channel
> >>>>>> state and operator state come from the same checkpoint snapshot,
> >>>>>> ensuring internal consistency regardless of which checkpoint other
> >>>>>> subtasks reference.
> >>>>>> 
> >>>>>> Summary
> >>>>>> 
> >>>>>> 1.  One StateAssignmentOperation per restore; states are pre-merged
> >>>>>> 2.  POINTWISE partition isolation makes mixing states across time
> >>>>>> points safe for both even-split and union
> >>>>>> 3.  maxConsecutiveFailures bounds refCheckpointId chain depth
> >>>>>> 4.  Channel state redistribution is time-agnostic
> >>>>>> 
>>>>>> I’ll cover all this in the FLIP document.
>>>>>> 
>>>>>> Thanks again for the detailed replies, looking forward to your
>>>> feedback.
>>>>>> 
>>>>>> Regards,
>>>>>> Raorao
>>>>>> 
> >>>>>>> On Jun 24, 2026, at 18:38, Roman Khachatryan <[email protected]> wrote:
>>>>>>> 
>>>>>>> Hi Raorao, thanks for your answers
>>>>>>> 
>>>>>>>>> 2. Rescaling for OperatorSubtaskState
>>>>>>> 
> >>>>>>>> `refCheckpointId` is checkpoint-level metadata stored in the
> >>>>>>>> completed checkpoint; it doesn't participate in state redistribution
> >>>>>>>> during rescaling. The `TaskStateAssignment` redistributes only the
> >>>>>>>> state handles to new subtasks, while the `CompletedCheckpointStore`
> >>>>>>>> retains the original metadata (including `refCheckpointId`) for the
> >>>>>>>> cleaner to trace reference chains. So rescaling is inherently
> >>>>>>>> compatible.
>>>>>>> 
> >>>>>>> Do you mean that we'll run StateAssignmentOperation per every
> >>>>>>> referenced checkpoint on every "partial" checkpoint completion?
> >>>>>>> (it can be quite heavy)
> >>>>>>> 
> >>>>>>> I don't think that "redistributes only the state handles to new
> >>>>>>> subtasks" part is true - on rescaling, distribution changes for
> >>>>>>> (potentially) every sub-task.
>>>>>>> 
> >>>>>>> From the docs [1]:
> >>>>>>> - Even-split redistribution: Each operator returns a List of state
> >>>>>>> elements. The whole state is logically a concatenation of all lists.
> >>>>>>> On restore/redistribution, the list is evenly divided into as many
> >>>>>>> sublists as there are parallel operators. Each operator gets a
> >>>>>>> sublist, which can be empty, or contain one or more elements.
> >>>>>>> - Union redistribution: Each operator returns a List of state
> >>>>>>> elements. The whole state is logically a concatenation of all lists.
> >>>>>>> On restore/redistribution, each operator gets the complete list of
> >>>>>>> state elements. ...
> >>>>>>> - (we said earlier that keyed state is not supported)
> >>>>>>> 
> >>>>>>> So I don't see how any of the above can work because we might mix old
> >>>>>>> and new states in a sub-task.
>>>>>>> 
> >>>>>>> For example:
> >>>>>>> - a job is running with parallelism 2 and has even-split state
> >>>>>>> distribution
> >>>>>>> - 1st checkpoint is completed by 2/2 sub-tasks
> >>>>>>> - 2nd checkpoint is completed by 1/2 sub-tasks: sub-task 2 failed the
> >>>>>>> checkpoint; its state in checkpoint 2 refers to "subtask 2 state in
> >>>>>>> checkpoint 1"
> >>>>>>> - job is rescaled to 1
> >>>>>>> - sub-task 1 now has state from checkpoint 1 AND 2?
> >>>>>>> 
> >>>>>>> Furthermore, with multiple rounds of downscaling, there can be a
> >>>>>>> single sub-task referring to multiple historical checkpoints.
> >>>>>>> 
> >>>>>>> For Union, it's even more problematic.
> >>>>>>> 
> >>>>>>> Also, what about channel state (Unaligned checkpoint)
> >>>>>>> re-distribution?
> >>>>>>> 
> >>>>>>> Probably it's better to have a FLIP document with the corresponding
> >>>>>>> section first and then discuss it.
>>>>>>> 
>>>>>>> [1]
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>> 
>>>>>>> 
>>>>>>> On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:
>>>>>>> 
>>>>>>>> Hi Zakelly,
>>>>>>>> 
> >>>>>>>> Thanks for the feedback! I completely agree: forcing region-level
> >>>>>>>> merging would significantly reduce the effectiveness of file merging,
> >>>>>>>> and that's not the right trade-off.
> >>>>>>>> 
> >>>>>>>> I think TM-level merging can work correctly with Regional Checkpoint
> >>>>>>>> if we ensure the FileMergingSnapshotManager properly handles the new
> >>>>>>>> checkpoint notification semantics. Specifically:
> >>>>>>>> 
> >>>>>>>> - The FileMergingSnapshotManager already uses a notification-based
> >>>>>>>> lifecycle (checkpoint complete/abort/subsumed) to manage physical
> >>>>>>>> file deletion.
> >>>>>>>> - For Regional Checkpoint, we need to introduce the new notification
> >>>>>>>> type we discussed earlier (notify partially-completed) so that the
> >>>>>>>> merging manager knows: this checkpoint is complete but some segments
> >>>>>>>> belong to failed regions, so keep the physical files that contain
> >>>>>>>> those segments alive until the referencing checkpoints are subsumed.
> >>>>>>>> - This aligns with the existing design and requires only minor
> >>>>>>>> adjustments to the merging manager's notification handling.
> >>>>>>>> 
> >>>>>>>> The previous suggestion of region-level merging was overly
> >>>>>>>> conservative. Glad you pointed out the better approach.
> >>>>>>>> 
> >>>>>>>> Also good to know about the plan to cover channel state in FLIP-306.
> >>>>>>>> That will simplify the compatibility matrix significantly.
>>>>>>>> 
>>>>>>>> Best Regards,
>>>>>>>> Raorao
>>>>>>>> 
> >>>>>>>>> On Jun 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Raorao,
>>>>>>>>> 
> >>>>>>>>> Good to see this proposal and +1 for the direction. Sorry for joining
> >>>>>>>>> the discussion late. I have seen many constructive suggestions that
> >>>>>>>>> largely align with my thoughts, but I still have one concern:
> >>>>>>>>> 
> >>>>>>>>> I'm one of the authors of FLIP-306 and I'm not in favor of
> >>>>>>>>> region-level merging. IIUC, region-level merge files severely limit
> >>>>>>>>> the effectiveness of merging, as merging cannot happen between
> >>>>>>>>> subtasks. I think it should still be possible to perform TM-level
> >>>>>>>>> merge. The only thing we should do is to keep previous checkpoint
> >>>>>>>>> files alive when a region checkpoint occurs. This does not conflict
> >>>>>>>>> with the current design. It is only necessary to ensure that the new
> >>>>>>>>> behavior of checkpoint notifications is compatible with FLIP-306, or
> >>>>>>>>> that some minor adjustment is made to the merging manager.
> >>>>>>>>> 
> >>>>>>>>> And BTW to @Rui, we still need more work to let FLIP-306 cover
> >>>>>>>>> channel state and deprecate FLINK-26803, and I will look into this
> >>>>>>>>> soon.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Zakelly
>>>>>>>>> 
>>>>>>>>> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Roman, thanks for the follow-up.
>>>>>>>>>> 
>>>>>>>>>> 1. FLIP page
>>>>>>>>>> 
> >>>>>>>>>> Thanks for your reminder, I'll create the proper FLIP page before
> >>>>>>>>>> starting the [VOTE] thread.
>>>>>>>>>> 
>>>>>>>>>> 2. Rescaling for OperatorSubtaskState
>>>>>>>>>> 
> >>>>>>>>>> `refCheckpointId` is checkpoint-level metadata stored in the
> >>>>>>>>>> completed checkpoint; it doesn't participate in state
> >>>>>>>>>> redistribution during rescaling. The `TaskStateAssignment`
> >>>>>>>>>> redistributes only the state handles to new subtasks, while the
> >>>>>>>>>> `CompletedCheckpointStore` retains the original metadata (including
> >>>>>>>>>> `refCheckpointId`) for the cleaner to trace reference chains. So
> >>>>>>>>>> rescaling is inherently compatible.
>>>>>>>>>> 
>>>>>>>>>> 3. Finished operators
>>>>>>>>>> 
> >>>>>>>>>> You're right, my previous answer missed the real issue. For bounded
> >>>>>>>>>> source jobs (FLIP-147), if the final checkpoint is Regional and a
> >>>>>>>>>> failed Region's subtasks miss `notifyCheckpointComplete`, their
> >>>>>>>>>> side effects (e.g., Kafka transactions) are never committed, which
> >>>>>>>>>> is data loss.
> >>>>>>>>>> 
> >>>>>>>>>> Solution: force a global checkpoint when the job is about to
> >>>>>>>>>> terminate. When all sources are exhausted, the JM will mark the
> >>>>>>>>>> next checkpoint as mandatory global, requiring all regions to ack.
> >>>>>>>>>> If it fails, the job retries rather than terminating with a partial
> >>>>>>>>>> snapshot.
>>>>>>>>>> 
>>>>>>>>>> 4. Limitations section
>>>>>>>>>> 
> >>>>>>>>>> I agree with you, and I'll add a section covering all limitations:
> >>>>>>>>>> 
> >>>>>>>>>> - BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> >>>>>>>>>> - FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> >>>>>>>>>> - NO_CLAIM restore mode -> Warn (require global checkpoint before
> >>>>>>>>>>   snapshot deletion)
> >>>>>>>>>> - Changelog state backend -> Reject job submission
> >>>>>>>>>> - Finished operators (FLIP-147) -> Force global checkpoint when
> >>>>>>>>>>   sources are exhausted
>>>>>>>>>> 
>>>>>>>>>> Thanks again for the review, looking forward to your feedback.
>>>>>>>>>> 
>>>>>>>>>> Regards,
>>>>>>>>>> Raorao
>>>>>>>>>> 
>>>>>>>>>> 
> >>>>>>>>>>> On Jun 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi, thanks for your replies and sorry for the delay.
>>>>>>>>>>> 
>>>>>>>>>>> Most of my questions were answered, but I still have some
>>>> concerns.
>>>>>>>>>>> 
> >>>>>>>>>>>> If there are no further concerns by next Monday (June 22), I'll go
> >>>>>>>>>>>> ahead and start the [VOTE] thread for this FLIP.
> >>>>>>>>>>> 
> >>>>>>>>>>> Isn't the actual FLIP still missing? I only saw a Google Document.
> >>>>>>>>>>> Do you mind creating a page according to [1]?
>>>>>>>>>>> 
>>>>>>>>>>> ----------------------------------------
>>>>>>>>>>> 
> >>>>>>>>>>>> 3. Checkpoint metadata layout
> >>>>>>>>>>>> Regional Checkpoint recombines state from different checkpoint
> >>>>>>>>>>>> IDs. To track this, we add a refCheckpointId field to
> >>>>>>>>>>>> OperatorSubtaskState in the metadata, indicating which historical
> >>>>>>>>>>>> checkpoint a subtask’s state references.
> >>>>>>>>>>> 
> >>>>>>>>>>> Could you explain how we find the right OperatorSubtaskState,
> >>>>>>>>>>> especially in case of rescaling?
> >>>>>>>>>>> Does the proposal support rescaling?
>>>>>>>>>>> 
> >>>>>>>>>>>> 9. Finished operators
> >>>>>>>>>>>> The concern is: a finished operator’s final commit notification
> >>>>>>>>>>>> gets skipped by Regional Checkpoint, and if this checkpoint is the
> >>>>>>>>>>>> last one, the operator never receives it. Could this cause data
> >>>>>>>>>>>> loss?
> >>>>>>>>>>>> In practice, the impact is limited:
> >>>>>>>>>>>> ● Failed Region tasks are already gone: By the time the Regional
> >>>>>>>>>>>> Checkpoint completes, tasks in the failed Region have already been
> >>>>>>>>>>>> restarted (decline) or cancelled (timeout). There is no task left
> >>>>>>>>>>>> to receive the notification anyway.
> >>>>>>>>>>> Checkpoint failure doesn't necessarily cause a restart (especially
> >>>>>>>>>>> if this is limited to one region). The tasks should still be up
> >>>>>>>>>>> and running.
> >>>>>>>>>>> 
> >>>>>>>>>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After
> >>>>>>>>>>>> reaching the limit, the next checkpoint is forced to be global,
> >>>>>>>>>>>> ensuring all tasks eventually receive notifyCheckpointComplete. We
> >>>>>>>>>>>> can’t skip the same Region forever.
> >>>>>>>>>>> maxConsecutiveFailures might not be reached for the final
> >>>>>>>>>>> checkpoint.
> >>>>>>>>>>> 
> >>>>>>>>>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user
> >>>>>>>>>>>> stops the job gracefully, it triggers a full global snapshot, not
> >>>>>>>>>>>> a Regional Checkpoint. So the final checkpoint is always complete.
> >>>>>>>>>>> stop-with-savepoint should be fine, yes.
>>>>>>>>>>> 
> >>>>>>>>>>> To clarify, my concern is about jobs with bounded sources. In such
> >>>>>>>>>>> cases, some subtasks might finish processing but still participate
> >>>>>>>>>>> in checkpoints. After a successful checkpoint, they are guaranteed
> >>>>>>>>>>> to get checkpoint completion notification, so that they can make
> >>>>>>>>>>> side effects visible in external systems (commit Kafka
> >>>>>>>>>>> transactions).
> >>>>>>>>>>> See FLIP-147 [2]
> >>>>>>>>>>> 
> >>>>>>>>>>> However, with the current proposal, the job might complete with
> >>>>>>>>>>> some subtasks/regions failing the final checkpoint unless I'm
> >>>>>>>>>>> missing something. This is essentially data loss.
> >>>>>>>>>>> To prevent this, the final checkpoint must always be acked by all
> >>>>>>>>>>> subtasks/regions.
>>>>>>>>>>> 
>>>>>>>>>>> ----------------------------------------
>>>>>>>>>>> 
>>>>>>>>>>> There are quite some limitations in this proposal.
>>>>>>>>>>> Could you add a section describing how each of them is handled?
>>>>>>>>>>> 1. Reject job submission
>>>>>>>>>>> 2. Force all-region checkpoint
>>>>>>>>>>> 3. Warn in documentation
>>>>>>>>>>> 
> >>>>>>>>>>>> 1. Region independence: BLOCKING/HYBRID edges
> >>>>>>>>>>>> You’re right. Our current scope is limited to embarrassingly
> >>>>>>>>>>>> parallel regions. In typical ETL scenarios, each parallelism maps
> >>>>>>>>>>>> to an independent Region with no edges connecting them.
> >>>>>>>>>>> 
> >>>>>>>>>>>> 5. SharedStateRegistry: how are old states kept alive?
> >>>>>>>>>>>> Good question. In the current design, since we only target
> >>>>>>>>>>>> embarrassingly parallel regions, there is typically no keyed state
> >>>>>>>>>>>> and no incremental state. As a result, the SharedStateRegistry is
> >>>>>>>>>>>> generally empty (setting aside File Merging and Changelog State
> >>>>>>>>>>>> for now, discussed in 8.), so keep-alive of files under the shared
> >>>>>>>>>>>> directory is not a concern.
> >>>>>>>>>>> 
> >>>>>>>>>>>> 8. FLINK-26803 and FLIP-306 compatibility
> >>>>>>>>>>>> This is a very important point. Both features essentially merge
> >>>>>>>>>>>> small files at the job level. As Rui Fan pointed out, if the
> >>>>>>>>>>>> merging granularity is reduced to the Region level, compatibility
> >>>>>>>>>>>> with Regional Checkpoint should be achievable in theory. I think
> >>>>>>>>>>>> this can be deferred to future work: once FLINK-26803 is
> >>>>>>>>>>>> consolidated into FLIP-306, we can revisit and enable support.
> >>>>>>>>>>> 
> >>>>>>>>>>>> 10. NO_CLAIM mode warning
> >>>>>>>>>>>> You’re absolutely right, this is an important reminder. After
> >>>>>>>>>>>> restoring from a Regional Checkpoint, only a successful global
> >>>>>>>>>>>> checkpoint guarantees independence from the old state. We’ll add a
> >>>>>>>>>>>> clear user warning in the documentation.
> >>>>>>>>>>> 
> >>>>>>>>>>>> 11. Changelog state backend: not supported
> >>>>>>>>>>>> As mentioned earlier, our primary target is embarrassingly
> >>>>>>>>>>>> parallel regions, which typically have no keyed state and
> >>>>>>>>>>>> therefore no slow incremental state flush issues. I don’t think we
> >>>>>>>>>>>> need to support Changelog state backend for now.
>>>>>>>>>>> 
>>>>>>>>>>> ----------------------------------------
>>>>>>>>>>> 
> >>>>>>>>>>>> 2. max-consecutive-failures exceeded: what exactly happens?
> >>>>>>>>>>>> The current design says “force a global checkpoint.” To clarify
> >>>>>>>>>>>> the two-tier behavior:
> >>>>>>>>>>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures,
> >>>>>>>>>>>> the next checkpoint is forced to be global.
> >>>>>>>>>>>> ● Tier 2: If that forced global checkpoint also fails (any task
> >>>>>>>>>>>> declines), the checkpoint is aborted normally (not a job failure).
> >>>>>>>>>>>> The counter is then reset since a global checkpoint was attempted,
> >>>>>>>>>>>> and the next checkpoint cycle can try again.
> >>>>>>>>>>>> This avoids cascading into job failure while ensuring we don’t
> >>>>>>>>>>>> drift indefinitely on historical state.
> >>>>>>>>>>> 
> >>>>>>>>>>> My assumption was that we would not allow this particular failed
> >>>>>>>>>>> region to fail the checkpoint again.
> >>>>>>>>>>> But forcing a global checkpoint works as well.
> >>>>>>>>>>> 
> >>>>>>>>>>>> 6. Checkpoint abort notifications & Local Recovery cleanup: new
> >>>>>>>>>>>> notification type
> >>>>>>>>>>>> This is a very insightful point. Zihao and Gen also raised this in
> >>>>>>>>>>>> earlier discussions. The current design doesn’t address state
> >>>>>>>>>>>> cleanup for tasks in failed regions. I agree it’s necessary to
> >>>>>>>>>>>> introduce a new notification type. For tasks in failed regions,
> >>>>>>>>>>>> local state cleanup can be deferred until the next checkpoint
> >>>>>>>>>>>> trigger.
> >>>>>>>>>>> Ok, this can be some future work.
> >>>>>>>>>>> 
> >>>>>>>>>>>> 7. Task that never acknowledges nor declines: per-region timeouts
> >>>>>>>>>>>> This was discussed in the previous thread. Network issues may
> >>>>>>>>>>>> cause a task to neither ack nor decline in time. In such cases, we
> >>>>>>>>>>>> treat it as a checkpoint timeout: the affected tasks’ region is
> >>>>>>>>>>>> marked as failed, and the process ultimately falls through to the
> >>>>>>>>>>>> normal Regional Checkpoint processing logic.
> >>>>>>>>>>> Ok, this can be some future work.
>>>>>>>>>>> 
>>>>>>>>>>> ----------------------------------------
>>>>>>>>>>> 
>>>>>>>>>>> [1]
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
>>>>>>>>>>> 
>>>>>>>>>>> [2]
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>>>>>>>>>> 
>>>>>>>>>>> Regards,
>>>>>>>>>>> Roman
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]>
>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>> 
> >>>>>>>>>>>> Thanks everyone for the valuable feedback. I believe all the
> >>>>>>>>>>>> points raised above have been addressed (@Roman @Rui Fan). If
> >>>>>>>>>>>> there are no further concerns by next Monday (June 22), I'll go
> >>>>>>>>>>>> ahead and start the [VOTE] thread for this FLIP.
> >>>>>>>>>>>> 
> >>>>>>>>>>>> For reference, the earlier related discussion can be found here:
> >>>>>>>>>>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Please feel free to share any additional feedback before then.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>> Raorao
>>>>>>>>>>>> 
> >>>>>>>>>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi devs,
>>>>>>>>>>>> 
> >>>>>>>>>>>> I would like to start a discussion on FLIP-XXX: Independent
> >>>>>>>>>>>> Checkpoint Based On Pipeline Region.
> >>>>>>>>>>>> 
> >>>>>>>>>>>> In high-parallelism streaming jobs, a single Task's checkpoint
> >>>>>>>>>>>> failure causes the entire global Checkpoint to abort, leading to
> >>>>>>>>>>>> degraded checkpoint success rates and wasted compute resources
> >>>>>>>>>>>> (especially for GPU operators).
> >>>>>>>>>>>> 
> >>>>>>>>>>>> We propose Regional Checkpoint: when some Regions fail to
> >>>>>>>>>>>> checkpoint, the framework combines the historical state of the
> >>>>>>>>>>>> failed Regions with the current state of the healthy Regions to
> >>>>>>>>>>>> produce a logically complete Completed Checkpoint while preserving
> >>>>>>>>>>>> state consistency. The key changes are:
> >>>>>>>>>>>> 
> >>>>>>>>>>>> 1. Snapshot Collection: Allow partial region failures; combine
> >>>>>>>>>>>> last successful state of failed Regions with current state of
> >>>>>>>>>>>> normal Regions.
> >>>>>>>>>>>> 
> >>>>>>>>>>>> 2. State Correction: New checkpointCoordinatorForRegionFallback
> >>>>>>>>>>>> interface for OperatorCoordinators to produce consistent snapshots
> >>>>>>>>>>>> against the mixed view.
> >>>>>>>>>>>> 
> >>>>>>>>>>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to
> >>>>>>>>>>>> prevent premature cleanup of referenced historical checkpoints.
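One way to picture item 3: a cleaner keeps every checkpoint that is reachable from a retained checkpoint by following ref_checkpoint_id. The sketch below is Python, not Flink code. `checkpoints_to_keep` and the `refs` map are hypothetical names, and walking the references transitively is an assumption about how such a cleaner could behave, not a description of the proposed implementation.

```python
def checkpoints_to_keep(refs, retained):
    """Return every checkpoint reachable from `retained` via reference links.

    refs maps a checkpoint id to the set of older checkpoint ids that its
    subtask states reference (empty for a fully global checkpoint).
    """
    keep, stack = set(), list(retained)
    while stack:
        ckp_id = stack.pop()
        if ckp_id in keep:
            continue
        keep.add(ckp_id)
        stack.extend(refs.get(ckp_id, ()))
    return keep


# ckp100 is global, ckp101 and ckp102 are regional, ckp103 is global again.
refs = {100: set(), 101: {100}, 102: {101}, 103: set()}

keep_for_102 = checkpoints_to_keep(refs, retained={102})
keep_for_103 = checkpoints_to_keep(refs, retained={103})
print(sorted(keep_for_102), sorted(keep_for_103))
```

In this model, retaining only a regional checkpoint pins the older checkpoints it depends on, while retaining a global checkpoint pins nothing else.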
>>>>>>>>>>>> 
> >>>>>>>>>>>> The detailed design is described in the FLIP document:
> >>>>>>>>>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
>>>>>>>>>>>> 
>>>>>>>>>>>> Looking forward to your feedback!
>>>>>>>>>>>> 
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>> 
>>>>>>>>>>>> Raorao Xiong
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 

