[ https://issues.apache.org/jira/browse/FLINK-24611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448614#comment-17448614 ]

Roman Khachatryan commented on FLINK-24611:
-------------------------------------------

Just to clarify, the recent discussion is about dropping 
PlaceholderStreamStateHandle AND not using ByteStreamStateHandle, right?

Currently, PlaceholderStreamStateHandle is used for: 1) de-duplication; 2) 
avoiding sending a ByteStreamStateHandle twice.

This ticket removes the need for (1) and we are considering whether it makes 
sense to drop (2).
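
To illustrate (1) and (2) with a simplified model (plain Java; Handle, 
DataHandle, Placeholder, and the confirmed-key set are stand-ins invented for 
this sketch, not Flink's actual classes):

{code:java}
import java.util.Set;

// Simplified stand-ins for StreamStateHandle / ByteStreamStateHandle / PlaceholderStreamStateHandle.
interface Handle {}
record DataHandle(byte[] bytes) implements Handle {}   // carries the state bytes themselves
record Placeholder(String key) implements Handle {}    // only references state the JM already has

class Reporter {
    // Keys of handles that the JM confirmed in a previous checkpoint.
    private final Set<String> confirmedKeys;

    Reporter(Set<String> confirmedKeys) {
        this.confirmedKeys = confirmedKeys;
    }

    // (1)/(2): send only a reference for state the JM already knows, otherwise the data itself.
    Handle toReport(String key, DataHandle handle) {
        return confirmedKeys.contains(key) ? new Placeholder(key) : handle;
    }
}
{code}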

 

Code-wise, it doesn't seem to simplify much (not counting 
FsCheckpointStateOutputStream, which creates ByteStreamStateHandle). For RocksDB 
and SharedStateRegistry it's only 2-3 if-checks.

 

ChangelogStateBackend currently doesn't use ByteStreamStateHandle. But with 
unaligned checkpoints, channel state can often be small, and here we could see a 
regression.

> Prevent JM from discarding state on checkpoint abortion
> -------------------------------------------------------
>
>                 Key: FLINK-24611
>                 URL: https://issues.apache.org/jira/browse/FLINK-24611
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> ================================================================================
> MOTIVATION
> When a checkpoint is aborted, JM discards any state that was sent to it and 
> wasn't used in other checkpoints. This forces incremental state backends to 
> wait for confirmation from JM before re-using this state. For the changelog 
> backend this is even more critical.
> One proposed approach was to make backends/TMs responsible for their state as 
> long as it isn't shared with other TMs, i.e. until rescaling (private/shared 
> state ownership track: FLINK-23342 and more).
> However, that approach is quite invasive.
> An alternative solution would be:
> 1. SharedStateRegistry remembers the latest checkpoint for each shared state 
> (instead of the current usage count)
> 2. CompletedCheckpointStore notifies it about the lowest valid checkpoint (on 
> subsumption)
> 3. SharedStateRegistry then discards any state used only by the lower 
> (subsumed/aborted) checkpoints
> So the state of an aborted checkpoint can only be discarded after some 
> subsequent successful checkpoint (which can mark that state as still in use).
> Mostly JM code is changed. IncrementalRemoteKeyedStateHandle.discard needs to 
> be adjusted.
> Backends don't need to re-upload state.
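> A minimal sketch of this bookkeeping for points 1-3 above (plain Java; the 
> class, Entry, and registerReference names are illustrations, not the final API):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> // Simplified model of the proposed SharedStateRegistry bookkeeping.
> class SharedStateRegistrySketch {
>     // Per-state entry: the handle plus the latest checkpoint that used it
>     // (replaces the current usage count).
>     static final class Entry {
>         final Object handle;        // stand-in for StreamStateHandle
>         long lastUsedCheckpointId;
>         Entry(Object handle, long checkpointId) {
>             this.handle = handle;
>             this.lastUsedCheckpointId = checkpointId;
>         }
>     }
> 
>     final Map<String, Entry> entries = new HashMap<>();
> 
>     // Called whenever a checkpoint registers (or re-uses) a piece of shared state.
>     void registerReference(String key, Object handle, long checkpointId) {
>         Entry entry = entries.get(key);
>         if (entry == null) {
>             entries.put(key, new Entry(handle, checkpointId));
>         } else {
>             entry.lastUsedCheckpointId = Math.max(entry.lastUsedCheckpointId, checkpointId);
>         }
>     }
> }
> {code}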
> ================================================================================
> IMPLEMENTATION CONSIDERATIONS
> On subsumption, JM needs to find all the unused state and discard it.
> This can either be done by
> 1) simply traversing all entries; or by 
> 2) maintaining a set of entries per checkpoint (e.g. SortedMap<Long, 
> Set<K>>). This allows skipping unnecessary traversal at the cost of higher 
> memory usage.
> In both cases:
> - each entry stores the last checkpoint ID it was used in (a long)
> - the key is hashed (even with plain traversal, map.entrySet().iterator().remove() 
> computes the hash internally)
> Because of the need to maintain secondary sets, (2) isn't asymptotically 
> better than (1), and is likely worse in practice and requires more memory 
> (see discussion in the comments). So approach (1) seems reasonable.
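> Continuing the registry sketch above, approach (1) could look roughly like this 
> (again an illustration, not the actual implementation):
> {code:java}
> // Added to SharedStateRegistrySketch: on subsumption, traverse all entries and
> // discard those whose latest use is below the lowest still-valid checkpoint.
> void unregisterUnusedState(long lowestValidCheckpointId) {
>     java.util.Iterator<Map.Entry<String, Entry>> it = entries.entrySet().iterator();
>     while (it.hasNext()) {
>         Entry entry = it.next().getValue();
>         if (entry.lastUsedCheckpointId < lowestValidCheckpointId) {
>             it.remove();  // computes the key hash internally, as noted above
>             // schedule entry.handle for (async) deletion here
>         }
>     }
> }
> {code}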
> ================================================================================
> CORNER CASES
> The following cases shouldn't pose any difficulties:
> 1. Recovery, re-scaling, and state used by only some tasks or by none - we 
> still register all states on recovery even after FLINK-22483/FLINK-24086
> 2. Cross-task state sharing - not an issue as long as everything is managed 
> by JM
> 3. Dependencies between SharedStateRegistry and CompletedCheckpointStore - 
> simple after FLINK-24086
> 4. Multiple concurrent checkpoints (below)
> Consider the following case:
> (nr. concurrent checkpoints > 1)
> 1. checkpoint 1 starts, TM reports that it uses file f1; checkpoint 1 gets 
> aborted - f1 is now tracked
> 2. savepoint 2 starts, it *will* use f1
> 3. checkpoint 3 starts and finishes; it does NOT use file f1
> When a checkpoint finishes, all pending checkpoints before it are aborted - 
> but not savepoints.
> Savepoints currently are NOT incremental. And in the future, incremental 
> savepoints shouldn't share any artifacts with checkpoints.
> The following should be kept in mind:
> 1. On job cancellation, state of aborted checkpoints should be cleaned up 
> explicitly
> 2. Savepoints should be ignored and not change 
> CheckpointStore.lowestCheckpointID
> 3. In case of JM failover, there might be more state left undeleted (see 
> follow-up FLINK-24852 and/or comments)
> 4. To handle JM leadership change, backends should NOT send 
> PlaceholderStreamStateHandles unless the checkpoint was confirmed (see 
> comments)
> 5. To handle TM failover after an incomplete checkpoint, JM should replace the 
> old state handle with a new one in case of a collision (and test that the old 
> state isn't included in a completed checkpoint) (see comments and the sketch 
> below)
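> A hypothetical sketch of the collision handling in (5), reusing the registry 
> sketch above (scheduleAsyncDelete is an assumed helper, not an existing method):
> {code:java}
> // On re-registration after TM failover, keep the newly uploaded handle and drop
> // the old one, which (per the check above) is not part of any completed checkpoint.
> void registerOrReplace(String key, Object newHandle, long checkpointId) {
>     Entry old = entries.get(key);
>     if (old != null && !old.handle.equals(newHandle)) {
>         scheduleAsyncDelete(old.handle);  // assumed helper
>     }
>     entries.put(key, new Entry(newHandle, checkpointId));
> }
> {code}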
> ================================================================================
> USER IMPACT
> For end users, this change might manifest as a delay in discarding the state 
> of aborted checkpoints and a slight increase in undeleted state in case of 
> failures, which seems acceptable.
> After backends are updated to not re-upload state, checkpointing time should be 
> reduced and less IO will be used (in cases when notifications are delayed or a 
> new JM is elected).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
