[ 
https://issues.apache.org/jira/browse/FLINK-39308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18068626#comment-18068626
 ] 

Zakelly Lan commented on FLINK-39308:
-------------------------------------

[~ivantorres] Have you tried setting 
'execution.checkpointing.file-merging.across-checkpoint-boundary' = 'true'? 
This merges the small segment files across checkpoint boundaries into larger 
ones, so the files grow as checkpoints are continuously taken. That said, I'm 
not sure the small-files issue itself is your focus; it seems you want to 
eliminate the read overhead?

For the non-merging scenario, what if you increase 
'execution.checkpointing.data-inline-threshold'? Would the small metadata then 
be stored in a ByteStreamStateHandle? Does that resolve the problem?

And IIUC, when file-merging is enabled, ByteStreamStateHandle is never used, 
even if the file is small. I think it would be better to have 
ByteStreamStateHandle cooperate with file-merging depending on file size. 
If the option above for the non-merging scenario solves your problem, then 
this is the approach I prefer.
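
To make the size-dependent idea above concrete, here is a rough sketch. It is 
not Flink code: the class, enum, and method names are hypothetical, the 
threshold value in main() is only illustrative, and the "inline at or below the 
threshold" rule is an assumption about how such cooperation could look.

```java
/** Hypothetical sketch; none of these names are Flink APIs. */
final class HandlePlacementSketch {

    /** Where the serialized bytes of one state snapshot would end up. */
    enum Placement {
        INLINE_BYTES,        // stand-in for a ByteStreamStateHandle-like handle
        MERGED_FILE_SEGMENT  // stand-in for a segment in a merged physical file
    }

    /**
     * Chooses a placement from the serialized size alone: state at or below
     * the threshold stays inline, anything larger goes to a merged segment.
     */
    static Placement choose(long serializedSizeBytes, long inlineThresholdBytes) {
        if (serializedSizeBytes < 0 || inlineThresholdBytes < 0) {
            throw new IllegalArgumentException("sizes must be non-negative");
        }
        return serializedSizeBytes <= inlineThresholdBytes
                ? Placement.INLINE_BYTES
                : Placement.MERGED_FILE_SEGMENT;
    }

    public static void main(String[] args) {
        long threshold = 20 * 1024; // illustrative value only
        System.out.println(choose(128, threshold));
        System.out.println(choose(1L << 20, threshold));
    }
}
```

With a rule like this, tiny operator-state metadata would travel inside the 
checkpoint metadata and would not need a separate open or range read on 
restore, while large state would still benefit from file-merging.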

> Skip empty file-merging operator state snapshots for empty operator list state
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-39308
>                 URL: https://issues.apache.org/jira/browse/FLINK-39308
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.20.3, 2.2.0
>            Reporter: Ivan Torres
>            Priority: Minor
>              Labels: pull-request-available
>
> *Problem:*
> When operator list state is registered but empty, 
> DefaultOperatorStateBackendSnapshotStrategy still falls through to the normal 
> snapshot path. For file-merging checkpoints this can materialize 
> segment-backed operator state handles even though the corresponding operator 
> state has zero offsets.
> During restore, OperatorStateRestoreOperation opens those tiny segment-backed 
> handles and reads their metadata. On object stores, this adds avoidable 
> range-read and open overhead, especially when many tasks register empty 
> operator state.
> *Root cause:*
> DefaultOperatorStateBackendSnapshotStrategy.asyncSnapshot() only uses the 
> empty fast path when there are no registered operator states and no 
> registered broadcast states. It does not treat the common case "registered 
> operator list states exist but all are empty" as empty.
> *Proposed change:*
> If there are no broadcast states and every registered operator list state is 
> empty, return the same empty snapshot result used by the existing fully-empty 
> fast path:
>  * SnapshotResult.empty() for non-file-merging checkpoints
>  * EmptyFileMergingOperatorStreamStateHandle.create(...) for file-merging 
> checkpoints
> This keeps non-empty state unchanged and avoids creating segment-backed files 
> for empty operator state.
> *Expected outcome:*
>  * Empty registered operator list state no longer produces tiny file-merging 
> segments.
>  * Restore skips opening zero-partition file-merging handles.
>  * Object-store-backed restores improve when many empty operator states are 
> present.
>  * No change in restored contents/order for non-empty operator state.
> *Validation:*
>  * Added focused runtime tests for empty registered operator state snapshots 
> and file-merging restore.
>  * Ran OperatorStateBackendTest, OperatorStateRestoreOperationTest, and 
> SharedStateRegistryTest successfully.
>  * In a production-shaped Ceph/S3A lab benchmark derived from real checkpoint 
> metadata, this reduced segment-backed operator handles from 192 to 11 and 
> improved task deploy->running from 13.15s to 9.50s (~27.7%).
> *References:*
>  * 
> flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
>  * 
> flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
>  * 
> flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java
>  * 
> flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
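
For reference, the fast-path condition described under "Proposed change" in the 
quoted issue can be sketched roughly as follows. This is a standalone 
illustration with plain collections, not the code from the pull request: the 
class and method names are hypothetical, and the maps merely stand in for the 
backend's registered list states and broadcast states.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; not the actual DefaultOperatorStateBackendSnapshotStrategy. */
final class EmptyOperatorStateCheckSketch {

    /**
     * Returns true when the snapshot could take the empty fast path:
     * no broadcast states are registered and every registered list state
     * (possibly none at all) holds zero elements.
     */
    static boolean canUseEmptyFastPath(
            Map<String, ? extends Collection<?>> registeredListStates,
            Map<String, ?> registeredBroadcastStates) {
        if (!registeredBroadcastStates.isEmpty()) {
            return false;
        }
        for (Collection<?> elements : registeredListStates.values()) {
            if (!elements.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Registered but empty list state, no broadcast state.
        System.out.println(canUseEmptyFastPath(Map.of("offsets", List.of()), Map.of()));
        // One non-empty list state forces the normal snapshot path.
        System.out.println(canUseEmptyFastPath(Map.of("offsets", List.of(42L)), Map.of()));
    }
}
```

The existing fully-empty case (nothing registered at all) also satisfies this 
condition, so the broader check would subsume it.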



--
This message was sent by Atlassian Jira
(v8.20.10#820010)