[ 
https://issues.apache.org/jira/browse/FLINK-19911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224406#comment-17224406
 ] 

fanrui commented on FLINK-19911:
--------------------------------

Flink job code: 
https://github.com/1996fanrui/fanrui-learning/blob/eb9ed4ef17374e865850987b63a97fa7d8ba6886/module-flink/src/main/java/com/dream/flink/io/BigStateIOOptimization.java

> Read checkpoint stream with buffer to speedup restore
> -----------------------------------------------------
>
>                 Key: FLINK-19911
>                 URL: https://issues.apache.org/jira/browse/FLINK-19911
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystems, Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.12.0, 1.11.3, 1.13.0
>         Environment: Flink version: 1.10
> StateBackend : FsStateBackend 
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1
>            Reporter: fanrui
>            Priority: Major
>
> During a snapshot, the heap StateBackend serializes each Java object to the 
> file system, and the RocksDB StateBackend's RocksFullSnapshotStrategy reads 
> key-value pairs from RocksDB and writes them to the file system.
> Both cases produce many small I/O operations rather than a few large ones, 
> and frequent small I/O is inefficient on disk. For this reason, the 
> checkpoint write path already buffers its output; see the buffer in 
> {{FsCheckpointStreamFactory.FsCheckpointStateOutputStream}}.
> Restore also issues many small reads, but the restore path has no buffer. 
> I added a read buffer and tested it with the Flink job described below.
> h2. Flink Job environment:
> {code:java}
>  Flink version: 1.10
>  StateBackend : FsStateBackend 
>  code: Flink SQL count(distinct userId)
>  uv: 10 million
>  State size: 200M
>  TM total memory: 16G
>  Parallelism: 1{code}
> Restore takes 33.1s without the read buffer and 12.8s with it.
> h2. How to do it?
> Wrap fsDataInputStream in an FSDataBufferedInputStream in 
> HeapRestoreOperation#restore:
> {code:java}
> FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
> FSDataInputStream bufferedInputStream = new FSDataBufferedInputStream(fsDataInputStream);
> {code}
>  
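
To illustrate why a read buffer helps in the restore path described above, here is a minimal sketch that uses only java.io classes and no Flink dependency. java.io.BufferedInputStream stands in for the FSDataBufferedInputStream named in the issue. The class name BufferedRestoreSketch, the CountingInputStream helper, the record layout (plain longs), and the 8 KB buffer size are all assumptions made for this example, not Flink code. The sketch counts how many read calls reach the underlying stream, which plays the role of the file system, with and without the buffer.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

class BufferedRestoreSketch {

    /** Hypothetical helper: counts read calls that reach the wrapped stream. */
    static final class CountingInputStream extends FilterInputStream {
        long readCalls = 0;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            readCalls++;
            return super.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            readCalls++;
            return super.read(b, off, len);
        }
    }

    /** Writes n small records (one long each), imitating a snapshot of many small entries. */
    static byte[] writeRecords(int n) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < n; i++) {
                out.writeLong(i);
            }
        }
        return bytes.toByteArray();
    }

    /** Reads the n records back and returns how many read calls hit the underlying stream. */
    static long countUnderlyingReads(byte[] snapshot, int n, boolean buffered) throws IOException {
        CountingInputStream raw = new CountingInputStream(new ByteArrayInputStream(snapshot));
        // Assumption: an 8 KB buffer; the buffer size used in the issue is not stated.
        InputStream source = buffered ? new BufferedInputStream(raw, 8 * 1024) : raw;
        long sum = 0;
        try (DataInputStream in = new DataInputStream(source)) {
            for (int i = 0; i < n; i++) {
                sum += in.readLong();
            }
        }
        // Sanity check: both modes must decode the same data (0 + 1 + ... + (n - 1)).
        long expected = (long) n * (n - 1) / 2;
        if (sum != expected) {
            throw new IllegalStateException("unexpected sum: " + sum);
        }
        return raw.readCalls;
    }

    public static void main(String[] args) throws IOException {
        int n = 10_000;
        byte[] snapshot = writeRecords(n);
        System.out.println("underlying reads without buffer: " + countUnderlyingReads(snapshot, n, false));
        System.out.println("underlying reads with buffer:    " + countUnderlyingReads(snapshot, n, true));
    }
}
```

Without the buffer, every small record read goes to the underlying stream. With the buffer, the underlying stream is read in large chunks and the small reads are served from memory. The restore times reported in the issue come from a real file system, so this sketch shows only the reduction in call count, not the timing.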



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
