[ https://issues.apache.org/jira/browse/FLINK-19911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226512#comment-17226512 ]
fanrui commented on FLINK-19911:
--------------------------------

[~yunta], thanks. Could you find time to help review it? Thank you.

> Read checkpoint stream with buffer to speedup restore
> -----------------------------------------------------
>
> Key: FLINK-19911
> URL: https://issues.apache.org/jira/browse/FLINK-19911
> Project: Flink
> Issue Type: Improvement
> Components: FileSystems, Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.12.0, 1.11.3, 1.13.0
> Environment: Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1
> Reporter: fanrui
> Assignee: fanrui
> Priority: Major
> Labels: pull-request-available
>
> Heap StateBackend needs to serialize each Java object to the file system
> during a snapshot. RocksDB StateBackend's RocksFullSnapshotStrategy needs to
> read KVs from RocksDB and write them to the file system during a snapshot.
> Both cases produce many small I/O operations rather than a few large ones,
> and frequent small I/O is inefficient for disks. For this reason, the
> checkpoint snapshot write path to the file system already uses a buffer; for
> details, see the buffer in {{FsCheckpointStreamFactory.FsCheckpointStateOutputStream}}.
>
> Restore also performs many small I/O operations, but the read path has no
> buffer. So I added a read buffer and tested it with a Flink job.
>
> h2. Flink Job environment:
> {code:java}
> Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1{code}
> Restore takes 33.1s without the read buffer and 12.8s with it.
>
> h2. How to do it?
> Use FSDataBufferedInputStream to wrap fsDataInputStream in
> HeapRestoreOperation#restore:
> {code:java}
> FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
> // Serve the many small reads made during restore from an in-memory buffer.
> FSDataInputStream bufferedInputStream = new FSDataBufferedInputStream(fsDataInputStream);
> {code}
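The issue does not show how FSDataBufferedInputStream itself works, so the following is only a hypothetical, self-contained sketch of a seek-aware read buffer, not the code from the pull request. SeekableInputStream, CountingByteArrayStream and BufferedSeekableInputStream are illustrative names; SeekableInputStream stands in for Flink's FSDataInputStream (an InputStream that adds seek() and getPos()) so the sketch compiles without Flink. The idea it shows: many small reads are served from an in-memory buffer, so the wrapped stream sees a few large reads instead, and seeks that land inside the buffered window do not touch the wrapped stream at all.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for Flink's FSDataInputStream: an InputStream that also
// exposes seek() and getPos(). Defined here so the sketch needs no Flink dependency.
abstract class SeekableInputStream extends InputStream {
    public abstract void seek(long desired) throws IOException;
    public abstract long getPos() throws IOException;
}

// Hypothetical unbuffered in-memory stream that counts every read call reaching it,
// standing in for a file-system stream where each call is comparatively expensive.
class CountingByteArrayStream extends SeekableInputStream {
    private final byte[] data;
    private int pos;
    int readCalls;

    CountingByteArrayStream(byte[] data) { this.data = data; }

    @Override public int read() {
        readCalls++;
        return pos < data.length ? (data[pos++] & 0xFF) : -1;
    }

    @Override public int read(byte[] b, int off, int len) {
        readCalls++;
        if (len == 0) return 0;
        if (pos >= data.length) return -1;
        int n = Math.min(len, data.length - pos);
        System.arraycopy(data, pos, b, off, n);
        pos += n;
        return n;
    }

    @Override public void seek(long desired) { pos = (int) desired; }
    @Override public long getPos() { return pos; }
}

// Hypothetical buffered wrapper. Invariant: the wrapped stream is always positioned
// at bufStart + count, i.e. just past the last byte held in buf.
class BufferedSeekableInputStream extends SeekableInputStream {
    private final SeekableInputStream in;
    private final byte[] buf;
    private long bufStart; // stream position of buf[0]
    private int count;     // number of valid bytes in buf
    private int idx;       // index in buf of the next byte to return

    BufferedSeekableInputStream(SeekableInputStream in, int bufferSize) throws IOException {
        this.in = in;
        this.buf = new byte[bufferSize];
        this.bufStart = in.getPos();
    }

    // Replaces the buffer contents with the next chunk; returns false at end of stream.
    private boolean fill() throws IOException {
        bufStart += count;
        idx = 0;
        count = 0;
        int n = in.read(buf, 0, buf.length);
        if (n <= 0) return false;
        count = n;
        return true;
    }

    @Override public int read() throws IOException {
        if (idx >= count && !fill()) return -1;
        return buf[idx++] & 0xFF;
    }

    @Override public int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) return 0;
        if (idx >= count && !fill()) return -1;
        int n = Math.min(len, count - idx); // may return fewer bytes than requested
        System.arraycopy(buf, idx, b, off, n);
        idx += n;
        return n;
    }

    @Override public void seek(long desired) throws IOException {
        if (desired >= bufStart && desired <= bufStart + count) {
            idx = (int) (desired - bufStart); // target already buffered: no I/O
        } else {
            in.seek(desired); // target outside the buffer: discard it
            bufStart = desired;
            count = 0;
            idx = 0;
        }
    }

    @Override public long getPos() { return bufStart + idx; }
    @Override public void close() throws IOException { in.close(); }
}
```

With a 4,096-byte buffer, reading 10,000 bytes one at a time issues four calls on the wrapped stream (three refills plus one that reports end of stream) instead of 10,001. The seek handling is the main design choice: restore code may seek between key-group offsets, so a seek inside the buffered window only moves an index, while any other seek discards the buffer. A production version would also want argument validation and a cheaper skip().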