[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinzhong Li updated FLINK-28515:
--------------------------------
    Description: 
In my case, I found that some files in the local recovery directory were not 
cleaned up properly after a checkpoint abort (as shown in the attached picture).

By analyzing the Flink log, I found that when the state backend completes the local 
snapshot but the task has not completed the whole snapshot, and 
the checkpoint is then aborted (caused by a checkpoint timeout or network error), 
files in the local recovery directory may not be cleaned up properly.

I think the local snapshot files are left behind for the following reasons:
(1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, 
the completed localSnapshot info can be registered in 
org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task has 
completed the whole snapshot. 
([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
(2) If the state backend completes the local snapshot but the task has not completed 
the entire snapshot when the checkpoint abort is triggered, the 
TaskLocalStateStore can't clean up the unregistered localSnapshot files. 
([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301])

(3) When SubtaskCheckpointCoordinatorImpl receives the abort notification, 
it cancels all ongoing state snapshot future tasks in 
'AsyncCheckpointRunnable.close()'. For RocksDBKeyedStateBackend, 
[AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129] 
is invoked during the checkpoint abort. After this, 
[RocksDBIncrementalSnapshotOperation.get|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L384] 
may still run until it completes.
In that case the localSnapshot files are not cleaned up in the finally block of 
RocksDBIncrementalSnapshotOperation.get or in 
AsyncSnapshotCallable.call ([finally-cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87]).
The localSnapshot files also can't be cleaned up in 
[AsyncCheckpointRunnable.cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L391], 
because 
[AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java#L78] 
returns true.
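
The race described in (3) can be reproduced with a plain java.util.concurrent.FutureTask. The following is only a minimal sketch, not Flink code: the class CancelRaceSketch, its Outcome type, and the helper methods are hypothetical names, and the latch stands in for a snapshot body that does not stop when interrupted. It shows that cancel(true) can return true while the task body still runs to completion, so a caller that only cleans up when cancel returns false never sees the produced result.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelRaceSketch {

    /** What cancel() reported, and whether the task body still ran to the end. */
    public static final class Outcome {
        public final boolean cancelReturned;
        public final boolean bodyFinished;

        Outcome(boolean cancelReturned, boolean bodyFinished) {
            this.cancelReturned = cancelReturned;
            this.bodyFinished = bodyFinished;
        }
    }

    public static Outcome run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch cancelIssued = new CountDownLatch(1);
        AtomicBoolean bodyFinished = new AtomicBoolean(false);

        // Hypothetical stand-in for an async snapshot that ignores interruption.
        FutureTask<String> snapshot = new FutureTask<>(() -> {
            started.countDown();
            awaitIgnoringInterrupts(cancelIssued);
            bodyFinished.set(true); // stands in for "local snapshot files written"
            return "local-snapshot-handle";
        });

        Thread worker = new Thread(snapshot, "async-snapshot");
        worker.start();
        awaitIgnoringInterrupts(started);

        // The task is already running, yet cancel() still returns true.
        boolean cancelled = snapshot.cancel(true);
        cancelIssued.countDown();
        joinIgnoringInterrupts(worker);

        return new Outcome(cancelled, bodyFinished.get());
    }

    private static void awaitIgnoringInterrupts(CountDownLatch latch) {
        while (true) {
            try {
                latch.await();
                return;
            } catch (InterruptedException ignored) {
                // Keep waiting: this models a body that does not honor interrupts.
            }
        }
    }

    private static void joinIgnoringInterrupts(Thread thread) {
        while (true) {
            try {
                thread.join();
                return;
            } catch (InterruptedException ignored) {
                // Keep waiting for the worker to finish.
            }
        }
    }

    public static void main(String[] args) {
        Outcome outcome = run();
        System.out.println("cancel returned: " + outcome.cancelReturned);
        System.out.println("body finished:   " + outcome.bodyFinished);
    }
}
```

After such a cancel, the future reports itself as cancelled and its result is no longer reachable through get(), which is why any files the body wrote need a cleanup path that does not depend on the future's result.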

 

To fix this problem, I think that when TaskLocalStateStoreImpl aborts a checkpoint, we 
can try to delete the corresponding localRecovery directory, even if the 
checkpoint has not been registered in TaskLocalStateStoreImpl.
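
The proposed behavior can be sketched as follows. This is an illustration under stated assumptions, not the actual TaskLocalStateStoreImpl: the class LocalStoreAbortSketch, the one-directory-per-checkpoint layout with the "chk_" prefix, and the file name used in the demo are all hypothetical. The point is only that the abort path deletes the checkpoint's directory whether or not the snapshot was ever registered.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

public class LocalStoreAbortSketch {

    private final Path baseDir;
    // Snapshots that were registered after the task completed its whole snapshot.
    private final Map<Long, Path> registered = new ConcurrentHashMap<>();

    public LocalStoreAbortSketch(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** Assumed layout: one directory per checkpoint id under the base directory. */
    public Path checkpointDir(long checkpointId) {
        return baseDir.resolve("chk_" + checkpointId);
    }

    public void register(long checkpointId) {
        registered.put(checkpointId, checkpointDir(checkpointId));
    }

    /** Deletes the checkpoint directory even if the checkpoint was never registered. */
    public void abortCheckpoint(long checkpointId) {
        registered.remove(checkpointId);
        deleteRecursively(checkpointDir(checkpointId));
    }

    private static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.deleteIfExists(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes an unregistered snapshot file, aborts, and reports whether it is gone. */
    public static boolean demo() {
        try {
            Path base = Files.createTempDirectory("local-recovery-sketch");
            LocalStoreAbortSketch store = new LocalStoreAbortSketch(base);
            Path dir = Files.createDirectories(store.checkpointDir(7L));
            Files.write(dir.resolve("000042.sst"), new byte[] {1, 2, 3});

            store.abortCheckpoint(7L); // checkpoint 7 was never registered

            boolean gone = !Files.exists(dir);
            Files.deleteIfExists(base);
            return gone;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("unregistered directory removed: " + demo());
    }
}
```

A real fix would also have to consider a snapshot body that is still writing into the directory while the abort runs, which this sketch does not model.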

 

 

  was:
In my case, I found that some files in the local recovery directory were not 
cleaned up properly after a checkpoint abort (as shown in the attached picture).

By analyzing the Flink log, I found that when the state backend completes the local 
snapshot but the task has not completed the whole snapshot, and 
the checkpoint is then aborted (caused by a checkpoint timeout or network error), 
files in the local recovery directory may not be cleaned up properly.

I think the local snapshot files are left behind for the following reasons:
(1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, 
the completed localSnapshot info can be registered in 
org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task has 
completed the whole snapshot. 
([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
(2) If the state backend completes the local snapshot but the task has not completed 
the entire snapshot when the checkpoint abort is triggered, the 
TaskLocalStateStore can't clean up the unregistered localSnapshot files. 
([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301])

(3) When SubtaskCheckpointCoordinatorImpl receives the abort notification, 
it cancels all ongoing state snapshot future tasks in 
'AsyncCheckpointRunnable.close()'. 

To fix this problem, I think that when TaskLocalStateStoreImpl aborts a checkpoint, we 
can try to delete the corresponding localRecovery directory, even if the 
checkpoint has not been registered in TaskLocalStateStoreImpl.

 

 


> The files in local recovery directory hasn't be clean up properly after 
> checkpoint abort
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-28515
>                 URL: https://issues.apache.org/jira/browse/FLINK-28515
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.15.1, 1.16.0
>            Reporter: Jinzhong Li
>            Assignee: Jinzhong Li
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png, 
> image-2022-07-19-18-28-20-239.png, image.png
>
>
> In my case, I found that some files in the local recovery directory were not 
> cleaned up properly after a checkpoint abort (as shown in the attached picture).
> By analyzing the Flink log, I found that when the state backend completes the local 
> snapshot but the task has not completed the whole snapshot, and 
> the checkpoint is then aborted (caused by a checkpoint timeout or network error), 
> files in the local recovery directory may not be cleaned up properly.
> I think the local snapshot files are left behind for the following reasons:
> (1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, 
> the completed localSnapshot info can be registered in 
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task has 
> completed the whole snapshot. 
> ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
> (2) If the state backend completes the local snapshot but the task has not 
> completed the entire snapshot when the checkpoint abort is triggered, the 
> TaskLocalStateStore can't clean up the unregistered localSnapshot files. 
> ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301])
> (3) When SubtaskCheckpointCoordinatorImpl receives the abort notification, 
> it cancels all ongoing state snapshot future tasks in 
> 'AsyncCheckpointRunnable.close()'. For RocksDBKeyedStateBackend, 
> [AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129] 
> is invoked during the checkpoint abort. After this, 
> [RocksDBIncrementalSnapshotOperation.get|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L384] 
> may still run until it completes.
> In that case the localSnapshot files are not cleaned up in the finally block of 
> RocksDBIncrementalSnapshotOperation.get or in 
> AsyncSnapshotCallable.call ([finally-cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87]).
> The localSnapshot files also can't be cleaned up in 
> [AsyncCheckpointRunnable.cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L391], 
> because 
> [AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java#L78] 
> returns true.
>  
> To fix this problem, I think that when TaskLocalStateStoreImpl aborts a checkpoint, 
> we can try to delete the corresponding localRecovery directory, even if the 
> checkpoint has not been registered in TaskLocalStateStoreImpl.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
