jiexray commented on PR #21822:
URL: https://github.com/apache/flink/pull/21822#issuecomment-1418605498
Hello @fredia, thank you for creating this PR. I have some concerns about the
removal of `LocalChangelogRegistryImpl#prune()`; please have a look.
In my opinion, the main purpose of `LocalChangelogRegistryImpl#prune()` is
to proactively remove the useless state uploaded by an aborted checkpoint. More
precisely, "useless" state here means state that was uploaded by the aborted
checkpoint and is not shared with any previous checkpoint. However, the current
removal mechanism may also delete state that a previous checkpoint still uses.
I think the root cause of this undesirable removal is the following code snippet:
```java
public void register(StreamStateHandle handle, long checkpointID) {
    handleToLastUsedCheckpointID.compute(
            handle.getStreamStateHandleID(),
            (k, v) -> {
                if (v == null) {
                    return Tuple2.of(handle, checkpointID);
                } else {
                    Preconditions.checkState(handle.equals(v.f0));
                    // !Here: an already existing `handle` will be removed
                    // by an aborted checkpoint with `checkpointID`
                    return Tuple2.of(handle, Math.max(v.f1, checkpointID));
                }
            });
}
```
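To make the concern concrete, here is a minimal, self-contained model of the scenario. It is not Flink code: `MaxRegistryDemo` is hypothetical, handle IDs are plain strings, "discarding" only records the ID, and I assume `prune(id)` discards every handle whose last-used checkpoint ID equals `id`. Under those assumptions, the `Math.max` merge lets an aborted checkpoint "take over" a handle that a completed checkpoint still needs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the registry (not the Flink class):
 * handle IDs are Strings and discarding a handle only records its ID.
 */
public class MaxRegistryDemo {
    // handle ID -> largest checkpoint ID that registered it
    private final Map<String, Long> lastUsed = new HashMap<>();
    // handle IDs discarded so far
    final List<String> discarded = new ArrayList<>();

    void register(String handleId, long checkpointId) {
        // Same merge rule as the snippet above: keep the larger checkpoint ID.
        lastUsed.merge(handleId, checkpointId, Long::max);
    }

    /** Assumed prune() semantics: discard handles last used by the aborted checkpoint. */
    void prune(long abortedCheckpointId) {
        lastUsed.entrySet().removeIf(e -> {
            boolean drop = e.getValue() == abortedCheckpointId;
            if (drop) {
                discarded.add(e.getKey());
            }
            return drop;
        });
    }

    public static void main(String[] args) {
        MaxRegistryDemo registry = new MaxRegistryDemo();
        registry.register("a", 1L); // CHK-1 uploads "a" and completes
        registry.register("a", 2L); // CHK-2 reuses "a" ...
        registry.register("b", 2L); // ... uploads "b", and is then aborted
        registry.prune(2L);
        // Both "a" and "b" are discarded, although completed CHK-1 still references "a".
        System.out.println(registry.discarded);
    }
}
```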
This PR addresses the aforementioned root cause (`Math.max(v.f1,
checkpointID)`) by removing `LocalChangelogRegistryImpl#prune()`.
As a result, the useless state uploaded by an aborted checkpoint (denoted by
CHK) will only be removed passively, by
`LocalChangelogRegistryImpl#discardUpToCheckpoint(CHK + 1)`. I am wondering
whether this lingering state could cause problems before the next invocation of
`LocalChangelogRegistryImpl#discardUpToCheckpoint(CHK + 1)`. For example,
it might take up disk space in the meantime and lead to other failures.
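A hypothetical timeline of the passive cleanup, under stated assumptions: `PassiveCleanupDemo` is not Flink code, the byte sizes are illustrative, and I assume `discardUpToCheckpoint(upTo)` discards every handle whose last-used checkpoint ID is below `upTo`. It shows the window in which the aborted checkpoint's state still occupies local disk.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model: without prune(), state uploaded only by an aborted
 * checkpoint stays on local disk until discardUpToCheckpoint(CHK + 1) runs.
 */
public class PassiveCleanupDemo {
    // handle ID -> largest checkpoint ID that registered it
    private final Map<String, Long> lastUsed = new HashMap<>();
    // handle ID -> size in bytes (illustrative numbers only)
    private final Map<String, Long> sizes = new HashMap<>();

    void register(String handleId, long sizeBytes, long checkpointId) {
        lastUsed.merge(handleId, checkpointId, Long::max);
        sizes.put(handleId, sizeBytes);
    }

    /** Assumed semantics: discard handles last used by a checkpoint below upTo. */
    void discardUpToCheckpoint(long upTo) {
        lastUsed.entrySet().removeIf(e -> {
            boolean drop = e.getValue() < upTo;
            if (drop) {
                sizes.remove(e.getKey());
            }
            return drop;
        });
    }

    long bytesOnDisk() {
        return sizes.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        PassiveCleanupDemo registry = new PassiveCleanupDemo();
        registry.register("a", 100L, 1L); // CHK-1 completes with "a"
        registry.register("b", 500L, 2L); // CHK = 2 uploads "b" and is aborted
        // No prune(2): "b" keeps occupying disk space while CHK + 1 runs.
        System.out.println("after abort of CHK 2: " + registry.bytesOnDisk());
        registry.register("a", 100L, 3L); // CHK + 1 = 3 reuses "a" and completes
        registry.discardUpToCheckpoint(3L);
        System.out.println("after discardUpToCheckpoint(3): " + registry.bytesOnDisk());
    }
}
```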
By the way, I am wondering whether we could change the registration mechanism
in `LocalChangelogRegistryImpl#register` to avoid the undesirable removal.
The objective of this design is to keep proactively removing the useless
state when a checkpoint is aborted. For example, suppose that `CHK` is the
current checkpoint:
- Method-1: We temporarily store the `(CHK, StreamStateHandle)` pairs in a new
map. When registering `CHK+1`, we merge the `(CHK, StreamStateHandle)`
mappings into `handleToLastUsedCheckpointID`.
- Method-2: Use reference counting for `handleToLastUsedCheckpointID`.
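A rough sketch of Method-2 only, under my own assumptions: `RefCountingRegistry` is a hypothetical stand-in for the registry, handle IDs are strings, and each checkpoint holds one reference to every handle it registers. Aborting a checkpoint releases only that checkpoint's references, so a handle shared with an earlier completed checkpoint survives, while state used solely by the aborted checkpoint is still removed proactively.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical Method-2 sketch: a handle is discarded when no checkpoint references it. */
public class RefCountingRegistry {
    // handle ID -> number of checkpoints that still reference it
    private final Map<String, Integer> refCounts = new HashMap<>();
    // checkpoint ID -> handle IDs registered by that checkpoint
    private final Map<Long, Set<String>> handlesByCheckpoint = new HashMap<>();
    // handle IDs discarded so far
    final List<String> discarded = new ArrayList<>();

    void register(String handleId, long checkpointId) {
        Set<String> handles =
                handlesByCheckpoint.computeIfAbsent(checkpointId, id -> new HashSet<>());
        if (handles.add(handleId)) {
            // Count each (checkpoint, handle) pair only once.
            refCounts.merge(handleId, 1, Integer::sum);
        }
    }

    /** Proactive removal on abort: release only the aborted checkpoint's references. */
    void prune(long abortedCheckpointId) {
        release(abortedCheckpointId);
    }

    /** Release the references held by every checkpoint below upTo. */
    void discardUpToCheckpoint(long upTo) {
        List<Long> subsumed = new ArrayList<>();
        for (Long id : handlesByCheckpoint.keySet()) {
            if (id < upTo) {
                subsumed.add(id);
            }
        }
        for (long id : subsumed) {
            release(id);
        }
    }

    private void release(long checkpointId) {
        Set<String> handles = handlesByCheckpoint.remove(checkpointId);
        if (handles == null) {
            return; // already released or never registered
        }
        for (String handleId : handles) {
            // Returning null removes the entry once the count drops to zero.
            Integer remaining =
                    refCounts.computeIfPresent(handleId, (k, n) -> n == 1 ? null : n - 1);
            if (remaining == null) {
                discarded.add(handleId);
            }
        }
    }

    public static void main(String[] args) {
        RefCountingRegistry registry = new RefCountingRegistry();
        registry.register("a", 1L); // CHK-1 uploads "a" and completes
        registry.register("a", 2L); // CHK-2 reuses "a" ...
        registry.register("b", 2L); // ... uploads "b", and is then aborted
        registry.prune(2L);
        // Only "b" is discarded; "a" is still referenced by CHK-1.
        System.out.println(registry.discarded);
    }
}
```

Counting each `(checkpoint, handle)` pair once keeps repeated registrations within the same checkpoint from inflating the count; the cost compared to the current map is one extra set of handle IDs per in-flight checkpoint.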