[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Feifan Wang updated FLINK-33734:
--------------------------------
Attachment: flamegraph.control-group.html
flamegraph.merge-handle-and-serialize-on-tm.html
flamegraph.only-merge-handle.html
> Merge unaligned checkpoint state handle
> ---------------------------------------
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Reporter: Feifan Wang
> Assignee: Feifan Wang
> Priority: Major
> Labels: pull-request-available
> Attachments: flamegraph.control-group.html,
> flamegraph.merge-handle-and-serialize-on-tm.html,
> flamegraph.only-merge-handle.html, image-2024-02-18-15-12-20-665.png
>
>
> h3. Background
> During a checkpoint, unaligned checkpointing writes the in-flight data of all
> InputChannels and ResultSubpartitions of a subtask to the same file. However,
> InputChannelStateHandle and ResultSubpartitionStateHandle organize the
> metadata of that in-flight data at channel granularity, so the same file
> name is repeated in every channel's handle. When a job is under backpressure
> and task parallelism is high, the metadata of unaligned checkpoints bloats.
> This results in:
> # The amount of data reported by the taskmanagers to the jobmanager
> increases, and the jobmanager takes longer to process these RPC requests.
> # The metadata of the entire checkpoint becomes very large, and it takes
> longer to serialize it and write it to DFS.
> Both of the above ultimately lead to longer checkpoint durations.
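> To make the redundancy concrete, here is a minimal sketch that models
> per-channel handles as a hypothetical PerChannelHandle record (a simplified
> stand-in, not Flink's InputChannelStateHandle). It counts the bytes spent on
> file paths when every handle carries its own copy of the path, versus writing
> each distinct path once. The path string and channel count in main are made
> up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a per-channel state handle: every
// instance carries its own copy of the file path, even when all channels of
// a subtask were written to the same file.
record PerChannelHandle(String filePath, int gateIdx, int inputChannelIdx,
                        List<Long> offsets, long size) {}

class PathRedundancy {
    // Bytes spent on file paths if each handle serializes its path separately.
    static long pathBytes(List<PerChannelHandle> handles) {
        long total = 0;
        for (PerChannelHandle h : handles) {
            total += h.filePath().getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    // Bytes that would remain if each distinct path were written only once.
    static long distinctPathBytes(List<PerChannelHandle> handles) {
        return handles.stream()
                .map(PerChannelHandle::filePath)
                .distinct()
                .mapToLong(p -> p.getBytes(StandardCharsets.UTF_8).length)
                .sum();
    }

    // Builds `channels` handles for one subtask, all pointing at the same file.
    static List<PerChannelHandle> oneSubtask(String path, int channels) {
        List<PerChannelHandle> handles = new ArrayList<>();
        for (int i = 0; i < channels; i++) {
            handles.add(new PerChannelHandle(path, 0, i, List.of(100L * i), 100L));
        }
        return handles;
    }

    public static void main(String[] args) {
        String path = "dfs:///checkpoints/job/chk-1/subtask-0-file"; // hypothetical path
        List<PerChannelHandle> handles = oneSubtask(path, 1000);
        System.out.println("path bytes, per-channel:  " + pathBytes(handles));
        System.out.println("path bytes, deduplicated: " + distinctPathBytes(handles));
    }
}
```

> For one subtask with 1,000 channels writing to a single file, the per-channel
> layout stores the path 1,000 times while the deduplicated layout stores it
> once; the gap grows linearly with the number of channels.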
> h3. A production example
> Take one of our production jobs with a parallelism of 4800 as an example:
> # Without backpressure, the end-to-end checkpoint duration stays within 7
> seconds.
> # Under backpressure, the end-to-end checkpoint duration often exceeds 1
> minute. We found that the jobmanager took more than 40 seconds to process
> RPC requests, and serializing the metadata took more than 20 seconds. Some
> checkpoint statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950 MB metadata file, 68% consists of redundant file paths.
> We enabled log-based checkpointing on this job and hoped that checkpoints
> could complete within 30 seconds. This problem made that goal difficult to
> achieve.
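> The statistics above also give a rough sense of what merging could save. The
> following sketch only does arithmetic on the numbers in the table (the class
> and method names are hypothetical): on average about 2,209 channel state
> handles point into each file, and 68% of 950 MB is about 646 MB of repeated
> paths.

```java
// Back-of-the-envelope figures derived only from the statistics table above.
class ProductionStats {
    static final long METADATA_MB = 950;
    static final long CHANNEL_STATES = 12_229_854L;
    static final long CHANNEL_FILES = 5_536L;
    static final long REDUNDANT_PATH_PERCENT = 68;

    // Average number of channel state handles that point into the same file.
    static double statesPerFile() {
        return (double) CHANNEL_STATES / CHANNEL_FILES;
    }

    // Portion of the metadata file attributed to repeated file paths, in MB.
    static long redundantPathMb() {
        return METADATA_MB * REDUNDANT_PATH_PERCENT / 100;
    }

    public static void main(String[] args) {
        System.out.println("channel states per file (rounded): " + Math.round(statesPerFile()));
        System.out.println("MB spent on redundant paths: " + redundantPathMb());
        // With handles merged per file, a path would be stored on the order of
        // CHANNEL_FILES times instead of CHANNEL_STATES times.
        System.out.println("path copies, per-channel vs. per-file: "
                + CHANNEL_STATES + " vs. " + CHANNEL_FILES);
    }
}
```

> If handles are merged per file, the number of stored paths would scale with
> the file count (5,536) rather than the channel state count (12,229,854),
> possibly up to twice per file, since input-channel and result-subpartition
> handles of a subtask share a file but would be merged into separate handle
> types.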
> h3. Proposed changes
> I suggest introducing MergedInputChannelStateHandle and
> MergedResultSubpartitionStateHandle to eliminate the redundant file paths.
> Before reporting, the taskmanager merges all InputChannelStateHandles of a
> subtask that share the same delegated StreamStateHandle into one
> MergedInputChannelStateHandle. When recovering from a checkpoint, the
> jobmanager converts each MergedInputChannelStateHandle back into a
> collection of InputChannelStateHandles before assigning state handles, so
> the rest of the recovery process does not need to change.
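> A minimal sketch of the proposed merge/expand round trip, assuming
> simplified, hypothetical record types (StreamHandle, ChannelInfo,
> ChannelStateHandle, ChannelEntry, MergedChannelStateHandle) in place of
> Flink's real classes. merge groups per-channel handles by (subtask index,
> delegate), which corresponds to the taskmanager step; expand turns each merged
> handle back into per-channel handles, which corresponds to the jobmanager
> step before state assignment. Treating the merged size as the sum of the
> channel sizes is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified types for illustration; these are not Flink's classes.
record StreamHandle(String filePath, long stateSize) {}
record ChannelInfo(int gateIdx, int inputChannelIdx) {}
// Current shape: one handle per channel, each referencing the delegate (and its path).
record ChannelStateHandle(int subtaskIndex, ChannelInfo info, StreamHandle delegate,
                          List<Long> offsets, long size) {}
// Proposed shape: the delegate is stored once, followed by per-channel entries.
record ChannelEntry(ChannelInfo info, List<Long> offsets, long size) {}
record MergedChannelStateHandle(int subtaskIndex, StreamHandle delegate,
                                List<ChannelEntry> channels, long size) {}

class ChannelHandleMerger {
    // Handles are merged only if they belong to the same subtask and share a delegate.
    private record MergeKey(int subtaskIndex, StreamHandle delegate) {}

    // Taskmanager side: collapse per-channel handles into one merged handle per key.
    static List<MergedChannelStateHandle> merge(List<ChannelStateHandle> handles) {
        Map<MergeKey, List<ChannelEntry>> groups = new LinkedHashMap<>();
        for (ChannelStateHandle h : handles) {
            groups.computeIfAbsent(new MergeKey(h.subtaskIndex(), h.delegate()), k -> new ArrayList<>())
                  .add(new ChannelEntry(h.info(), h.offsets(), h.size()));
        }
        List<MergedChannelStateHandle> merged = new ArrayList<>();
        for (Map.Entry<MergeKey, List<ChannelEntry>> e : groups.entrySet()) {
            // Assumed: the merged size is the sum of the per-channel sizes.
            long total = e.getValue().stream().mapToLong(ChannelEntry::size).sum();
            merged.add(new MergedChannelStateHandle(
                    e.getKey().subtaskIndex(), e.getKey().delegate(), e.getValue(), total));
        }
        return merged;
    }

    // Jobmanager side: expand merged handles back into per-channel handles
    // before state assignment, so later steps see the familiar shape.
    static List<ChannelStateHandle> expand(List<MergedChannelStateHandle> merged) {
        List<ChannelStateHandle> out = new ArrayList<>();
        for (MergedChannelStateHandle m : merged) {
            for (ChannelEntry c : m.channels()) {
                out.add(new ChannelStateHandle(
                        m.subtaskIndex(), c.info(), m.delegate(), c.offsets(), c.size()));
            }
        }
        return out;
    }
}
```

> Grouping uses a LinkedHashMap, so expand(merge(handles)) returns the handles in
> their original order whenever handles sharing a delegate are contiguous in the
> input. Records provide value-based equals and hashCode, which the grouping key
> relies on.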
> Structure of MergedInputChannelStateHandle:
>
> {code:java}
> { // MergedInputChannelStateHandle
> "delegate": {
> "filePath":
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
> "stateSize": 123456
> },
> "size": 2000,
> "subtaskIndex": 0,
> "channels": [ // One InputChannel per element
> {
> "info": {
> "gateIdx": 0,
> "inputChannelIdx": 0
> },
> "offsets": [
> 100,200,300,400
> ],
> "size": 1400
> },
> {
> "info": {
> "gateIdx": 0,
> "inputChannelIdx": 1
> },
> "offsets": [
> 500,600
> ],
> "size": 600
> }
> ]
> }
> {code}
> MergedResultSubpartitionStateHandle is similar.
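> Assuming the two merged handles differ mainly in how a channel is identified,
> one way to sketch them is a single generic record mirroring the JSON fields
> above. InputChannelId and SubpartitionId are hypothetical stand-ins for
> Flink's InputChannelInfo and ResultSubpartitionInfo. The check that the merged
> size equals the sum of the channel sizes is inferred from the example
> (2000 = 1400 + 600), not something the proposal states.

```java
import java.util.List;

// Hypothetical sketch mirroring the JSON fields; these are not Flink's classes.
record Delegate(String filePath, long stateSize) {}
// Simplified stand-ins for Flink's InputChannelInfo and ResultSubpartitionInfo.
record InputChannelId(int gateIdx, int inputChannelIdx) {}
record SubpartitionId(int partitionIdx, int subPartitionIdx) {}
// One element of "channels": the channel identity, its offsets, and its size.
record Channel<I>(I info, List<Long> offsets, long size) {}

// I is InputChannelId for the input-channel variant and SubpartitionId for
// the result-subpartition variant.
record MergedHandle<I>(Delegate delegate, long size, int subtaskIndex, List<Channel<I>> channels) {
    MergedHandle {
        // Assumed invariant, inferred from the example: 2000 = 1400 + 600.
        long sum = channels.stream().mapToLong(c -> c.size()).sum();
        if (sum != size) {
            throw new IllegalArgumentException(
                    "size " + size + " != sum of channel sizes " + sum);
        }
        channels = List.copyOf(channels); // defensive, immutable copy
    }
}
```

> With this shape, MergedHandle<InputChannelId> and MergedHandle<SubpartitionId>
> would share the same serialization and merge/expand logic; only the channel
> identity type differs.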
>
>
> WDYT [~roman], [~pnowojski], [~fanrui]?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)