[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579515#comment-17579515
 ] 

ChangjiGuo commented on FLINK-28927:
------------------------------------

[~yunta] Thanks for your replay. In my first solution, I caught the exception 
while waiting for the upload to complete and set a callback for all futures. It 
could delete most of the remaining files. But I found that there are some 
threads that are not interrupted normally, the files will upload complete and 
the solution above doesn't work well. 

In this 
method(https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L265),
 the completed variable will be set true, and uploaded file is also not cleaned 
up. I'm still testing for this. 

In short, interrupting a thread has some unexpected result. What do you think, 
looking forward to your reply. [~yunta]




> Can not clean up the uploaded shared files when the checkpoint times out
> ------------------------------------------------------------------------
>
>                 Key: FLINK-28927
>                 URL: https://issues.apache.org/jira/browse/FLINK-28927
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.6, 1.15.1
>         Environment: Flink-1.11
>            Reporter: ChangjiGuo
>            Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
> * Cancel all AsyncSnapshotCallable thread.
> * If the thread has finished, it will clean up all state object.
> * If the thread has not completed, it will be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
> * Files that have finished uploading before the thread is canceled.
> * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to