[ 
https://issues.apache.org/jira/browse/FLINK-24611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-24611:
--------------------------------------
    Description: 
================================================================================
MOTIVATION
When a checkpoint is aborted, JM discards any state that was sent to it and 
wasn't used in other checkpoints. This forces incremental state backends to 
wait for confirmation from JM before re-using this state. For changelog backend 
this is even more critical.

One approach proposed was to make backends/TMs responsible for their state 
as long as it isn't shared with other TMs, i.e. until rescaling (private/shared 
state ownership track: FLINK-23342 and more).
However, that approach is quite invasive.

An alternative solution would be:
1. SharedStateRegistry remembers the latest checkpoint for each shared state 
(instead of the usage count it tracks currently)
2. CompletedCheckpointStore notifies it about the lowest valid checkpoint (on 
subsumption)
3. SharedStateRegistry then discards any state associated with the lower 
(subsumed/aborted) checkpoints
So the state of an aborted checkpoint can only be discarded after some 
subsequent successful checkpoint (which can mark the state as used).
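
The three steps above could be sketched roughly as follows. This is a 
simplified stand-in, not the actual SharedStateRegistry: the class name, the 
String keys and the method names (register, unregisterUnused, isTracked) are 
assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; not the real SharedStateRegistry API. */
class LastUsedRegistrySketch {
    // state key -> ID of the latest checkpoint (completed or aborted) that referenced it
    private final Map<String, Long> lastUsedCheckpoint = new HashMap<>();

    /** Step 1: remember the latest checkpoint that uses the given shared state. */
    void register(String stateKey, long checkpointId) {
        lastUsedCheckpoint.merge(stateKey, checkpointId, Long::max);
    }

    /**
     * Steps 2 and 3: called on subsumption with the lowest still-valid checkpoint ID.
     * Returns the keys whose state was last used by a lower (subsumed/aborted) checkpoint.
     */
    List<String> unregisterUnused(long lowestValidCheckpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < lowestValidCheckpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
        return discarded;
    }

    boolean isTracked(String stateKey) {
        return lastUsedCheckpoint.containsKey(stateKey);
    }
}
```

Note that aborting a checkpoint triggers nothing in this sketch: state 
reported by an aborted checkpoint stays tracked until a later subsumption 
moves the lowest valid checkpoint ID past it.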

Mostly JM code is changed. IncrementalRemoteKeyedStateHandle.discard needs to 
be adjusted.
Backends don't re-upload state.

================================================================================
IMPLEMENTATION CONSIDERATIONS

On subsumption, JM needs to find all the unused state and discard it.
This can either be done by
1) simply traversing all entries; or by 
2) maintaining a set of entries per checkpoint (e.g. SortedMap<Long, Set<K>>). 
This allows skipping unnecessary traversal at the cost of higher memory usage

In both cases:
- each entry stores the ID of the last checkpoint it was used in (long)
- the key is hashed (even with plain traversal, 
map.entrySet().iterator().remove() computes the hash internally)

Because of the need to maintain secondary sets, (2) isn't asymptotically better 
than (1), and is likely worse in practice and requires more memory (see 
discussion in the comments). So approach (1) seems reasonable.
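
For comparison, approach (2) might look like the sketch below. It is only an 
illustration under assumed names (PerCheckpointIndexSketch, register, 
unregisterUnused, trackedStates) with String keys standing in for K. It shows 
where the extra cost comes from: every re-use of a state has to move its key 
between secondary sets.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Illustrative sketch of approach (2); not actual Flink code. */
class PerCheckpointIndexSketch {
    private final Map<String, Long> lastUsedCheckpoint = new HashMap<>();
    // secondary index: checkpoint ID -> keys whose last use was that checkpoint
    private final TreeMap<Long, Set<String>> keysByCheckpoint = new TreeMap<>();

    void register(String stateKey, long checkpointId) {
        Long previous = lastUsedCheckpoint.get(stateKey);
        if (previous != null && previous >= checkpointId) {
            return; // already recorded for this or a later checkpoint
        }
        if (previous != null) {
            // extra work on every re-use: move the key out of its old set
            Set<String> oldKeys = keysByCheckpoint.get(previous);
            oldKeys.remove(stateKey);
            if (oldKeys.isEmpty()) {
                keysByCheckpoint.remove(previous);
            }
        }
        lastUsedCheckpoint.put(stateKey, checkpointId);
        keysByCheckpoint.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(stateKey);
    }

    List<String> unregisterUnused(long lowestValidCheckpointId) {
        List<String> discarded = new ArrayList<>();
        // view of all checkpoints strictly below the bound; no full traversal needed
        SortedMap<Long, Set<String>> stale = keysByCheckpoint.headMap(lowestValidCheckpointId);
        for (Set<String> keys : stale.values()) {
            for (String key : keys) {
                lastUsedCheckpoint.remove(key);
                discarded.add(key);
            }
        }
        stale.clear(); // also removes the entries from the backing TreeMap
        return discarded;
    }

    int trackedStates() {
        return lastUsedCheckpoint.size();
    }
}
```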

================================================================================
CORNER CASES

The following cases shouldn't pose any difficulties:
1. Recovery, re-scaling, and state used by only some tasks or by none - we 
still register all states on recovery even after FLINK-22483/FLINK-24086
2. Cross-task state sharing - not an issue as long as everything is managed by 
JM
3. Dependencies between SharedStateRegistry and CompletedCheckpointStore - 
simple after FLINK-24086
4. Multiple concurrent checkpoints (below)

Consider the following case:
(nr. concurrent checkpoints > 1)
1. checkpoint 1 starts, TM reports that it uses file f1; checkpoint 1 gets 
aborted - f1 is now tracked
2. savepoint 2 starts, it *will* use f1
3. checkpoint 3 starts and finishes; it does NOT use file f1

When a checkpoint finishes, all pending checkpoints before it are aborted - but 
not savepoints.
Savepoints currently are NOT incremental. And in the future, incremental 
savepoints shouldn't share any artifacts with checkpoints.

The following should be kept in mind:
1. On job cancellation, state of aborted checkpoints should be cleaned up 
explicitly
2. Savepoints should be ignored and not change 
CheckpointStore.lowestCheckpointID
3. PlaceholderStreamStateHandles can NOT be sent from backend unless the 
checkpoint was confirmed (see comments)
4. There are new cases when unused state can be left undeleted (see follow-up 
FLINK-24852 and/or comments)
5. JM should replace old state handle with a new one in case of duplication 
when old state isn't included in a completed checkpoint (see comments)
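
Point 2 can be illustrated with a small sketch. The Completed class and the 
lowestCheckpointId helper are hypothetical and only stand in for whatever the 
checkpoint store does internally; the point is that savepoints are filtered 
out before the lowest ID is computed.

```java
import java.util.List;
import java.util.OptionalLong;

/** Illustrative sketch only; names are assumptions, not Flink classes. */
class LowestCheckpointSketch {
    /** Minimal stand-in for a retained completed checkpoint or savepoint. */
    static final class Completed {
        final long id;
        final boolean savepoint;

        Completed(long id, boolean savepoint) {
            this.id = id;
            this.savepoint = savepoint;
        }
    }

    /** Lowest ID among retained checkpoints, ignoring savepoints entirely. */
    static OptionalLong lowestCheckpointId(List<Completed> retained) {
        return retained.stream()
                .filter(c -> !c.savepoint)
                .mapToLong(c -> c.id)
                .min();
    }
}
```

In the scenario above (savepoint 2 and completed checkpoint 3 retained), the 
result is 3, so f1, last reported by aborted checkpoint 1, becomes eligible 
for discarding regardless of the savepoint.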

For end users, this change might show up as a delay in discarding the state of 
aborted checkpoints and as a slight increase in undeleted state in case of 
failures, which seems acceptable.


> Prevent JM from discarding state on checkpoint abortion
> -------------------------------------------------------
>
>                 Key: FLINK-24611
>                 URL: https://issues.apache.org/jira/browse/FLINK-24611
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
