[ https://issues.apache.org/jira/browse/FLINK-39000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18055271#comment-18055271 ]
Gabor Somogyi commented on FLINK-39000:
---------------------------------------
This is a long-standing issue and would be very helpful to solve :)
> Avoid redundant seeks during operator list state restore
> --------------------------------------------------------
>
> Key: FLINK-39000
> URL: https://issues.apache.org/jira/browse/FLINK-39000
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.20.3, 2.1.1
> Reporter: Ivan Torres
> Priority: Minor
>
> *Problem:* Operator state restore performs FSDataInputStream.seek(offset) for
> every partition element in
> OperatorStateRestoreOperation.deserializeOperatorStateValues, even when
> offsets are sequential.
> For checkpoints stored on object stores (S3, GCS, Ceph, etc.), repeated seeks
> can be very expensive (often triggering additional range reads or stream
> resets). This can dominate restore time for large operator list state (e.g.,
> tens/hundreds of thousands of partitions).
> *Root cause:* Offsets for PartitionableListState are written sequentially
> (captured via out.getPos() before each element is serialized), so after
> deserializing one element the stream is typically already positioned at the
> next offset; seeking again is redundant.
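A minimal Java sketch of why the offsets line up, using plain `java.io` streams as stand-ins rather than Flink classes: `DataOutputStream.size()` plays the role of `out.getPos()`, and `writeUTF`/`readUTF` stand in for the element serializer. The class and method names are hypothetical; this is not the `PartitionableListState` code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not Flink code: java.io streams stand in for the
// checkpoint output/input streams and writeUTF stands in for the serializer.
final class SequentialOffsets {

    /** Writes each element and records the position just before it (the out.getPos() pattern). */
    static long[] write(DataOutputStream out, List<String> elements) throws IOException {
        long[] offsets = new long[elements.size()];
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = out.size();       // bytes written so far == start of element i
            out.writeUTF(elements.get(i)); // element i occupies [offsets[i], out.size())
        }
        return offsets;
    }

    /** Reads the elements back in order and returns the stream position after each read. */
    static long[] positionsAfterEachRead(byte[] data, int count) throws IOException {
        ByteArrayInputStream raw = new ByteArrayInputStream(data);
        DataInputStream in = new DataInputStream(raw); // unbuffered: consumes exactly one element per readUTF
        long[] positions = new long[count];
        for (int i = 0; i < count; i++) {
            in.readUTF();
            positions[i] = data.length - raw.available(); // bytes consumed so far
        }
        return positions;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        long[] offsets = write(new DataOutputStream(bytes), List.of("a", "bb", "ccc"));
        long[] after = positionsAfterEachRead(bytes.toByteArray(), offsets.length);
        // writeUTF emits a 2-byte length prefix plus the encoded bytes.
        System.out.println(Arrays.toString(offsets)); // [0, 3, 7]
        System.out.println(Arrays.toString(after));   // [3, 7, 12]
    }
}
```

After reading element `i`, the position equals `offsets[i + 1]`, which is exactly the situation in which a fresh `seek(offsets[i + 1])` does no useful work.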
> *Proposed change:* Track the current stream position during restore and only call
> seek when needed:
> * Pseudocode: if (currentPos != offset) in.seek(offset); ... currentPos =
> in.getPos();
> * Scope: flink-runtime only; no state format or semantics changes.
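The pseudocode can be fleshed out into a small runnable Java sketch. `InMemorySeekableStream` is a hypothetical in-memory stand-in exposing `seek(long)` and `getPos()` in the style of `FSDataInputStream`, `writeUTF`/`readUTF` stand in for the real element serializer, and a leading `writeInt` stands in for whatever precedes the state in the file. None of this is the actual `OperatorStateRestoreOperation` code.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory seekable stream (seek/getPos in the style of FSDataInputStream). */
final class InMemorySeekableStream extends InputStream {
    private final byte[] data;
    private int pos;
    int seeks; // number of seek() calls, to observe the effect of the conditional seek

    InMemorySeekableStream(byte[] data) { this.data = data; }

    void seek(long desired) {
        if (desired < 0 || desired > data.length) {
            throw new IllegalArgumentException("offset out of range: " + desired);
        }
        seeks++;
        pos = (int) desired;
    }

    long getPos() { return pos; }

    @Override
    public int read() {
        return pos < data.length ? (data[pos++] & 0xFF) : -1;
    }
}

final class ConditionalSeekRestore {
    /** Reads one element per offset, seeking only when the stream is not already there. */
    static List<String> restore(InMemorySeekableStream in, long[] offsets) throws IOException {
        // DataInputStream does not buffer, so in.getPos() is exactly the end of the last element read.
        DataInputStream view = new DataInputStream(in);
        List<String> result = new ArrayList<>(offsets.length);
        long currentPos = in.getPos();
        for (long offset : offsets) {
            if (currentPos != offset) {
                in.seek(offset); // first element, or a non-sequential offset
            }
            result.add(view.readUTF());
            currentPos = in.getPos();
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(42); // stand-in for data preceding the state, so the first offset is not 0
        String[] elements = {"a", "bb", "ccc"};
        long[] offsets = new long[elements.length];
        for (int i = 0; i < elements.length; i++) {
            offsets[i] = out.size(); // offset captured before each element, as on the write path
            out.writeUTF(elements[i]);
        }
        InMemorySeekableStream in = new InMemorySeekableStream(bytes.toByteArray());
        System.out.println(restore(in, offsets) + ", seeks=" + in.seeks); // [a, bb, ccc], seeks=1
    }
}
```

One assumption worth checking against the real restore path: the `currentPos != offset` comparison is only sound if `getPos()` reflects exactly the bytes the deserializer consumed. A buffering or decompressing layer that reads ahead could make the position drift past the end of the element, so that layer's `getPos()` semantics need to be confirmed before relying on the shortcut.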
> *Expected result:* Restoring operator list state with sequential offsets
> avoids redundant seeks (seek count drops from O(numPartitions) to ~1 per
> state/handle in the common case). No change in restored state contents/order
> vs. current behavior. Lastly, add a unit test in flink-runtime using an
> FSDataInputStream wrapper that counts seek() calls to prevent regression
> (sequential offsets → minimal seeks; non-sequential offsets still seek as
> needed).
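A sketch of the regression check described above, in plain Java rather than JUnit. `SeekableStream`, `ByteArraySource`, `SeekCountingStream` and `SeekRegressionCheck` are hypothetical names, not Flink APIs. The wrapper delegates every call and increments a counter in `seek()`. So that the example stands alone, it carries its own compact conditional-seek loop in which each element is a single byte. With byte 0 acting as a header, sequential offsets should cost 1 seek. The shuffled order `{4, 2, 3, 1}` should cost 3, because the step from 2 to 3 still needs none.

```java
import java.io.IOException;

/** Hypothetical minimal seekable-stream contract, loosely modeled on FSDataInputStream. */
interface SeekableStream {
    void seek(long desired) throws IOException;
    long getPos() throws IOException;
    int read() throws IOException;
}

/** Fixed byte array source; stands in for a checkpoint file. */
final class ByteArraySource implements SeekableStream {
    private final byte[] data;
    private int pos;
    ByteArraySource(byte[] data) { this.data = data; }
    public void seek(long desired) { pos = (int) desired; }
    public long getPos() { return pos; }
    public int read() { return pos < data.length ? data[pos++] & 0xFF : -1; }
}

/** Delegating wrapper that only adds a seek() counter, for regression assertions. */
final class SeekCountingStream implements SeekableStream {
    private final SeekableStream delegate;
    int seekCount;
    SeekCountingStream(SeekableStream delegate) { this.delegate = delegate; }
    public void seek(long desired) throws IOException { seekCount++; delegate.seek(desired); }
    public long getPos() throws IOException { return delegate.getPos(); }
    public int read() throws IOException { return delegate.read(); }
}

final class SeekRegressionCheck {
    /** Conditional-seek restore; each element is one byte at [offset, offset + 1). */
    static byte[] restore(SeekableStream in, long[] offsets) throws IOException {
        byte[] out = new byte[offsets.length];
        long currentPos = in.getPos();
        for (int i = 0; i < offsets.length; i++) {
            if (currentPos != offsets[i]) {
                in.seek(offsets[i]);
            }
            out[i] = (byte) in.read();
            currentPos = in.getPos();
        }
        return out;
    }

    /** Runs restore() through the counting wrapper and returns the number of seeks issued. */
    static int countSeeks(byte[] data, long[] offsets) throws IOException {
        SeekCountingStream in = new SeekCountingStream(new ByteArraySource(data));
        restore(in, offsets);
        return in.seekCount;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {0, 10, 20, 30, 40}; // byte 0 stands in for a header; state starts at offset 1
        System.out.println(countSeeks(data, new long[] {1, 2, 3, 4})); // sequential: 1
        System.out.println(countSeeks(data, new long[] {4, 2, 3, 1})); // non-sequential: 3
    }
}
```

Asserting on the counts in both directions guards against a regression back to per-element seeks and against an over-eager optimization that skips a seek it actually needs.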
> *References:*
> * flink/runtime/state/OperatorStateRestoreOperation.java
> ** Restore path does per-element in.seek(offset) unconditionally
> * flink/runtime/state/PartitionableListState.java
> ** Offsets are captured via out.getPos() before serializing each element →
> sequential offsets
> * flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
> ** Snapshot writes operator list state via
> PartitionableListState.write(...), typically through
> CompressibleFSDataOutputStream
> * flink/runtime/state/CompressibleFSDataInputStream.java
> ** Compressed seek has extra work, making redundant seeks more expensive
> –
> Happy to contribute on this one :)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)