[ 
https://issues.apache.org/jira/browse/FLINK-13856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058278#comment-18058278
 ] 

Liu commented on FLINK-13856:
-----------------------------

Hello, [~fanrui]. I'd like to pick up this issue and provide an updated 
implementation targeting the current Flink master (2.x).

h1. Current Situation (Flink 2.x master / 2.3-SNAPSHOT)

I've verified that the core problem described in this issue still exists in the 
latest codebase:
 # CompletedCheckpointDiscardObject.discard() (CompletedCheckpoint.java) still 
calls StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()), 
which iterates over every OperatorState → OperatorSubtaskState → individual 
StateObject and issues a separate file deletion call for each one. The async 
path, discardAsync(), similarly creates one CompletableFuture per state object.
 # FsCompletedCheckpointStorageLocation.disposeStorageLocation() still uses 
fs.delete(exclusiveCheckpointDir, false) (non-recursive). Because a 
non-recursive delete does not remove a directory that still has contents, this 
may leave orphaned files behind if individual state deletions fail or if the 
directory contains subdirectories.
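
To illustrate item 2, here is a minimal sketch that uses java.nio on a local temp directory as a stand-in for Flink's FileSystem. The class and method names are illustrative only and are not Flink code; the point is just that one leftover file is enough to make a non-recursive directory delete fail.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative only: a local-filesystem analogy, not Flink's FileSystem API.
final class NonRecursiveDeleteDemo {

    /**
     * Creates a stand-in for chk-N/ that still holds one leftover state file,
     * attempts a non-recursive delete, and reports whether the directory survived.
     */
    static boolean leftoverFileBlocksDelete() {
        try {
            Path chkDir = Files.createTempDirectory("chk-demo");
            Path leftover = Files.createFile(chkDir.resolve("leftover-state-file"));

            try {
                // Files.delete is non-recursive: it refuses a non-empty directory.
                Files.delete(chkDir);
            } catch (IOException expected) {
                // The delete failed because the directory still has contents.
            }
            boolean survived = Files.exists(chkDir);

            // Clean up the demo files so the sketch leaves nothing behind.
            Files.deleteIfExists(leftover);
            Files.deleteIfExists(chkDir);
            return survived;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("directory survived non-recursive delete: "
                + leftoverFileBlocksDelete());
    }
}
```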

h1. Proposed Changes

Building upon the original idea and @dawidwys's POC branch, my updated approach 
accounts for several architectural changes in Flink 2.x:
 # Optimize CompletedCheckpointDiscardObject.discard() and discardAsync(): 
Instead of discarding ALL state objects individually, only collect and 
explicitly discard state handles whose underlying data lives outside the 
exclusive checkpoint directory (chk-N/). This includes:

 ## IncrementalRemoteKeyedStateHandle — its shared SST files reside under the 
shared/ directory and are reference-counted by SharedStateRegistry
 ## ChangelogStateBackendHandle (implements CompositeStateHandle) — similarly 
manages shared changelog state
 ## FileMergingOperatorStreamStateHandle — its directory handles (taskowned/ 
and shared/) are managed by SharedStateRegistry
All other state (exclusive/private) will be cleaned up by the subsequent 
recursive directory deletion.
 # Enable recursive deletion in 
FsCompletedCheckpointStorageLocation.disposeStorageLocation(): Change 
fs.delete(exclusiveCheckpointDir, false) to fs.delete(exclusiveCheckpointDir, 
true) to reliably clean up the entire exclusive checkpoint directory.
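
The two changes above can be sketched roughly as follows. This is a simplified model and not the actual patch: state handles are reduced to plain file paths, java.nio stands in for Flink's FileSystem, and DiscardSketch, locationsOutside and deleteRecursively are hypothetical names. In the real implementation the selection would be based on handle types and SharedStateRegistry rather than on a path prefix.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical model of the proposed discard logic; not Flink code.
final class DiscardSketch {

    /** Returns the state files that live outside chkDir and so need explicit handling. */
    static List<Path> locationsOutside(Path chkDir, List<Path> stateFiles) {
        List<Path> outside = new ArrayList<>();
        for (Path file : stateFiles) {
            if (!file.startsWith(chkDir)) {
                outside.add(file);
            }
        }
        return outside;
    }

    /** Deletes dir and everything below it, children before parents. */
    static void deleteRecursively(Path dir) {
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds a tiny chk-1/ + shared/ layout, discards chk-1/, and summarizes the result. */
    static String demo() {
        try {
            Path root = Files.createTempDirectory("ckpt-demo");
            Path chkDir = Files.createDirectories(root.resolve("chk-1"));
            Path sharedDir = Files.createDirectories(root.resolve("shared"));
            Path privateFile = Files.createFile(chkDir.resolve("private-state"));
            Path nestedDir = Files.createDirectories(chkDir.resolve("sub"));
            Path nestedFile = Files.createFile(nestedDir.resolve("nested-state"));
            Path sharedFile = Files.createFile(sharedDir.resolve("shared-sst"));

            List<Path> explicit =
                    locationsOutside(chkDir, List.of(privateFile, nestedFile, sharedFile));
            // Exclusive state is removed by a single recursive delete of chk-1/.
            deleteRecursively(chkDir);

            String summary = "explicit=" + explicit.size()
                    + " chkExists=" + Files.exists(chkDir)
                    + " sharedExists=" + Files.exists(sharedFile);
            deleteRecursively(root); // remove the demo layout
            return summary;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model only the file under shared/ is returned for explicit handling, while both files under chk-1/ (including the one in a subdirectory) disappear with the directory.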

h1. Addressing Previous Review Concerns
 * S3 "pseudo-recursive" delete (@sewen's concern): Although a recursive delete 
on S3 is internally a list followed by batch deletes, modern Flink S3 
connectors already issue these as batch operations, so the total number of API 
calls is still significantly lower than with one delete per file. We can 
document this trade-off.
 * Shared state safety (@tang_yun's concern): The implementation explicitly 
separates shared state (managed by SharedStateRegistry reference counting) from 
exclusive state. Only shared state handles are discarded individually; 
exclusive state is cleaned up solely via directory deletion. This is safe 
because shared state files reside in the shared/ directory, not under chk-N/.
 * File Merging compatibility (new in 2.x): 
FileMergingOperatorStreamStateHandle implements CompositeStateHandle and 
registers its directory handles with SharedStateRegistry. These are collected 
as shared handles for explicit cleanup, ensuring correct lifecycle management.
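
As a rough back-of-the-envelope check for the S3 point above, the sketch below compares request counts under simplifying assumptions: one list request per 1000 keys and one batch delete request per 1000 keys (the S3 limits for ListObjectsV2 and DeleteObjects), with no extra metadata requests. The actual number of calls depends on the S3 file system implementation in use, so this is an estimate and not a measurement; S3CallEstimate and its methods are hypothetical names.

```java
// Hypothetical estimate only; real connectors may issue additional requests.
final class S3CallEstimate {

    /** S3 returns at most 1000 keys per list page and accepts at most 1000 keys per batch delete. */
    static final long MAX_KEYS_PER_REQUEST = 1000;

    /** One DELETE request per state file, as in the current per-object discard. */
    static long perFileDeleteCalls(long files) {
        return files;
    }

    /** List pages plus batch deletes needed to remove a prefix holding the given number of files. */
    static long listAndBatchDeleteCalls(long files) {
        long batches = (files + MAX_KEYS_PER_REQUEST - 1) / MAX_KEYS_PER_REQUEST;
        long listCalls = Math.max(1, batches); // even an empty prefix costs one list call
        return listCalls + batches;
    }

    public static void main(String[] args) {
        long files = 2500;
        System.out.println("per-file deletes: " + perFileDeleteCalls(files));
        System.out.println("list + batch deletes: " + listAndBatchDeleteCalls(files));
    }
}
```

Under these assumptions, a checkpoint directory with 2500 files needs 2500 requests with per-file deletion versus 6 with list plus batch delete.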

> Reduce the delete file api when the checkpoint is completed
> -----------------------------------------------------------
>
>                 Key: FLINK-13856
>                 URL: https://issues.apache.org/jira/browse/FLINK-13856
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.8.1, 1.9.0
>            Reporter: Andrew.D.lin
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>         Attachments: after.png, before.png, 
> f6cc56b7-2c74-4f4b-bb6a-476d28a22096.png
>
>   Original Estimate: 48h
>          Time Spent: 10m
>  Remaining Estimate: 47h 50m
>
> When the new checkpoint is completed, an old checkpoint will be deleted by 
> calling CompletedCheckpoint.discardOnSubsume().
> When deleting old checkpoints, the following steps are performed:
> 1. drop the metadata
> 2. discard private state objects
> 3. discard the location as a whole
> In some cases, is it possible to delete the checkpoint folder recursively 
> with one call?
> As far as I know, for a full checkpoint it should be possible to 
> delete the folder directly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
