[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568458#comment-17568458
 ] 

Jinzhong Li edited comment on FLINK-28515 at 7/19/22 10:32 AM:
---------------------------------------------------------------

[~roman] I have been thinking about this problem again over the past few days.

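I found that my previous analysis of the problem was not comprehensive: the state 
object of an uncompleted checkpoint is cleaned up in 
[AsyncCheckpointRunnable.cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L377].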

However, I think there is still a problem: the localSnapshot files can't be 
cleaned up properly after a checkpoint is aborted.

After reviewing the Flink code, I think the reason is:
 # If a FutureTask is cancelled, the callable inside it may still run until 
it completes or encounters an exception.
 # When a checkpoint is aborted, its corresponding FutureTask is cancelled.

For RocksDBKeyedStateBackend, 
[AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129] 
is invoked during checkpoint abort. After this, 
[RocksDBIncrementalSnapshotOperation.get|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L384] 
may still run until it completes.
In that case, the localSnapshot files are not cleaned up in the 
[finally|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L447] 
block of RocksDBIncrementalSnapshotOperation.get, nor in the 
[finally-cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87] 
of AsyncSnapshotCallable.call.
The localSnapshot files are also not cleaned up in 
[AsyncCheckpointRunnable.cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L391], 
because 
[AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java#L78] 
returns true.

!image-2022-07-19-18-28-20-239.png|width=812,height=177!
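
The FutureTask behaviour described in point 1 can be reproduced outside Flink. The following is only a minimal sketch using plain java.util.concurrent; the class name, the latches and the "snapshot-result" value are made up for illustration, and the callable deliberately ignores interrupts to stand in for work that does not react to them. It does not use any Flink code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelledTaskStillRuns {

    /** Waits on the latch and swallows interrupts, like work that cannot be interrupted. */
    static void awaitIgnoringInterrupts(CountDownLatch latch) {
        while (true) {
            try {
                latch.await();
                return;
            } catch (InterruptedException ignored) {
                // Keep waiting; the interrupt sent by cancel(true) is ignored on purpose.
            }
        }
    }

    /** Returns {cancel() return value, callable ran to its end, task.isCancelled()}. */
    static boolean[] demo() throws Exception {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch proceed = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sideEffectDone = new AtomicBoolean(false);

        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            awaitIgnoringInterrupts(proceed);
            // Stands in for "local snapshot files were written".
            sideEffectDone.set(true);
            finished.countDown();
            return "snapshot-result";
        });

        Thread worker = new Thread(task);
        worker.start();
        started.await();

        // The callable is already running, so cancel succeeds and returns true ...
        boolean cancelReturned = task.cancel(true);
        // ... but the callable keeps going once it is allowed to proceed.
        proceed.countDown();
        boolean ranToEnd = finished.await(5, TimeUnit.SECONDS) && sideEffectDone.get();
        worker.join();

        return new boolean[] {cancelReturned, ranToEnd, task.isCancelled()};
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = demo();
        System.out.println("cancel returned: " + r[0]);
        System.out.println("callable ran to its end: " + r[1]);
        System.out.println("isCancelled: " + r[2]);
    }
}
```

In this sketch the side effect still happens after cancel has returned true, and the return value of the callable is discarded, so nobody who only holds the future can get at the result to discard it.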

To fix this problem, I think we can either:

(1) when TaskLocalStateStoreImpl aborts a checkpoint, try to delete the 
corresponding localRecovery directory, even if the checkpoint has not been 
registered in TaskLocalStateStoreImpl; or
(2) in AsyncSnapshotCallable.cleanup, check whether the snapshot FutureTask 
has been cancelled and, if so, force a cleanup of the localRecovery directory.
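
To make option (2) more concrete, here is a rough sketch of the idea in plain Java. It is not the Flink implementation: CancelAwareCleanupSketch, snapshotTask, runOnce, the directory name "chk_1" and the file name "000001.sst" are all hypothetical, and the real change would have to live in AsyncSnapshotCallable and use Flink's own directory handling. The point is only that the callable checks in its finally block whether its own FutureTask was cancelled and, if so, removes the directory it wrote.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

class CancelAwareCleanupSketch {

    /** Deletes dir and everything below it; does nothing if dir does not exist. */
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    static void awaitIgnoringInterrupts(CountDownLatch latch) {
        while (true) {
            try {
                latch.await();
                return;
            } catch (InterruptedException ignored) {
                // Simulates snapshot work that does not react to interrupts.
            }
        }
    }

    /** Hypothetical snapshot task that writes into localDir and cleans up if it was cancelled. */
    static FutureTask<Path> snapshotTask(Path localDir, CountDownLatch started, CountDownLatch proceed) {
        AtomicReference<FutureTask<Path>> self = new AtomicReference<>();
        FutureTask<Path> task = new FutureTask<>(() -> {
            try {
                started.countDown();
                awaitIgnoringInterrupts(proceed);
                Files.createDirectories(localDir);
                Files.write(localDir.resolve("000001.sst"), new byte[] {1});
                return localDir;
            } finally {
                // Option (2): if the task was cancelled, nobody will receive the result,
                // so the callable removes its local files itself.
                if (self.get().isCancelled()) {
                    deleteRecursively(localDir);
                }
            }
        });
        self.set(task);
        return task;
    }

    /** Runs one snapshot, optionally cancelling it mid-flight; returns whether localDir still exists. */
    static boolean runOnce(boolean cancel) throws Exception {
        Path parent = Files.createTempDirectory("localState");
        Path localDir = parent.resolve("chk_1");
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch proceed = new CountDownLatch(1);
        FutureTask<Path> task = snapshotTask(localDir, started, proceed);
        Thread worker = new Thread(task);
        worker.start();
        started.await();
        if (cancel) {
            task.cancel(true);
        }
        proceed.countDown();
        worker.join();
        boolean exists = Files.exists(localDir);
        deleteRecursively(parent); // tidy up the temp directory used by this demo
        return exists;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("cancelled run, directory still exists: " + runOnce(true));
        System.out.println("normal run, directory still exists: " + runOnce(false));
    }
}
```

In this sketch the cancelled run leaves no directory behind, while the normally completed run keeps it. Option (1) would instead put the deletion on the abort path of the store, keyed by checkpoint id, which avoids relying on the callable reaching its finally block.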

[~roman]  WDYT?


> The files in local recovery directory hasn't be clean up properly after 
> checkpoint abort
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-28515
>                 URL: https://issues.apache.org/jira/browse/FLINK-28515
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.15.1, 1.16.0
>            Reporter: Jinzhong Li
>            Assignee: Jinzhong Li
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png, 
> image-2022-07-19-18-28-20-239.png, image.png
>
>
> In my case, I found that some files in the local recovery directory were not 
> cleaned up properly after a checkpoint abort (as shown in the attached picture).
> By analyzing the Flink log, I found that when the stateBackend completes the local 
> snapshot but the task has not yet completed the whole snapshot, and the 
> checkpoint is then aborted (caused by a checkpoint timeout or network error), 
> files in the local recovery directory may not be cleaned up properly.
> I think the reason for the residual local snapshot files is:
> (1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, 
> the completed localSnapshot info is registered in 
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task has 
> completed the whole snapshot 
> ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
> (2) If the stateBackend completes the local snapshot but the task has not 
> completed the entire snapshot when the checkpoint abort is triggered, the 
> TaskLocalStateStore can't clean up the unregistered localSnapshot files 
> ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301]).
> To fix this problem, I think that when TaskLocalStateStoreImpl aborts a checkpoint, 
> we can try to delete the corresponding localRecovery directory, even if the 
> checkpoint has not been registered in TaskLocalStateStoreImpl.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
