[ 
https://issues.apache.org/jira/browse/FLINK-39000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Torres updated FLINK-39000:
--------------------------------
    Description: 
*Problem:* Operator state restore performs FSDataInputStream.seek(offset) for 
every partition element in 
OperatorStateRestoreOperation.deserializeOperatorStateValues, even when offsets 
are sequential.

For checkpoints stored on object stores (S3, GCS, Ceph, etc.), repeated seeks 
can be very expensive, often triggering additional range reads or stream 
resets. This can dominate restore time for large operator list state (e.g., 
tens to hundreds of thousands of partitions).
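
For context, the restore loop today looks roughly like this (a simplified rendering of the current code path; see OperatorStateRestoreOperation for the exact version):

{code:java}
// One unconditional seek per recorded partition offset.
for (long offset : metaInfo.getOffsets()) {
    in.seek(offset); // issued even when the stream is already at 'offset'
    stateListForName.add(serializer.deserialize(div)); // reading advances to the next offset
}
{code}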

*Root cause:* Offsets for PartitionableListState are written sequentially 
(captured via out.getPos() before each element is serialized), so after 
deserializing one element the stream is typically already positioned at the 
next offset; seeking again is redundant.
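
A write-side sketch of why the offsets come out sequential (simplified from PartitionableListState.write; variable names abbreviated):

{code:java}
// Offsets are recorded strictly in write order, so offsets[i + 1] equals the
// stream position immediately after element i has been serialized.
long[] offsets = new long[elements.size()];
for (int i = 0; i < elements.size(); i++) {
    offsets[i] = out.getPos();                   // position before element i
    serializer.serialize(elements.get(i), view); // advances 'out' to offsets[i + 1]
}
{code}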

*Proposed change:* Track current stream position during restore and only call 
seek when needed:
 * Pseudocode: if (currentPos != offset) in.seek(offset); ... currentPos = 
in.getPos(); (expanded in the sketch after this list)
 * Scope: flink-runtime only; no state format or semantics changes.
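
A minimal sketch of the conditional-seek loop (illustrative only; variable names follow the existing locals in deserializeOperatorStateValues):

{code:java}
// offsets: long[] partition offsets from OperatorStateHandle.StateMetaInfo
// div: DataInputView wrapping 'in'; serializer: the element TypeSerializer
long currentPos = -1L; // unknown until the first element is read
for (long offset : offsets) {
    if (currentPos != offset) {
        in.seek(offset); // only seek when the stream is not already positioned there
    }
    stateListForName.add(serializer.deserialize(div));
    currentPos = in.getPos(); // deserialize() left the stream at the next element's offset
}
{code}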

*Expected result:* Restoring operator list state with sequential offsets avoids 
redundant seeks (seek count drops from O(numPartitions) to ~1 per state/handle 
in the common case). Restored state contents and order are unchanged vs. 
current behavior. Additionally, add a unit test in flink-runtime using an 
FSDataInputStream wrapper that counts seek() calls to prevent regressions 
(sequential offsets → minimal seeks; non-sequential offsets still seek as 
needed); one possible wrapper shape follows.
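
A sketch of that test wrapper (class and method names below are hypothetical, for illustration only):

{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.FSDataInputStream;

/** Test-only delegate that counts seek() calls (hypothetical helper for the regression test). */
final class SeekCountingFSDataInputStream extends FSDataInputStream {
    private final FSDataInputStream delegate;
    private int seekCount;

    SeekCountingFSDataInputStream(FSDataInputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void seek(long desired) throws IOException {
        seekCount++; // count every seek so the test can assert on it
        delegate.seek(desired);
    }

    @Override
    public long getPos() throws IOException {
        return delegate.getPos();
    }

    @Override
    public int read() throws IOException {
        return delegate.read();
    }

    int getSeekCount() {
        return seekCount;
    }
}
{code}

The test would restore a snapshot with sequential offsets through this wrapper and assert a small, constant seek count, then repeat with non-sequential offsets and assert that seeking still occurs.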

*References:*
 * flink/runtime/state/OperatorStateRestoreOperation.java
 ** Restore path does per-element in.seek(offset) unconditionally
 * flink/runtime/state/PartitionableListState.java
 ** Offsets are captured via out.getPos() before serializing each element → 
sequential offsets
 * flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
 ** Snapshot writes operator list state via PartitionableListState.write(...), 
typically through CompressibleFSDataOutputStream
 * flink/runtime/state/CompressibleFSDataInputStream.java
 ** Seeking on a compressed stream does extra work, making redundant seeks even 
more expensive

--

Happy to contribute on this one :)


> Avoid redundant seeks during operator list state restore
> --------------------------------------------------------
>
>                 Key: FLINK-39000
>                 URL: https://issues.apache.org/jira/browse/FLINK-39000
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.20.3, 2.1.1
>            Reporter: Ivan Torres
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
