[
https://issues.apache.org/jira/browse/FLINK-39308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18068139#comment-18068139
]
Ivan Torres commented on FLINK-39308:
-------------------------------------
Oh, good catch. I checked this more carefully in the runtime code, and with the
current patch the restored state can become inconsistent for valid empty lists.
For operator list state, the snapshot metadata also carries the state name,
assignment mode, and serializer snapshot. So in the "all registered operator
list states are empty" case, that metadata would no longer be persisted. As a
result, on restore Flink would treat such state as newly created instead of
restored-empty, and the usual compatibility checks for operator state would be
skipped, so an incompatible serializer or assignment mode could go undetected.
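To make the failure mode concrete, here is a minimal toy model of it. This is not Flink code: `StateMeta`, `register`, the state name, and the compatibility rule (same mode and same serializer id) are all hypothetical stand-ins chosen for illustration. It only shows why a missing metadata entry makes a registration look new rather than restored-empty.

```java
import java.util.HashMap;
import java.util.Map;

class RestoredEmptyVsNewSketch {

    /** Hypothetical stand-in for the per-state metadata written into a snapshot. */
    static final class StateMeta {
        final String name;
        final String mode;         // illustrative, e.g. "SPLIT_DISTRIBUTE" or "UNION"
        final String serializerId; // stand-in for a serializer snapshot

        StateMeta(String name, String mode, String serializerId) {
            this.name = name;
            this.mode = mode;
            this.serializerId = serializerId;
        }
    }

    /** Describes what a restore-time registration does, given the metadata found in the snapshot. */
    static String register(Map<String, StateMeta> restoredMeta, StateMeta requested) {
        StateMeta previous = restoredMeta.get(requested.name);
        if (previous == null) {
            // No entry in the snapshot: the state looks brand new, so there is nothing to compare.
            return "NEW, compatibility check skipped";
        }
        // Assumed toy rule: compatible only if mode and serializer id are unchanged.
        boolean compatible = previous.mode.equals(requested.mode)
                && previous.serializerId.equals(requested.serializerId);
        return compatible ? "RESTORED_EMPTY, compatible" : "INCOMPATIBLE";
    }

    public static void main(String[] args) {
        StateMeta written = new StateMeta("buffered-elements", "SPLIT_DISTRIBUTE", "serializer-v1");
        StateMeta requested = new StateMeta("buffered-elements", "UNION", "serializer-v1");

        // Snapshot taken with the current patch: empty list state, metadata dropped.
        Map<String, StateMeta> metadataDropped = new HashMap<>();
        // Snapshot that keeps the metadata even though the list is empty.
        Map<String, StateMeta> metadataKept = new HashMap<>();
        metadataKept.put(written.name, written);

        System.out.println(register(metadataDropped, requested));
        System.out.println(register(metadataKept, requested));
    }
}
```

In this toy model the changed mode is only noticed when the metadata entry survives the snapshot, even though the list itself is empty in both cases.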
So I will change the approach. I think a better alternative may be to
preserve the operator-state metadata while still avoiding the tiny file-merging
segments. Something like this:
* keep the `FileMergingOperatorStreamStateHandle` so the directory handles are
still tracked
* keep the operator-state entries and their empty offsets in the handle
metadata
* but store only the serialized operator backend metadata payload in a
lightweight in-memory `ByteStreamStateHandle` instead of a segment-backed file
handle
That way restore would still reconstruct the operator-state meta info and run
the normal compatibility checks, but it would no longer need to open tiny
segment files for empty operator state, which is the real problem here.
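A rough sketch of the idea, using only the JDK rather than the real Flink classes: the metadata payload is serialized into a byte array, which plays the role of the in-memory handle, and restore reads it back without touching any file. `StateMeta`, `writeMeta`, `readMeta`, and the wire layout are hypothetical and do not mirror Flink's actual serialization format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class InMemoryMetaPayloadSketch {

    /** Hypothetical per-state metadata; offsets is empty for an empty list state. */
    static final class StateMeta {
        final String name;
        final String mode;
        final String serializerId;
        final long[] offsets;

        StateMeta(String name, String mode, String serializerId, long[] offsets) {
            this.name = name;
            this.mode = mode;
            this.serializerId = serializerId;
            this.offsets = offsets;
        }
    }

    /** Snapshot side: serialize only the metadata into bytes held in memory. */
    static byte[] writeMeta(List<StateMeta> metas) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(metas.size());
            for (StateMeta m : metas) {
                out.writeUTF(m.name);
                out.writeUTF(m.mode);
                out.writeUTF(m.serializerId);
                out.writeInt(m.offsets.length);
                for (long offset : m.offsets) {
                    out.writeLong(offset);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Restore side: rebuild the metadata from the in-memory bytes, no file is opened. */
    static List<StateMeta> readMeta(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            int count = in.readInt();
            List<StateMeta> metas = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                String mode = in.readUTF();
                String serializerId = in.readUTF();
                long[] offsets = new long[in.readInt()];
                for (int j = 0; j < offsets.length; j++) {
                    offsets[j] = in.readLong();
                }
                metas.add(new StateMeta(name, mode, serializerId, offsets));
            }
            return metas;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<StateMeta> metas = new ArrayList<>();
        metas.add(new StateMeta("buffered-elements", "SPLIT_DISTRIBUTE", "serializer-v1", new long[0]));

        byte[] payload = writeMeta(metas); // stands in for the in-memory handle's bytes
        StateMeta restored = readMeta(payload).get(0);
        System.out.println(restored.name + " restored with " + restored.offsets.length + " offsets");
    }
}
```

The point of the sketch is that the state name, mode, and serializer stand-in survive the round trip with zero offsets, so a restore could still run its compatibility checks against them.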
Thoughts?
> Skip empty file-merging operator state snapshots for empty operator list state
> ------------------------------------------------------------------------------
>
> Key: FLINK-39308
> URL: https://issues.apache.org/jira/browse/FLINK-39308
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.20.3, 2.2.0
> Reporter: Ivan Torres
> Priority: Minor
> Labels: pull-request-available
>
> *Problem:*
> When operator list state is registered but empty,
> DefaultOperatorStateBackendSnapshotStrategy still falls through to the normal
> snapshot path. For file-merging checkpoints this can materialize
> segment-backed operator state handles even though the corresponding operator
> state has zero offsets.
> During restore, OperatorStateRestoreOperation opens those tiny segment-backed
> handles and reads their metadata. On object stores, this adds avoidable
> range-read and open overhead, especially when many tasks register empty
> operator state.
> *Root cause:*
> DefaultOperatorStateBackendSnapshotStrategy.asyncSnapshot() only uses the
> empty fast path when there are no registered operator states and no
> registered broadcast states. It does not treat the common case "registered
> operator list states exist but all are empty" as empty.
> *Proposed change:*
> If there are no broadcast states and every registered operator list state is
> empty, return the same empty snapshot result used by the existing fully-empty
> fast path:
> * SnapshotResult.empty() for non-file-merging checkpoints
> * EmptyFileMergingOperatorStreamStateHandle.create(...) for file-merging
> checkpoints
> This keeps non-empty state unchanged and avoids creating segment-backed files
> for empty operator state.
> *Expected outcome:*
> * Empty registered operator list state no longer produces tiny file-merging
> segments.
> * Restore skips opening zero-partition file-merging handles.
> * Object-store-backed restores improve when many empty operator states are
> present.
> * No change in restored contents/order for non-empty operator state.
> *Validation:*
> * Added focused runtime tests for empty registered operator state snapshots
> and file-merging restore.
> * Ran OperatorStateBackendTest, OperatorStateRestoreOperationTest, and
> SharedStateRegistryTest successfully.
> * In a production-shaped Ceph/S3A lab benchmark derived from real checkpoint
> metadata, this reduced segment-backed operator handles from 192 to 11 and
> improved task deploy->running from 13.15s to 9.50s (~27.7%).
> *References:*
> * flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
> * flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
> * flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java
> * flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java