[ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-23949:
--------------------------------
    Description: 
In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
corresponding to the checkpoint id,and clean it in 
_CheckpointListener#notifyCheckpointComplete ._
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < 
completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}{code}
 

This works well without savepoint, but when a savepoint is completed, it will 
clean up the _materializedSstFiles_ of the previous checkpoint. It leads to the 
first checkpoint after the savepoint must upload all files in rocksdb.

!image-2021-08-25-00-59-05-779.png|width=1188,height=163!

Solving the problem is also very simple, I propose to change 
CheckpointListener#notifyCheckpointComplete to the following form : 
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId
                && 
materializedSstFiles.keySet().contains(completedCheckpointId)) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < 
completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}
 

 

 

  was:
In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
corresponding to the checkpoint id,and clean it in 
_CheckpointListener#notifyCheckpointComplete ._
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < 
completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}{code}
 

This works well without savepoint, but when a savepoint is completed, it will 
clean up the _materializedSstFiles_ of the previous checkpoint. It leads to the 
first checkpoint after the savepoint must upload all files in rocksdb.

!image-2021-08-25-00-59-05-779.png|width=1188,height=163!

Solving the problem is also very simple, I propose to change 
CheckpointListener#notifyCheckpointComplete to the following form :

 
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId
                && 
materializedSstFiles.keySet().contains(completedCheckpointId)) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < 
completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}
 

 

 


> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-23949
>                 URL: https://issues.apache.org/jira/browse/FLINK-23949
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.4, 1.12.5, 1.13.2
>            Reporter: Feifan Wang
>            Priority: Major
>         Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
>     synchronized (materializedSstFiles) {
>         if (completedCheckpointId > lastCompletedCheckpointId) {
>             materializedSstFiles
>                     .keySet()
>                     .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
>             lastCompletedCheckpointId = completedCheckpointId;
>         }
>     }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to change 
> CheckpointListener#notifyCheckpointComplete to the following form : 
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
>     synchronized (materializedSstFiles) {
>         if (completedCheckpointId > lastCompletedCheckpointId
>                 && 
> materializedSstFiles.keySet().contains(completedCheckpointId)) {
>             materializedSstFiles
>                     .keySet()
>                     .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
>             lastCompletedCheckpointId = completedCheckpointId;
>         }
>     }
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to