Feifan Wang created FLINK-23949:
-----------------------------------
Summary: first incremental checkpoint after a savepoint will
degenerate into a full checkpoint
Key: FLINK-23949
URL: https://issues.apache.org/jira/browse/FLINK-23949
Project: Flink
Issue Type: Improvement
Components: Runtime / State Backends
Affects Versions: 1.13.2, 1.12.5, 1.11.4
Reporter: Feifan Wang
Attachments: image-2021-08-25-00-59-05-779.png
In RocksIncrementalSnapshotStrategy, we record the uploaded RocksDB files for
each checkpoint ID and clean them up in
_CheckpointListener#notifyCheckpointComplete_:
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}
This works well as long as no savepoint is taken. When a savepoint completes,
however, the callback still removes the _materializedSstFiles_ entries of all
earlier checkpoints, even though the savepoint itself has no entry there. As a
result, the first checkpoint after the savepoint must upload all RocksDB files.
!image-2021-08-25-00-59-05-779.png|width=1640,height=225!
The fix is simple. I propose changing
CheckpointListener#notifyCheckpointComplete to the following form, so that
cleanup only happens when the completed checkpoint ID is itself present in
_materializedSstFiles_:
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId
                && materializedSstFiles.containsKey(completedCheckpointId)) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}
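To make the difference between the two variants concrete, here is a minimal standalone sketch. It is not Flink code: the class SstTracker, the flag requireKnownId, and the methods recordIncrementalCheckpoint and baseFilesForNextCheckpoint are hypothetical names introduced for illustration. It also assumes that the next incremental checkpoint looks up its base files by lastCompletedCheckpointId, which is a simplification of the real snapshot strategy.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Toy model of the bookkeeping discussed in this issue; all names are hypothetical. */
public class SstTracker {
    // checkpoint id -> files uploaded for that checkpoint (stand-in for materializedSstFiles)
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;
    // false = current check, true = proposed check that also requires a known id
    private final boolean requireKnownId;

    public SstTracker(boolean requireKnownId) {
        this.requireKnownId = requireKnownId;
    }

    /** Assumption: only incremental checkpoints register files; savepoints never call this. */
    public void recordIncrementalCheckpoint(long checkpointId, Set<String> files) {
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, files);
        }
    }

    public void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (materializedSstFiles) {
            boolean known = materializedSstFiles.containsKey(completedCheckpointId);
            if (completedCheckpointId > lastCompletedCheckpointId
                    && (!requireKnownId || known)) {
                materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    /** Assumption: a null result means the next checkpoint has no base and uploads everything. */
    public Set<String> baseFilesForNextCheckpoint() {
        synchronized (materializedSstFiles) {
            return materializedSstFiles.get(lastCompletedCheckpointId);
        }
    }

    public static void main(String[] args) {
        for (boolean proposed : new boolean[] {false, true}) {
            SstTracker tracker = new SstTracker(proposed);
            tracker.recordIncrementalCheckpoint(1L, Set.of("000001.sst"));
            tracker.notifyCheckpointComplete(1L); // incremental checkpoint 1
            tracker.notifyCheckpointComplete(2L); // savepoint with id 2, never recorded
            System.out.println((proposed ? "proposed" : "current")
                    + " check, base files: " + tracker.baseFilesForNextCheckpoint());
        }
    }
}
```

Under these assumptions, the current check leaves no base files after the savepoint completes, while the proposed check ignores the savepoint's notification and keeps the entry for checkpoint 1.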