[
https://issues.apache.org/jira/browse/FLINK-13856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058278#comment-18058278
]
Liu commented on FLINK-13856:
-----------------------------
Hello, [~fanrui]. I'd like to pick up this issue and provide an updated
implementation targeting the current Flink master (2.x).
h1. Current Situation (Flink 2.x master / 2.3-SNAPSHOT)
I've verified that the core problem described in this issue still exists in the
latest codebase:
# CompletedCheckpointDiscardObject.discard() (CompletedCheckpoint.java) still
calls StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()),
which iterates over every OperatorState → OperatorSubtaskState → individual
StateObject and issues separate file deletion RPCs. The async path
discardAsync() similarly creates one CompletableFuture per state object.
# FsCompletedCheckpointStorageLocation.disposeStorageLocation() still uses
fs.delete(exclusiveCheckpointDir, false) (non-recursive). A non-recursive delete
generally fails on a non-empty directory, so chk-N/ and its remaining contents
are left orphaned if any individual state deletion fails or if the directory
contains subdirectories.
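As a local-filesystem analogy, the sketch below shows why a non-recursive delete cannot clean up a chk-N/ directory that still contains a leftover file, while a recursive delete can. It uses plain java.nio.file rather than Flink's FileSystem API, so it only illustrates the semantics; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Local-filesystem analogy (java.nio.file, not Flink's FileSystem API). */
class NonRecursiveDeleteDemo {

    /** Like fs.delete(dir, false): only succeeds if the directory is empty. */
    static boolean deleteNonRecursive(Path dir) throws IOException {
        try {
            Files.delete(dir);
            return true;
        } catch (DirectoryNotEmptyException e) {
            // The directory and its remaining contents stay on disk.
            return false;
        }
    }

    /** Like fs.delete(dir, true): removes children before their parent directories. */
    static void deleteRecursive(Path dir) throws IOException {
        List<Path> deepestFirst;
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse lexicographic order puts every descendant before its ancestor.
            deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : deepestFirst) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        Path chk = Files.createTempDirectory("chk-42");
        // Simulate a state file whose individual discard failed and was left behind.
        Files.createFile(chk.resolve("leftover-state-file"));

        System.out.println("non-recursive delete succeeded: " + deleteNonRecursive(chk));
        System.out.println("chk dir still exists: " + Files.exists(chk));

        deleteRecursive(chk);
        System.out.println("chk dir exists after recursive delete: " + Files.exists(chk));
    }
}
```

Object stores model directories as key prefixes, so the exact failure mode differs there, but the conclusion is the same: only a recursive delete removes whatever is still under the prefix.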
h1. Proposed Changes
Building upon the original idea and @dawidwys's POC branch, my updated approach
accounts for several architectural changes in Flink 2.x:
# Optimize CompletedCheckpointDiscardObject.discard() and discardAsync():
Instead of discarding ALL state objects individually, only collect and
explicitly discard state handles whose underlying data lives outside the
exclusive checkpoint directory (chk-N/). This includes:
## IncrementalRemoteKeyedStateHandle — its shared SST files reside under the
shared/ directory and are reference-counted by SharedStateRegistry
## ChangelogStateBackendHandle (implements CompositeStateHandle) — similarly
manages shared changelog state
## FileMergingOperatorStreamStateHandle — its directory handles (taskowned/
and shared/) are managed by SharedStateRegistry
All other state (exclusive/private) will be cleaned up by the subsequent
recursive directory deletion.
# Enable recursive deletion in
FsCompletedCheckpointStorageLocation.disposeStorageLocation(): Change
fs.delete(exclusiveCheckpointDir, false) to fs.delete(exclusiveCheckpointDir,
true) to reliably clean up the entire exclusive checkpoint directory.
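A minimal sketch of the proposed discard order, written against hypothetical stand-in types: Handle, RecordingFs and the boolean sharedOutsideChkDir flag are illustrative only and are not Flink classes or APIs. Handles whose data lives outside chk-N/ are discarded individually, and everything else goes away with one recursive delete of the exclusive directory. The recording filesystem makes the difference in call counts visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, heavily simplified model of the proposed discard path.
 * None of these types are Flink classes; they only illustrate the idea.
 */
class ProposedDiscardSketch {

    /** Stand-in for a state handle. 'sharedOutsideChkDir' marks handles whose data
     *  lives outside chk-N/ (e.g. IncrementalRemoteKeyedStateHandle,
     *  ChangelogStateBackendHandle, FileMergingOperatorStreamStateHandle). */
    static final class Handle {
        final String name;
        final boolean sharedOutsideChkDir;

        Handle(String name, boolean sharedOutsideChkDir) {
            this.name = name;
            this.sharedOutsideChkDir = sharedOutsideChkDir;
        }
    }

    /** Fake filesystem that only records the calls it receives. */
    static final class RecordingFs {
        final List<String> calls = new ArrayList<>();

        void discard(Handle h) {
            calls.add("discard " + h.name);
        }

        void delete(String dir, boolean recursive) {
            calls.add("delete " + dir + " recursive=" + recursive);
        }
    }

    /** Current behaviour: discard every state object, then a non-recursive delete. */
    static void discardCurrent(List<Handle> handles, String chkDir, RecordingFs fs) {
        for (Handle h : handles) {
            fs.discard(h);
        }
        fs.delete(chkDir, false);
    }

    /** Proposed: discard only handles with data outside chk-N/, then delete chk-N/ recursively. */
    static void discardProposed(List<Handle> handles, String chkDir, RecordingFs fs) {
        for (Handle h : handles) {
            if (h.sharedOutsideChkDir) {
                fs.discard(h);
            }
        }
        // Exclusive/private state is removed together with the directory.
        fs.delete(chkDir, true);
    }

    public static void main(String[] args) {
        List<Handle> handles = Arrays.asList(
                new Handle("operator-state-subtask-0", false),
                new Handle("operator-state-subtask-1", false),
                new Handle("rocksdb-incremental", true));

        RecordingFs current = new RecordingFs();
        discardCurrent(handles, "chk-42", current);
        System.out.println("current:  " + current.calls);

        RecordingFs proposed = new RecordingFs();
        discardProposed(handles, "chk-42", proposed);
        System.out.println("proposed: " + proposed.calls);
    }
}
```

With three handles, the current path records four calls and the proposed path records two. The saving grows with the number of exclusive state objects per checkpoint, since those no longer cost a call each.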
h1. Addressing Previous Review Concerns
* S3 "pseudo-recursive" delete (@sewen's concern): On S3, a recursive delete is
internally a listing followed by batched deletes rather than a single call.
Connectors that support bulk deletion (e.g. flink-s3-fs-hadoop, which delegates
to Hadoop's S3A) send these as multi-object DeleteObjects requests (S3 accepts up
to 1,000 keys per request), so the total number of API calls is still far lower
than issuing one delete request per file. We can document this trade-off.
* Shared state safety (@tang_yun's concern): The implementation explicitly
separates shared state (managed by SharedStateRegistry reference counting) from
exclusive state. Only shared state handles are discarded individually;
exclusive state is cleaned up solely via directory deletion. This is safe
because shared state files reside in the shared/ directory, not under chk-N/.
* File Merging compatibility (new in 2.x):
FileMergingOperatorStreamStateHandle implements CompositeStateHandle and
registers its directory handles with SharedStateRegistry. These are collected
as shared handles for explicit cleanup, ensuring correct lifecycle management.
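To make the call-count argument for S3 concrete, the sketch below compares request counts for deleting n objects under a simplified model: one request per file for per-object discards, versus paged LIST calls plus bulk DeleteObjects calls for a recursive delete. Batch sizes are parameters because the size a connector actually uses may be below S3's 1,000-key limit (Hadoop S3A, for instance, makes its bulk-delete page size configurable). Retries, directory markers and throttling are ignored, and a connector that deletes children one at a time would not get this saving. The class and method names are hypothetical.

```java
/**
 * Back-of-the-envelope S3 request counts (simplified model: ignores retries,
 * directory markers and throttling). Batch sizes are parameters; S3 allows at
 * most 1,000 keys per ListObjectsV2 page and per DeleteObjects request.
 */
class S3DeleteCostSketch {

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    /** Per-file cleanup: at least one DELETE request per object. */
    static long perFileRequests(long objects) {
        return objects;
    }

    /** Recursive cleanup: list the prefix page by page, then delete in bulk batches. */
    static long recursiveRequests(long objects, int listPageSize, int bulkDeleteSize) {
        if (listPageSize <= 0 || bulkDeleteSize <= 0) {
            throw new IllegalArgumentException("batch sizes must be positive");
        }
        long listCalls = Math.max(1, ceilDiv(objects, listPageSize)); // at least one LIST
        long deleteCalls = ceilDiv(objects, bulkDeleteSize);
        return listCalls + deleteCalls;
    }

    public static void main(String[] args) {
        long objects = 10_000;
        System.out.println("per-file:  " + perFileRequests(objects));                // 10000
        System.out.println("recursive: " + recursiveRequests(objects, 1000, 1000)); // 20
    }
}
```

With 10,000 files and 1,000-key batches, the model gives 10 LIST pages plus 10 bulk deletes, i.e. 20 requests instead of 10,000.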
> Reduce the delete file api when the checkpoint is completed
> -----------------------------------------------------------
>
> Key: FLINK-13856
> URL: https://issues.apache.org/jira/browse/FLINK-13856
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.8.1, 1.9.0
> Reporter: Andrew.D.lin
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor,
> pull-request-available
> Attachments: after.png, before.png,
> f6cc56b7-2c74-4f4b-bb6a-476d28a22096.png
>
> Original Estimate: 48h
> Time Spent: 10m
> Remaining Estimate: 47h 50m
>
> When the new checkpoint is completed, an old checkpoint will be deleted by
> calling CompletedCheckpoint.discardOnSubsume().
> When deleting old checkpoints, follow these steps:
> 1. drop the metadata
> 2. discard private state objects
> 3. discard location as a whole
> In some cases, would it be possible to delete the checkpoint folder
> recursively with a single call?
> As far as I know, for a full checkpoint it should be possible to delete the
> folder directly.