[ 
https://issues.apache.org/jira/browse/FLINK-20653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253970#comment-17253970
 ] 

Yun Tang commented on FLINK-20653:
----------------------------------

Though the code of checkpoint store changed much recently, I think the logic is 
still storing metadata first and then clean up the unreferenced incremental 
state:

Code below is pasted from 
[DefaultCompletedCheckpointStore.java#addCheckpoint.|https://github.com/apache/flink/blob/release-1.12/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java#L201-L226]
 And as you can see, Flink stores the metadata in 
{{checkpointStateHandleStore.addAndLock(path, checkpoint)}} and then discard 
previous unreferenced state in {{tryRemoveCompletedCheckpoint}} if no exception 
meet. 
{code:java}
        public void addCheckpoint(
                        final CompletedCheckpoint checkpoint,
                        CheckpointsCleaner checkpointsCleaner,
                        Runnable postCleanup) throws Exception {

                checkNotNull(checkpoint, "Checkpoint");

                final String path = 
completedCheckpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());

                // Now add the new one. If it fails, we don't want to loose 
existing data.
                checkpointStateHandleStore.addAndLock(path, checkpoint);

                completedCheckpoints.addLast(checkpoint);

                // Everything worked, let's remove a previous checkpoint if 
necessary.
                while (completedCheckpoints.size() > 
maxNumberOfCheckpointsToRetain) {
                        final CompletedCheckpoint completedCheckpoint = 
completedCheckpoints.removeFirst();
                        tryRemoveCompletedCheckpoint(
                                completedCheckpoint,
                                
completedCheckpoint.shouldBeDiscardedOnSubsume(),
                                checkpointsCleaner,
                                postCleanup);
                }

                LOG.debug("Added {} to {}.", checkpoint, path);
        }
{code}
 

I have to say this part of code is refactored in release-1.12, and could you 
please share your exception logs with DFS audit log in detail to see whether 
there existed something unexpected leading data lost?

 

> SharedState should be registered after PendingCheckpoint storeCheckpoint
> ------------------------------------------------------------------------
>
>                 Key: FLINK-20653
>                 URL: https://issues.apache.org/jira/browse/FLINK-20653
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: weiyunqing
>            Priority: Major
>         Attachments: 123.png, image-2020-12-17-20-20-21-246.png
>
>
> SharedState should be registered after PendingCheckpoint storeCheckpoint,If 
> the metadata storage fails, the deleted incremental state cannot be recovered。
>  !image-2020-12-17-20-20-21-246.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to