[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579515#comment-17579515 ]
ChangjiGuo commented on FLINK-28927:
------------------------------------

[~yunta] Thanks for your reply.

In my first solution, I caught the exception while waiting for the upload to complete and set a callback on all futures. This deleted most of the remaining files. However, I found that some threads are not interrupted as expected: their uploads still complete, so the solution above does not fully work. In this method (https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L265), the completed variable is set to true, and the uploaded file is not cleaned up either. I'm still testing this.

In short, interrupting a thread has some unexpected results. What do you think? Looking forward to your reply. [~yunta]

> Can not clean up the uploaded shared files when the checkpoint times out
> ------------------------------------------------------------------------
>
>                 Key: FLINK-28927
>                 URL: https://issues.apache.org/jira/browse/FLINK-28927
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.6, 1.15.1
>         Environment: Flink-1.11
>            Reporter: ChangjiGuo
>            Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable
> thread and do some cleanup work, including the following:
> * Cancel all AsyncSnapshotCallable threads.
> * If the thread has finished, it will clean up all state objects.
> * If the thread has not completed, it will be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It waits here for all files to be uploaded. Although the thread has been
> interrupted, the uploaded files are not cleaned up.
> The remaining files fall mainly into two groups:
> * Files that finished uploading before the thread was canceled.
> * Files for which outputStream.closeAndGetHandle() was called, but
>   snapshotCloseableRegistry had not yet been closed.
> How to reproduce?
> Shorten the checkpoint timeout so that the checkpoint fails, then check
> whether any files remain in the shared directory.
> I'm testing on Flink-1.11, but I found that the code on the latest branch may
> have the same problem. I tried to fix it, and the fix works well so far.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
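A minimal, self-contained Java sketch of the first approach described in the comment: catch the failure while waiting for the uploads, then attach a cleanup callback to every upload future. It does not use Flink. `upload`, `waitForUploads`, `runScenario` and the in-memory `sharedDir` are hypothetical stand-ins for the RocksDB uploader, the shared directory on the remote file system and the checkpoint timeout, and plain `CompletableFuture.allOf(...).get()` replaces Flink's `FutureUtils.waitForAll(...).get()`. It only models the wait-and-interrupt path. It does not model the `completed` flag in RocksIncrementalSnapshotStrategy or the snapshotCloseableRegistry.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the leak: "uploads" write names into an in-memory
// shared directory. None of these names are Flink APIs.
public class UploadCleanupSketch {

    // Stand-in for the checkpoint's shared directory on the remote file system.
    static final Set<String> sharedDir = ConcurrentHashMap.newKeySet();

    // Simulated upload: sleeps, then "writes" the file and returns its path.
    static CompletableFuture<String> upload(String name, long millis, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            sharedDir.add(name);
            return name;
        }, pool);
    }

    // Waits for every upload. If the wait fails (interrupt on timeout, or a failed
    // upload) and cleanup is enabled, a callback on each future deletes the file
    // whenever that upload finishes, even after the waiting thread has given up.
    static void waitForUploads(List<CompletableFuture<String>> futures, boolean cleanup)
            throws InterruptedException, ExecutionException {
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
        } catch (InterruptedException | ExecutionException e) {
            if (cleanup) {
                for (CompletableFuture<String> f : futures) {
                    f.whenComplete((path, err) -> {
                        if (path != null) {
                            sharedDir.remove(path);
                        }
                    });
                }
            }
            throw e;
        }
    }

    // Interrupts the waiting thread while one upload is still running and returns
    // whatever is left in the shared directory once all uploads have stopped.
    static Set<String> runScenario(boolean cleanup) {
        sharedDir.clear();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<CompletableFuture<String>> futures = List.of(
                upload("sst-1", 10, pool),   // finishes before the "timeout"
                upload("sst-2", 500, pool)); // still running at the "timeout"
        Thread checkpoint = new Thread(() -> {
            try {
                waitForUploads(futures, cleanup);
            } catch (InterruptedException | ExecutionException e) {
                // Checkpoint aborted; nothing else to do in this sketch.
            }
        });
        checkpoint.start();
        try {
            Thread.sleep(100);      // simulated checkpoint timeout
            checkpoint.interrupt(); // the task cancels the async snapshot
            checkpoint.join();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return new TreeSet<>(sharedDir);
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + runScenario(false));
        System.out.println("with cleanup:    " + runScenario(true));
    }
}
```

In this model, runScenario(false) should leave both files behind: sst-1 finished before the interrupt, and sst-2 keeps running after the interrupt and completes anyway, which matches what the comment observes. runScenario(true) should leave none, because each callback fires immediately if the upload is already done, or later in the upload thread when it finishes. The cleanup is tied to completion rather than to interruption because an upload that ignores the interrupt still produces a file. The timings (10 ms, 500 ms, 100 ms "timeout") are arbitrary.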