[ 
https://issues.apache.org/jira/browse/FLINK-39000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18055271#comment-18055271
 ] 

Gabor Somogyi commented on FLINK-39000:
---------------------------------------

This is a long-standing issue, and solving it would be very helpful :)

> Avoid redundant seeks during operator list state restore
> --------------------------------------------------------
>
>                 Key: FLINK-39000
>                 URL: https://issues.apache.org/jira/browse/FLINK-39000
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.20.3, 2.1.1
>            Reporter: Ivan Torres
>            Priority: Minor
>
> *Problem:* Operator state restore performs FSDataInputStream.seek(offset) for 
> every partition element in 
> OperatorStateRestoreOperation.deserializeOperatorStateValues, even when 
> offsets are sequential.
> For checkpoints stored on object stores (S3, GCS, Ceph, etc.), repeated seeks 
> can be very expensive (often triggering additional range reads or stream 
> resets). This can dominate restore time for large operator list state (e.g., 
> tens/hundreds of thousands of partitions).
> *Root cause:* Offsets for PartitionableListState are written sequentially 
> (captured via out.getPos() before each element is serialized), so after 
> deserializing one element the stream is typically already positioned at the 
> next offset; seeking again is redundant.
> *Proposed change:* Track current stream position during restore and only call 
> seek when needed:
>  * Pseudocode: if (currentPos != offset) in.seek(offset); ... currentPos = 
> in.getPos();
>  * Scope: flink-runtime only; no state format or semantics changes.
> *Expected result:* Restoring operator list state with sequential offsets 
> avoids redundant seeks (seek count drops from O(numPartitions) to ~1 per 
> state/handle in the common case). No change in restored state contents/order 
> vs. current behavior. Lastly, add a unit test in flink-runtime using an 
> FSDataInputStream wrapper that counts seek() calls to prevent regression 
> (sequential offsets → minimal seeks; non-sequential offsets still seek as 
> needed).
> *References:*
>  * flink/runtime/state/OperatorStateRestoreOperation.java
>  ** Restore path does per-element in.seek(offset) unconditionally
>  * flink/runtime/state/PartitionableListState.java
>  ** Offsets are captured via out.getPos() before serializing each element → 
> sequential offsets
>  * flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
>  ** Snapshot writes operator list state via 
> PartitionableListState.write(...), typically through 
> CompressibleFSDataOutputStream
>  * flink/runtime/state/CompressibleFSDataInputStream.java
>  ** Compressed seek has extra work, making redundant seeks more expensive
> –
> Happy to contribute on this one :)
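
For illustration, the conditional seek and the seek-counting test idea from
the quoted description could be sketched roughly as below. This is only a
minimal, self-contained sketch and not the actual Flink code: it does not use
FSDataInputStream, and SeekSkipSketch, CountingSeekableInput, write,
restore and the length-prefixed element format are all hypothetical
stand-ins chosen for the example. The 4-byte header is likewise an assumption,
added so that the first element does not start at position 0.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class SeekSkipSketch {

    /** Hypothetical stand-in for a seekable input stream; counts seek() calls. */
    static final class CountingSeekableInput {
        private final byte[] data;
        private int pos;
        int seekCount;

        CountingSeekableInput(byte[] data) {
            this.data = data;
        }

        void seek(long offset) {
            seekCount++;
            pos = (int) offset;
        }

        long getPos() {
            return pos;
        }

        /** Reads one element: 2-byte big-endian length, then that many bytes (ASCII assumed). */
        String readElement() {
            int len = ((data[pos] & 0xFF) << 8) | (data[pos + 1] & 0xFF);
            String value = new String(data, pos + 2, len, StandardCharsets.UTF_8);
            pos += 2 + len;
            return value;
        }
    }

    /** Writes a placeholder header plus the elements, recording each offset before writing it. */
    static byte[] write(List<String> elements, long[] offsetsOut) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(0); // assumed header, so the first offset is not 0
            for (int i = 0; i < elements.size(); i++) {
                offsetsOut[i] = out.size(); // bytes written so far = offset of this element
                out.writeUTF(elements.get(i));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Restores elements in offset order, seeking only when the stream is not already there. */
    static List<String> restore(CountingSeekableInput in, long[] offsets) {
        List<String> result = new ArrayList<>(offsets.length);
        long currentPos = in.getPos();
        for (long offset : offsets) {
            if (currentPos != offset) {
                in.seek(offset);
            }
            result.add(in.readElement());
            currentPos = in.getPos();
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> elements = List.of("a", "bb", "ccc");
        long[] offsets = new long[elements.size()];
        byte[] data = write(elements, offsets);

        CountingSeekableInput sequential = new CountingSeekableInput(data);
        System.out.println(restore(sequential, offsets) + " seeks=" + sequential.seekCount);

        long[] reversed = {offsets[2], offsets[1], offsets[0]};
        CountingSeekableInput scattered = new CountingSeekableInput(data);
        System.out.println(restore(scattered, reversed) + " seeks=" + scattered.seekCount);
    }
}
```

In this sketch the sequential case needs a single seek (to skip the header),
while the reversed offsets still seek once per element, which is the pair of
assertions the proposed regression test would presumably make against a
counting wrapper around the real stream.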



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
