[ https://issues.apache.org/jira/browse/FLINK-24611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Roman Khachatryan reassigned FLINK-24611:
-----------------------------------------
Assignee: Roman Khachatryan
> Prevent JM from discarding state on checkpoint abortion
> -------------------------------------------------------
>
> Key: FLINK-24611
> URL: https://issues.apache.org/jira/browse/FLINK-24611
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Affects Versions: 1.15.0
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Fix For: 1.15.0
>
>
> When a checkpoint is aborted, JM discards any state that was sent to it and
> wasn't used in other checkpoints. This forces incremental state backends to
> wait for confirmation from JM before re-using this state. For the changelog
> backend this is even more critical.
> One proposed approach was to make backends/TMs responsible for their state
> as long as it is not shared with other TMs, i.e. until rescaling
> (private/shared state ownership track: FLINK-23342 and more).
> However, that approach is quite invasive.
>
> An alternative solution would be:
> 1. SharedStateRegistry remembers the latest checkpoint that used each shared
> state entry (instead of the usage count it keeps currently)
> 2. CompletedCheckpointStore notifies it about the lowest valid checkpoint
> (on subsumption)
> 3. SharedStateRegistry then discards any state whose latest checkpoint is
> lower than that (i.e. subsumed or aborted)
> This way, the state of an aborted checkpoint can only be discarded after some
> subsequent successful checkpoint (which can mark the state as used).
> Only JM code needs to change.
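A minimal sketch of steps 1-3, assuming shared state is identified by a String key and that discarding a handle can be modeled by returning its key. `LastUsedRegistry`, `register`, `unregisterUnusedState` and `isTracked` are hypothetical names chosen for illustration, not Flink's actual SharedStateRegistry API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical registry that tracks the latest checkpoint using each shared state entry. */
class LastUsedRegistry {
    // shared state key -> ID of the latest checkpoint (pending, completed or aborted) that reported it
    private final Map<String, Long> lastUsedBy = new HashMap<>();

    /** Step 1: remember the highest checkpoint ID that reported this key. */
    void register(String key, long checkpointId) {
        lastUsedBy.merge(key, checkpointId, Long::max);
    }

    /**
     * Steps 2-3: called on subsumption with the lowest valid checkpoint ID; removes and returns
     * every key whose latest use is below it (a real registry would delete the files here).
     */
    List<String> unregisterUnusedState(long lowestValidCheckpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedBy.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < lowestValidCheckpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
        return discarded;
    }

    boolean isTracked(String key) {
        return lastUsedBy.containsKey(key);
    }
}
```

Because `register` keeps the maximum ID, state reported by an aborted checkpoint stays tracked until the lowest valid checkpoint ID exceeds its last use. This version scans every entry on each call, i.e. approach (1) from the implementation considerations.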
>
> Implementation considerations.
> On subsumption, JM needs to find all the unused state and discard it.
> This can be done either by
> 1) simply traversing all entries; or by
> 2) maintaining a set of entries per checkpoint (e.g. SortedMap<Long,
> List<K>>). This avoids unnecessary traversal at the cost of higher
> memory usage.
> In both cases:
> - each entry stores the last checkpoint ID it was used in (long)
> - the key is hashed (even with plain traversal,
> map.entrySet().iterator().remove() computes the hash internally)
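A sketch of approach (2), again with hypothetical names and String keys: a TreeMap from checkpoint ID to the keys registered under it, so that subsumption only visits checkpoints below the lowest valid ID. Index entries for keys that a later checkpoint re-used are left in place and skipped lazily; this is one possible design choice, not necessarily the one intended here:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical registry for approach (2): a per-checkpoint index avoids scanning all entries. */
class IndexedRegistry {
    // shared state key -> latest checkpoint ID that reported it
    private final Map<String, Long> lastUsedBy = new HashMap<>();
    // checkpoint ID -> keys whose last-used ID was set to that checkpoint (may contain stale keys)
    private final TreeMap<Long, List<String>> keysByCheckpoint = new TreeMap<>();

    void register(String key, long checkpointId) {
        Long previous = lastUsedBy.get(key);
        if (previous != null && previous >= checkpointId) {
            return; // already attributed to the same or a later checkpoint
        }
        lastUsedBy.put(key, checkpointId);
        keysByCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(key);
    }

    List<String> unregisterUnusedState(long lowestValidCheckpointId) {
        List<String> discarded = new ArrayList<>();
        // live view of all checkpoints strictly below the lowest valid one
        SortedMap<Long, List<String>> outdated = keysByCheckpoint.headMap(lowestValidCheckpointId);
        for (Map.Entry<Long, List<String>> entry : outdated.entrySet()) {
            for (String key : entry.getValue()) {
                // discard only if this checkpoint is still the key's latest use; otherwise it is stale
                if (entry.getKey().equals(lastUsedBy.get(key))) {
                    lastUsedBy.remove(key);
                    discarded.add(key);
                }
            }
        }
        outdated.clear(); // also removes these checkpoints from keysByCheckpoint
        return discarded;
    }
}
```

The extra memory is the per-checkpoint lists of key references; the lookup cost is proportional to the number of keys indexed under subsumed checkpoints rather than to the total number of entries.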
> Given the following constraints:
> - 10M state entries at most
> - 10 (retained) checkpoints at most
> - 10 checkpoints per second at most
> - state entry key takes 32B (usually UUID or two UUIDs)
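> The extra space for (2) would be on the order of 10M*4B=40MB (~38MiB) if the
> per-checkpoint lists only hold references to keys already stored in the
> registry (4B each with compressed oops); copying the 32B keys instead would
> take 10M*32B=320MB.
> The extra time for (1) would be on the order of 10M entries * 10 checkpoints
> per second * the ratio of outdated entries per checkpoint. Depending on the
> ratio and the hardware, this could take up to hundreds of ms per second,
> blocking the main thread.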
> So approach (2) seems reasonable.
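The estimates above, written out. The 4 B per reference assumes compressed object pointers on a 64-bit JVM; $r$ is the ratio of outdated entries per checkpoint, and the per-entry cost $t_{\text{entry}}$ is an illustrative parameter, not a measurement:

```latex
\begin{aligned}
\text{space (2), references only} &\approx 10^{7} \times 4\,\text{B} = 4 \times 10^{7}\,\text{B} \approx 38\,\text{MiB} \\
\text{space (2), copied 32\,B keys} &\approx 10^{7} \times 32\,\text{B} = 3.2 \times 10^{8}\,\text{B} \approx 305\,\text{MiB} \\
\text{time (1) per second} &\approx 10^{7} \times 10\,\text{s}^{-1} \times r \times t_{\text{entry}} \le 10^{8}\, t_{\text{entry}}\ \text{s}^{-1}
\end{aligned}
```

With $r = 1$ and an assumed $t_{\text{entry}} = 3\,\text{ns}$, this bound comes to 0.3 s of main-thread time per second.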
>
> The following cases shouldn't pose any difficulties:
> 1. Recovery, re-scaling, and state used by only some tasks or by none - we
> still register all states on recovery even after FLINK-22483/FLINK-24086
> 2. PlaceholderStreamStateHandles
> 3. Cross-task state sharing - not an issue as long as everything is managed
> by JM
> 4. Dependencies between SharedStateRegistry and CompletedCheckpointStore -
> simple after FLINK-24086
> 5. Multiple concurrent checkpoints (below)
> Consider the following case
> (number of concurrent checkpoints > 1):
> 1. checkpoint 1 starts, TM reports that it uses file f1; checkpoint 1 gets
> aborted - f1 is now tracked
> 2. savepoint 2 starts, it *will* use f1
> 3. checkpoint 3 starts and finishes; it does NOT use file f1
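> When a checkpoint finishes, all pending checkpoints before it are aborted,
> but not savepoints. So after step 3, f1 is discarded while savepoint 2 is
> still pending and expects it.
> This cannot happen today because savepoints are currently NOT incremental,
> and in the future, incremental savepoints shouldn't share any artifacts with
> checkpoints.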
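A replay of steps 1-3 with a hypothetical last-used-ID map; the class, its methods and the file name f3 are illustrative, not Flink code. It shows that f1 is discarded as soon as checkpoint 3 (the only completed checkpoint) becomes the lowest valid one, which is why a pending savepoint must not reference checkpoint artifacts:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical replay of the concurrent-checkpoint case. */
class ConcurrentCheckpointScenario {
    // shared state key -> latest checkpoint ID that reported it
    private final Map<String, Long> lastUsedBy = new HashMap<>();

    void register(String key, long checkpointId) {
        lastUsedBy.merge(key, checkpointId, Long::max);
    }

    /** Discards every entry last used by a checkpoint below the lowest valid one. */
    void onCheckpointSubsumed(long lowestValidCheckpointId) {
        lastUsedBy.entrySet().removeIf(e -> e.getValue() < lowestValidCheckpointId);
    }

    boolean isTracked(String key) {
        return lastUsedBy.containsKey(key);
    }

    /** Replays steps 1-3 and reports whether f1 is still tracked afterwards. */
    static boolean f1StillTrackedAfterCheckpoint3() {
        ConcurrentCheckpointScenario s = new ConcurrentCheckpointScenario();
        s.register("f1", 1);       // 1. checkpoint 1 reports f1 and is then aborted
        // 2. savepoint 2 starts; savepoints are ignored by the registry
        s.register("f3", 3);       // 3. checkpoint 3 reports only its own file ...
        s.onCheckpointSubsumed(3); //    ... and completes, becoming the lowest valid checkpoint
        return s.isTracked("f1");
    }
}
```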
> The following should be kept in mind:
> 1. On job cancellation, state of aborted checkpoints should be cleaned up
> explicitly
> 2. Savepoints should be ignored and not change
> CheckpointStore.lowestCheckpointID
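A sketch of item 2, assuming the store retains at most `maxRetained` checkpoints (at least one) and reports the ID of the oldest retained one as the lowest valid checkpoint; `RetainedCheckpoints` and `addCompleted` are hypothetical names:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical store: only completed checkpoints, never savepoints, move the lowest valid ID. */
class RetainedCheckpoints {
    private final int maxRetained;
    private final Deque<Long> retainedCheckpointIds = new ArrayDeque<>();
    private long lowestCheckpointId = Long.MIN_VALUE; // nothing completed yet: keep all state

    RetainedCheckpoints(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** Adds a completed checkpoint or savepoint and returns the lowest valid checkpoint ID. */
    long addCompleted(long id, boolean isSavepoint) {
        if (isSavepoint) {
            return lowestCheckpointId; // savepoints are ignored and never change the lowest ID
        }
        retainedCheckpointIds.addLast(id);
        while (retainedCheckpointIds.size() > maxRetained) {
            retainedCheckpointIds.removeFirst(); // subsume the oldest retained checkpoint
        }
        lowestCheckpointId = retainedCheckpointIds.peekFirst();
        return lowestCheckpointId;
    }
}
```

The returned ID is what the store would pass to the registry's discard step on subsumption.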
>
> For end users, this change might show up as a delay in discarding the state
> of aborted checkpoints, which seems acceptable.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)