[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834603#comment-17834603
 ] 

Zakelly Lan commented on FLINK-33734:
-------------------------------------

I'd +1 for this optimization.

> Merge unaligned checkpoint state handle
> ---------------------------------------
>
>                 Key: FLINK-33734
>                 URL: https://issues.apache.org/jira/browse/FLINK-33734
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Assignee: Feifan Wang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flamegraph.control-group.html, 
> flamegraph.merge-handle-and-serialize-on-tm.html, 
> flamegraph.only-merge-handle.html, image-2024-02-18-15-12-20-665.png
>
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds.Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmangager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to