[
https://issues.apache.org/jira/browse/FLINK-19911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-19911:
-----------------------------------
Labels: auto-deprioritized-major auto-unassigned pull-request-available
stale-minor (was: auto-deprioritized-major auto-unassigned
pull-request-available)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned, and neither it nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> add read buffer for input stream
> --------------------------------
>
> Key: FLINK-19911
> URL: https://issues.apache.org/jira/browse/FLINK-19911
> Project: Flink
> Issue Type: Improvement
> Components: FileSystems, Runtime / Checkpointing, Runtime / State
> Backends
> Affects Versions: 1.11.3, 1.12.0, 1.13.0
> Environment: Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1
> Reporter: future
> Priority: Minor
> Labels: auto-deprioritized-major, auto-unassigned,
> pull-request-available, stale-minor
>
> During a snapshot, the Heap StateBackend serializes each Java object to the
> file system, and the RocksDB StateBackend's RocksFullSnapshotStrategy reads
> key-value pairs from RocksDB and writes them to the file system.
> Both cases produce many small I/O operations rather than a few large ones,
> and frequent small I/O is unfriendly to disks. For that reason, the checkpoint
> snapshot write path already uses a buffer; for details, see the buffer in
> {{FsCheckpointStreamFactory.FsCheckpointStateOutputStream}}.
> The restore process also issues many small reads, but it has no buffer.
> So I added a read buffer and tested it with a Flink job.
> h2. Flink Job environment:
> {code:java}
> Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1{code}
> Restoring takes 33.1s without the read buffer and 12.8s with it.
> h2. How to do it?
> Wrap fsDataInputStream in an FSDataBufferedInputStream in
> HeapRestoreOperation#restore:
> {code:java}
> FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
> FSDataInputStream bufferedInputStream = new FSDataBufferedInputStream(fsDataInputStream);
> {code}
>
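The effect described in the issue can be illustrated outside Flink. The following is a minimal sketch, not the code from the pull request: the implementation of FSDataBufferedInputStream is not shown above, so the standard java.io.BufferedInputStream stands in for it. The names ReadBufferSketch and CountingInputStream, the in-memory "state file", and the 4096-byte buffer size are assumptions made for illustration only. The sketch counts how many read calls reach the underlying stream when many small records are deserialized with and without a read buffer.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class ReadBufferSketch {

    /** Hypothetical helper: counts read calls that reach the wrapped stream. */
    static final class CountingInputStream extends InputStream {
        private final InputStream in;
        int readCalls = 0;

        CountingInputStream(InputStream in) {
            this.in = in;
        }

        @Override
        public int read() throws IOException {
            readCalls++;
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            readCalls++;
            return in.read(b, off, len);
        }
    }

    /**
     * Deserializes `records` 4-byte ints one by one and returns how many
     * read calls reached the underlying (simulated file system) stream.
     */
    static int underlyingReads(int records, boolean buffered) {
        // Stand-in for a state file: an in-memory byte array.
        byte[] state = new byte[records * 4];
        CountingInputStream source =
                new CountingInputStream(new ByteArrayInputStream(state));
        // Assumed buffer size of 4096 bytes; the real value is a tuning choice.
        InputStream in = buffered ? new BufferedInputStream(source, 4096) : source;
        try (DataInputStream data = new DataInputStream(in)) {
            for (int i = 0; i < records; i++) {
                data.readInt();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return source.readCalls;
    }

    public static void main(String[] args) {
        System.out.println("unbuffered reads: " + underlyingReads(10_000, false));
        System.out.println("buffered reads:   " + underlyingReads(10_000, true));
    }
}
```

With the buffer in place, the small per-record reads are served from memory and the underlying stream only sees a few large reads. The sketch counts calls rather than time, so it does not reproduce the restore timings reported above.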
--
This message was sent by Atlassian Jira
(v8.20.1#820001)