[
https://issues.apache.org/jira/browse/FLINK-19911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224458#comment-17224458
]
fanrui commented on FLINK-19911:
--------------------------------
Adjust the buffer size and do some more detailed tests on Flink job.
h2. Flink Job environment:
{code:java}
Flink version: 1.10
StateBackend : FsStateBackend
code: Flink SQL count(distinct userId)
uv: 10 million
State size: 200M
TM total memory: 16G
Parallelism: 1{code}
Flink job [code
link|https://github.com/1996fanrui/fanrui-learning/blob/eb9ed4ef17374e865850987b63a97fa7d8ba6886/module-flink/src/main/java/com/dream/flink/io/BigStateIOOptimization.java]
h2. Restore time-consuming calculation rules:
HeapRestoreOperation#restore records startTime at the head of the method, and
endTime at the end of the method. ʻendTime-startTime` indicates that the
restore takes time.
_*The above environment is very clear and easy to reproduce the following test
results.*_
h2. Test Results:
without buffer : avg restore duration = 30.6 s
{code:java}
restore duration : 33186 ms
restore duration : 28658 ms
restore duration : 30254 ms
{code}
buffer size = 1KB : avg restore duration = 9.1 s
{code:java}
restore duration : 9428 ms
restore duration : 9959 ms
restore duration : 9696 ms
restore duration : 7424 ms
restore duration : 9485 ms
{code}
buffer size = 4KB : avg restore duration = 9.0 s
{code:java}
restore duration : 10650 ms
restore duration : 9087 ms
restore duration : 8936 ms
restore duration : 9126 ms
restore duration : 7527 ms
{code}
buffer size = 8KB : avg restore duration = 11.7 s
{code:java}
restore duration : 11221 ms
restore duration : 9863 ms
restore duration : 13022 ms
restore duration : 12489 ms
restore duration : 12145 ms
{code}
h2. Conclusion:
When buffer is added, restore time is reduced to one-third of the original.
> Read checkpoint stream with buffer to speedup restore
> -----------------------------------------------------
>
> Key: FLINK-19911
> URL: https://issues.apache.org/jira/browse/FLINK-19911
> Project: Flink
> Issue Type: Improvement
> Components: FileSystems, Runtime / Checkpointing, Runtime / State
> Backends
> Affects Versions: 1.12.0, 1.11.3, 1.13.0
> Environment: Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1
> Reporter: fanrui
> Priority: Major
>
> Heap StateBackend needs to serialize each Java Object into the file system
> during snapshot. RocksDB StateBackend's RocksFullSnapshotStrategy needs to
> read kvs from RocksDB and write them to the file system in the snapshot.
> The above two cases involve a lot of small io, not large io, frequent small
> io is not friendly to disk. Therefore, the buffer is used in the checkpoint
> snapshot writing process of the file system. For details, refer to the buffer
> of {{FsCheckpointStreamFactory.FsCheckpointStateOutputStream}}.
> There will be many small IOs in the restore process, but restore does not
> have a buffer. So I added a buffer and tested it based on Flink job.
> h2. Flink Job environment:
> {code:java}
> Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1{code}
> It takes 33.1s to restore without read buffer, and 12.8s to restore with read
> buffer.
> h2. How to do it?
> Use FSDataBufferedInputStream to wrap fsDataInputStream in
> HeapRestoreOperation#restore,code:
> {code:java}
> FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
> FSDataInputStream bufferedInputStream = new
> FSDataBufferedInputStream(fsDataInputStream);
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)