Ivan Torres created FLINK-39308:
-----------------------------------

             Summary: Skip empty file-merging operator state snapshots for 
empty operator list state
                 Key: FLINK-39308
                 URL: https://issues.apache.org/jira/browse/FLINK-39308
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 2.2.0, 1.20.3
            Reporter: Ivan Torres


*Problem:*


When operator list state is registered but empty, 
DefaultOperatorStateBackendSnapshotStrategy still falls through to the normal 
snapshot path. For file-merging checkpoints this can materialize segment-backed 
operator state handles even though the corresponding operator state has zero 
offsets.

During restore, OperatorStateRestoreOperation opens those tiny segment-backed 
handles and reads their metadata. On object stores, this adds avoidable 
range-read and open overhead, especially when many tasks register empty 
operator state.

*Root cause:*

DefaultOperatorStateBackendSnapshotStrategy.asyncSnapshot() takes the empty 
fast path only when there are no registered operator states and no registered 
broadcast states. It does not treat the common case of "registered operator 
list states exist but all are empty" as empty.

*Proposed change:*

If there are no broadcast states and every registered operator list state is 
empty, return the same empty snapshot result used by the existing fully-empty 
fast path:
 * SnapshotResult.empty() for non-file-merging checkpoints
 * EmptyFileMergingOperatorStreamStateHandle.create(...) for file-merging 
checkpoints

This keeps non-empty state unchanged and avoids creating segment-backed files 
for empty operator state.
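
As a rough illustration of the condition change described above, the sketch 
below contrasts the existing check with the proposed one. It is a standalone 
model, not the actual Flink code: the class EmptyOperatorStateCheck, both 
method names, and the use of plain Map/Collection types to stand in for the 
registered states are assumptions made for this example only.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the fast-path decision.
// None of these names exist in Flink; plain collections stand in
// for registered operator list states and broadcast states.
final class EmptyOperatorStateCheck {

    // Existing behavior as described in the root cause:
    // the fast path is taken only when nothing is registered at all.
    static boolean nothingRegistered(
            Map<String, ? extends Collection<?>> listStates,
            Map<String, ?> broadcastStates) {
        return listStates.isEmpty() && broadcastStates.isEmpty();
    }

    // Proposed behavior: also take the fast path when list states are
    // registered but every one of them is empty, provided there are
    // no broadcast states.
    static boolean canUseEmptyFastPath(
            Map<String, ? extends Collection<?>> listStates,
            Map<String, ?> broadcastStates) {
        if (!broadcastStates.isEmpty()) {
            return false;
        }
        for (Collection<?> state : listStates.values()) {
            if (!state.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // One list state is registered, but it holds no elements.
        Map<String, List<String>> registeredButEmpty =
                Map.of("offsets", List.<String>of());
        Map<String, Object> noBroadcast = Map.of();

        System.out.println(nothingRegistered(registeredButEmpty, noBroadcast));
        System.out.println(canUseEmptyFastPath(registeredButEmpty, noBroadcast));
    }
}
```

In this model the "registered but empty" input is rejected by the first check 
and accepted by the second, which is the case the proposed change targets. 
Any non-empty list state or any broadcast state still falls through to the 
normal snapshot path.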

*Expected outcome:*
 * Empty registered operator list state no longer produces tiny file-merging 
segments.
 * Restore skips opening zero-partition file-merging handles.
 * Object-store-backed restores improve when many empty operator states are 
present.
 * No change in restored contents/order for non-empty operator state.

*Validation:*
 * Added focused runtime tests for empty registered operator state snapshots 
and file-merging restore.
 * Ran OperatorStateBackendTest, OperatorStateRestoreOperationTest, and 
SharedStateRegistryTest successfully.
 * In a production-shaped Ceph/S3A lab benchmark derived from real checkpoint 
metadata, this reduced segment-backed operator handles from 192 to 11 and 
reduced the task deploy-to-running time from 13.15s to 9.50s (~27.7%).

*References:*
 * flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
 * flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
 * flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java
 * flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java



