[ https://issues.apache.org/jira/browse/FLINK-39307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18067935#comment-18067935 ]

Zakelly Lan commented on FLINK-39307:
-------------------------------------

I would say this is a limited approach, as it only works on some storages and 
only when a full checkpoint is taken. For some object storages, recursive 
deletion of a directory still involves listing the files and deleting them 
one by one, so it is better to delete them in parallel. I think it is valuable 
to add support for batch deletion to Flink's FileSystem interface. So how 
about we consider enriching the FS interfaces?
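As a rough sketch of what such an enrichment could look like (the interface, method names, and default fallback below are hypothetical illustrations, not Flink's actual FileSystem API): a batch-delete method with a one-by-one default keeps existing implementations working, while an object-store implementation can override it to issue a single bulk RPC.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

class BatchDeleteSketch {

    /** Minimal stand-in for a filesystem; NOT Flink's actual FileSystem class. */
    interface SimpleFileSystem {
        boolean delete(String path) throws Exception;

        /**
         * Proposed enrichment: delete many paths in one call. The default
         * implementation falls back to one-by-one deletion, so existing
         * implementations keep working; an object-store implementation
         * could override it to issue a single bulk RPC instead.
         */
        default void deleteAll(Collection<String> paths) throws Exception {
            for (String path : paths) {
                delete(path);
            }
        }
    }

    /** Toy in-memory implementation used only to exercise the sketch. */
    static class InMemoryFs implements SimpleFileSystem {
        final Set<String> files = new HashSet<>();

        @Override
        public boolean delete(String path) {
            return files.remove(path);
        }
    }

    public static void main(String[] args) throws Exception {
        InMemoryFs fs = new InMemoryFs();
        fs.files.add("chk-1010/_metadata");
        fs.files.add("chk-1010/op-1-subtask-0");
        fs.files.add("chk-1010/op-1-subtask-1");

        // One logical call instead of N separate delete() invocations.
        fs.deleteAll(new HashSet<>(fs.files));
        System.out.println(fs.files.isEmpty());  // prints "true"
    }
}
```

The default-method approach would let checkpoint cleanup call one method regardless of backend, with each storage deciding how to batch.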

> Improve checkpoint deletion speed in CompletedCheckpoint
> --------------------------------------------------------
>
>                 Key: FLINK-39307
>                 URL: https://issues.apache.org/jira/browse/FLINK-39307
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Shihuan Liu
>            Priority: Minor
>
> In Flink, because checkpoints have a retention count, the job deletes an old 
> checkpoint after completing a new one. Right now, to delete that old 
> checkpoint, the files within it are deleted sequentially:
> {code:java}
> discard()                                          // on ioExecutor thread
>  ├─ metadataHandle.discardState()                  // fs.delete(file1, false)
>  ├─ for each OperatorState:                        // e.g. 5 operators
>  │    └─ for each subtask:                         // e.g. 200 subtasks
>  │         └─ for each state handle:               // e.g. 1-3 handles per subtask
>  │              └─ FileStateHandle.discardState()  // fs.delete(fileN, false)
>  └─ disposeStorageLocation()                       // fs.delete(emptyDir, false)
> // "false" -> non-recursive, single-file deletion
> {code}
> In our company (Uber), we found that if the checkpoint has a large number of 
> files, like many thousands of files, deletion is slow because the job has to 
> delete files one by one, sending an RPC call to the storage backend server 
> (like OCI/GCS servers) for each file.
> We are thinking of one potential improvement: do a directory deletion instead 
> of file-by-file deletion, so that the job just calls fs.delete on the 
> checkpoint directory (like chk-1010, for example). This allows the storage 
> layer's batch deletion API (within GCS/OCI clients, for example) to kick in, 
> so that multiple file deletion requests fit into a single RPC call. This 
> could significantly improve checkpoint cleanup performance. We can do this 
> for the full checkpointing mode, since in that mode all checkpoint files are 
> hosted under the chk-N folder.
> Any thoughts/recommendations?
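The directory-level deletion described above could be sketched as follows. This is only an illustration using plain java.nio.file rather than Flink's FileSystem API, and the class and method names and the chk-N layout are assumptions for the example; the point is that the discard becomes one logical call on the directory, which an object-store client can translate into bulk-delete RPCs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

class RecursiveCheckpointDelete {

    /**
     * Discard a checkpoint by deleting its whole directory instead of each
     * state file individually. A local filesystem still walks the tree, but
     * an object-store implementation of a recursive delete can batch many
     * file deletions into a single bulk RPC.
     */
    static void discardCheckpointDirectory(Path chkDir) throws IOException {
        try (Stream<Path> walk = Files.walk(chkDir)) {
            // Reverse order deletes children before their parent directories.
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        // Example layout for a full checkpoint: everything under one chk-N dir.
        Path chkDir = Files.createTempDirectory("chk-1010-");
        Files.createFile(chkDir.resolve("_metadata"));
        Files.createFile(chkDir.resolve("op-1-subtask-0"));
        discardCheckpointDirectory(chkDir);
        System.out.println(Files.exists(chkDir));  // prints "false"
    }
}
```

Note this sketch only works safely when every file of the checkpoint lives under chk-N, which is why the proposal is scoped to the full checkpointing mode; incremental checkpoints share files across directories.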



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
