[
https://issues.apache.org/jira/browse/FLINK-39000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ivan Torres updated FLINK-39000:
--------------------------------
Description:
*Problem:* Operator state restore performs FSDataInputStream.seek(offset) for
every partition element in
OperatorStateRestoreOperation.deserializeOperatorStateValues, even when offsets
are sequential.
For checkpoints stored on object stores (S3, GCS, Ceph, etc.), repeated seeks
can be very expensive (often triggering additional range reads or stream
resets). This can dominate restore time for large operator list state (e.g.,
tens/hundreds of thousands of partitions).
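For context, the restore loop currently looks roughly like this (a paraphrased sketch of OperatorStateRestoreOperation.deserializeOperatorStateValues, not the verbatim source):
{code:java}
// Sketch of the current restore path (paraphrased; details may differ from the actual source).
long[] offsets = metaInfo.getOffsets();
DataInputView div = new DataInputViewStreamWrapper(in);
TypeSerializer<S> serializer =
        stateListForName.getStateMetaInfo().getPartitionStateSerializer();
for (long offset : offsets) {
    in.seek(offset); // unconditional seek for every partition element
    stateListForName.add(serializer.deserialize(div));
}
{code}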
*Root cause:* Offsets for PartitionableListState are written sequentially
(captured via out.getPos() before each element is serialized), so after
deserializing one element the stream is typically already positioned at the
next offset; seeking again is redundant.
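The write side (a paraphrase of PartitionableListState.write) captures out.getPos() immediately before serializing each element, so the recorded offsets are contiguous with the serialized data:
{code:java}
// Sketch of the snapshot write path (paraphrased from PartitionableListState.write).
long[] partitionOffsets = new long[internalList.size()];
DataOutputView dov = new DataOutputViewStreamWrapper(out);
for (int i = 0; i < internalList.size(); i++) {
    partitionOffsets[i] = out.getPos(); // offset captured right before the element is written
    getStateMetaInfo().getPartitionStateSerializer().serialize(internalList.get(i), dov);
}
{code}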
*Proposed change:* Track the current stream position during restore and only call
seek when the stream is not already at the target offset:
* Pseudocode: if (currentPos != offset) in.seek(offset); ... currentPos =
in.getPos(); (expanded into a Java sketch after this list)
* Scope: flink-runtime only; no state format or semantics changes.
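Applied to the restore loop above, the guard would look something like this (a minimal sketch of the proposed change, not a final patch):
{code:java}
// Only seek when the stream is not already positioned at the target offset.
long currentPos = in.getPos();
for (long offset : offsets) {
    if (currentPos != offset) {
        in.seek(offset); // non-sequential offset: the seek is still required
    }
    stateListForName.add(serializer.deserialize(div));
    currentPos = in.getPos(); // after deserializing, we usually sit at the next offset
}
{code}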
*Expected result:* Restoring operator list state with sequential offsets avoids
redundant seeks (seek count drops from O(numPartitions) to ~1 per state/handle
in the common case), with no change in restored state contents or order vs.
current behavior. To guard against regressions, add a unit test in
flink-runtime using an FSDataInputStream wrapper that counts seek() calls
(sequential offsets → minimal seeks; non-sequential offsets still seek as
needed); a sketch of such a wrapper follows.
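A wrapper along these lines could back the test (class name and shape are illustrative, not an existing Flink utility):
{code:java}
import java.io.IOException;
import org.apache.flink.core.fs.FSDataInputStream;

/** Test-only delegate that counts seek() calls (illustrative; not an existing Flink class). */
class SeekCountingFSDataInputStream extends FSDataInputStream {
    private final FSDataInputStream delegate;
    private int seekCount;

    SeekCountingFSDataInputStream(FSDataInputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void seek(long desired) throws IOException {
        seekCount++; // count every seek so the test can assert on it
        delegate.seek(desired);
    }

    @Override
    public long getPos() throws IOException {
        return delegate.getPos();
    }

    @Override
    public int read() throws IOException {
        return delegate.read();
    }

    int getSeekCount() {
        return seekCount;
    }
}
{code}
The test would restore a handle with sequential offsets through this wrapper and assert the seek count stays minimal, while a handle with non-sequential offsets still triggers seeks.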
*References:*
* flink/runtime/state/OperatorStateRestoreOperation.java
** Restore path does per-element in.seek(offset) unconditionally
* flink/runtime/state/PartitionableListState.java
** Offsets are captured via out.getPos() before serializing each element →
sequential offsets
* flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
** Snapshot writes operator list state via PartitionableListState.write(...),
typically through CompressibleFSDataOutputStream
* flink/runtime/state/CompressibleFSDataInputStream.java
** Compressed seek has extra work, making redundant seeks more expensive
--
Happy to contribute on this one :)
> Avoid redundant seeks during operator list state restore
> --------------------------------------------------------
>
> Key: FLINK-39000
> URL: https://issues.apache.org/jira/browse/FLINK-39000
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.20.3, 2.1.1
> Reporter: Ivan Torres
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)