[ https://issues.apache.org/jira/browse/FLINK-17860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17113856#comment-17113856 ]
Roman Khachatryan commented on FLINK-17860:
-------------------------------------------

I think FLINK-13856 describes a more general problem (any state handle type).
This issue is about channel state handles only. These handles:
* are always located on the file system (so we don't miss anything by replacing the discard() call with a recursive delete)
* vary in number with stream properties (backpressure), not operator logic
* are not incremental

> Recursively remove channel state directories
> --------------------------------------------
>
>                 Key: FLINK-17860
>                 URL: https://issues.apache.org/jira/browse/FLINK-17860
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Critical
>             Fix For: 1.11.0
>
>
> With a high degree of parallelism, we end up with n*s files in each
> checkpoint (n = parallelism, s = stages). Writing them is fast (from many
> subtasks), but removing them is slow (from the JM).
> This can't be mitigated by state.backend.fs.memory-threshold because most
> states are tens to hundreds of MB.
>
> Instead of going through them one by one, we could remove the directory
> recursively.
>
> The easiest way is to remove the channelStateHandle.discard() calls and use
> isRecursive=true in
> FsCompletedCheckpointStorageLocation.disposeStorageLocation.
> Note: with the current isRecursive=false there will be an exception if there
> are any files left under that folder.
>
> This can be extended to other state handles in the future as well.
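
The difference between the two delete modes described in the issue can be illustrated with a small standalone sketch. It is not Flink code: the class ChannelStateCleanupSketch and its delete(Path, boolean) helper are hypothetical names, and it uses plain java.nio.file on a local temp directory rather than Flink's file system abstraction. It only models the semantics (a non-recursive delete fails while files remain, a recursive delete removes the whole directory). On a local disk the recursive walk still touches every file, so it says nothing about the speedup expected on a distributed file system.

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class ChannelStateCleanupSketch {

    /** Hypothetical helper: deletes dir, optionally together with everything under it. */
    static void delete(Path dir, boolean recursive) throws IOException {
        if (!recursive) {
            // Fails with DirectoryNotEmptyException if any file is left in dir.
            Files.delete(dir);
            return;
        }
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                    throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path d, IOException exc)
                    throws IOException {
                if (exc != null) {
                    throw exc;
                }
                // Children are gone by now, so the directory itself can be removed.
                Files.delete(d);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a checkpoint directory holding n*s channel state files.
        Path checkpointDir = Files.createTempDirectory("chk-");
        int parallelism = 4;
        int stages = 3;
        for (int i = 0; i < parallelism * stages; i++) {
            Files.createFile(checkpointDir.resolve("channel-state-" + i));
        }

        try {
            delete(checkpointDir, false);
        } catch (DirectoryNotEmptyException e) {
            System.out.println("non-recursive delete failed: files are still present");
        }

        delete(checkpointDir, true);
        System.out.println("directory exists after recursive delete: "
                + Files.exists(checkpointDir));
    }
}
```

With the recursive variant, the per-handle discard step is no longer needed for files under the checkpoint directory, which is the simplification the issue proposes.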