[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793998#comment-17793998 ]
Feifan Wang commented on FLINK-33734:
-------------------------------------

Thanks to [~Zakelly], [~pnowojski] and [~fanrui] for participating in the discussion.

to [~Zakelly]:

Yes, this proposal only aims to reduce the metadata size of unaligned checkpoints. I also think that FLIP-306 does not solve the problems above. At the same time, I think my proposal can work together with FLIP-306.

to [~pnowojski]:
{quote}That sending out the RPCs during recovery will take a long time?
{quote}
Yes, in theory sending these RPCs during recovery also takes a long time, but we have not paid attention to it so far, for two reasons. First, from a business perspective our job can accept a recovery time of several minutes. Second, this kind of checkpoint only occurs under backpressure, and we have not tried to restore the job from such a checkpoint.
{quote}Wouldn't it be better to keep the state handles merged during recovery until they reach their destined subtasks on TMs?
{quote}
I share [~fanrui]'s view on this issue. It is acceptable to me to solve the problems during checkpoint creation and during recovery in two separate steps.

to [~fanrui]:
{quote}Can we think the _metadata file size will be reduced 68% after this proposal?
{quote}
Yes, but only for checkpoints where unaligned checkpoint handles account for the vast majority of the metadata, as mentioned above.
{quote}How does flink serialize the MergedInputChannelStateHandle? Does it store the field name?
{quote}
The current serialization of metadata objects is compact, and field names are not saved in the file. The serialization of each handle is hardcoded.

> Merge unaligned checkpoint state handle
> ---------------------------------------
>
>                 Key: FLINK-33734
>                 URL: https://issues.apache.org/jira/browse/FLINK-33734
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Assignee: Feifan Wang
>            Priority: Major
>
> h3. Background
> During a checkpoint, unaligned checkpoint writes the in-flight data of all
> InputChannels and ResultSubpartitions of the same subtask to the same file.
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the
> metadata of the in-flight data at channel granularity, which causes the file
> name to be repeated many times. When a job is under backpressure and task
> parallelism is high, the metadata of unaligned checkpoints bloats. This
> results in:
> # The amount of data reported by the TaskManagers to the JobManager
> increases, and the JobManager takes longer to process these RPC requests.
> # The metadata of the entire checkpoint becomes very large, and it takes
> longer to serialize it and write it to DFS.
> Both of the above points ultimately lead to a longer checkpoint duration.
> h3. A production example
> Take our production job with a parallelism of 4800 as an example:
> # When there is no backpressure, the checkpoint end-to-end duration is
> within 7 seconds.
> # When under backpressure, the checkpoint end-to-end duration often exceeds
> 1 minute. We found that the JobManager took more than 40 seconds to process
> the RPC requests, and serializing the metadata took more than 20 seconds.
> Some checkpoint statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950 MB in the metadata file, 68% is redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint
> could be completed within 30 seconds. This problem made it difficult to
> achieve this goal.
> h3. Proposed changes
> I suggest introducing MergedInputChannelStateHandle and
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The TaskManager merges all InputChannelStateHandles with the same delegated
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle
> before reporting.
> When recovering from a checkpoint, the JobManager converts each
> MergedInputChannelStateHandle back into a collection of
> InputChannelStateHandles before assigning state handles, so the rest of the
> process does not need to change.
> Structure of MergedInputChannelStateHandle:
>
> {code:java}
> { // MergedInputChannelStateHandle
>   "delegate": {
>     "filePath":
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>     "stateSize": 123456
>   },
>   "size": 2000,
>   "subtaskIndex": 0,
>   "channels": [ // One InputChannel per element
>     {
>       "info": {
>         "gateIdx": 0,
>         "inputChannelIdx": 0
>       },
>       "offsets": [
>         100, 200, 300, 400
>       ],
>       "size": 1400
>     },
>     {
>       "info": {
>         "gateIdx": 0,
>         "inputChannelIdx": 1
>       },
>       "offsets": [
>         500, 600
>       ],
>       "size": 600
>     }
>   ]
> }
> {code}
> MergedResultSubpartitionStateHandle is similar.
>
>
> WDYT [~roman], [~pnowojski], [~fanrui]?
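
To make the merge and expand steps described in the issue concrete, here is a minimal Java sketch. It is only an illustration under stated assumptions: the types ChannelHandle, ChannelEntry, MergedHandle and GroupKey and the methods merge and expand are hypothetical, simplified stand-ins, not Flink's actual InputChannelStateHandle classes or APIs, and the delegate StreamStateHandle is reduced to a plain file path string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-ins; these are NOT Flink's actual classes. */
final class MergedHandleSketch {

    /** Per-channel handle as reported today: the file path is repeated in every handle. */
    record ChannelHandle(String filePath, int subtaskIndex, int gateIdx,
                         int inputChannelIdx, List<Long> offsets, long size) {}

    /** Per-channel part of a merged handle: everything except the shared file path. */
    record ChannelEntry(int gateIdx, int inputChannelIdx, List<Long> offsets, long size) {}

    /** Merged handle: the file path is stored once for all channels that share it. */
    record MergedHandle(String filePath, int subtaskIndex, long size,
                        List<ChannelEntry> channels) {}

    /** Grouping key: same subtask and same delegate file. */
    record GroupKey(int subtaskIndex, String filePath) {}

    /** TaskManager side (assumed): merge handles that share subtask and file. */
    static List<MergedHandle> merge(List<ChannelHandle> handles) {
        Map<GroupKey, List<ChannelHandle>> groups = new LinkedHashMap<>();
        for (ChannelHandle h : handles) {
            groups.computeIfAbsent(new GroupKey(h.subtaskIndex(), h.filePath()),
                    k -> new ArrayList<>()).add(h);
        }
        List<MergedHandle> merged = new ArrayList<>();
        for (Map.Entry<GroupKey, List<ChannelHandle>> e : groups.entrySet()) {
            List<ChannelEntry> channels = new ArrayList<>();
            long total = 0;
            for (ChannelHandle h : e.getValue()) {
                channels.add(new ChannelEntry(
                        h.gateIdx(), h.inputChannelIdx(), h.offsets(), h.size()));
                total += h.size(); // merged size is the sum of the channel sizes
            }
            merged.add(new MergedHandle(
                    e.getKey().filePath(), e.getKey().subtaskIndex(), total, channels));
        }
        return merged;
    }

    /** JobManager side (assumed): expand back to per-channel handles on recovery. */
    static List<ChannelHandle> expand(List<MergedHandle> merged) {
        List<ChannelHandle> out = new ArrayList<>();
        for (MergedHandle m : merged) {
            for (ChannelEntry c : m.channels()) {
                out.add(new ChannelHandle(m.filePath(), m.subtaskIndex(),
                        c.gateIdx(), c.inputChannelIdx(), c.offsets(), c.size()));
            }
        }
        return out;
    }
}
```

In this sketch, merging the two channels from the structure example (sizes 1400 and 600, same file, subtask 0) gives one merged handle of size 2000 with two channel entries, and expand restores the original per-channel handles. Since the path is kept once per group instead of once per channel, the saving grows with the number of channels that share a file.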