[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-33734:
--------------------------------
    Component/s: Runtime / Checkpointing
    Description: 
h3. Background

Unaligned checkpoint will write the inflight-data of all InputChannel and 
ResultSubpartition of the same subtask to the same file during checkpoint. The 
InputChannelStateHandle and ResultSubpartitionStateHandle organize the metadata 
of inflight-data at the channel granularity, which causes the file name to be 
repeated many times. When a job is under backpressure and task parallelism is 
high, the metadata of unaligned checkpoints will bloat. This will result in:
 # The amount of data reported by taskmanager to jobmanager increases, and 
jobmanager takes longer to process these RPC requests.
 # The metadata of the entire checkpoint becomes very large, and it takes 
longer to serialize and write it to dfs.

Both of the above points ultimately lead to longer checkpoint duration.
h3. A Production example

Take our production job with a parallelism of 4800 as an example:
 # When there is no back pressure, checkpoint end-to-end duration is within 7 
seconds.
 # When under pressure: checkpoint end-to-end duration often exceeds 1 minute. 
We found that jobmanager took more than 40 seconds to process rpc requests, and 
serialized metadata took more than 20 seconds.Some checkpoint statistics:
|metadata file size|950 MB|
|channel state count|12,229,854|
|channel file count|5536|
Of the 950MB in the metadata file, 68% are redundant file paths.

We enabled log-based checkpoint on this job and hoped that the checkpoint could 
be completed within 30 seconds. This problem made it difficult to achieve this 
goal.
h3. Propose changes

I suggest introducing MergedInputChannelStateHandle and 
MergedResultSubpartitionStateHandle to eliminate redundant file paths.

The taskmanager merges all InputChannelStateHandles with the same delegated 
StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
before reporting. When recovering from checkpoint, jobmangager converts 
MergedInputChannelStateHandle to InputChannelStateHandle collection before 
assigning state handle, and the rest of the process does not need to be 
changed. 

Structure of MergedInputChannelStateHandle :

 
{code:java}
{   // MergedInputChannelStateHandle
    "delegate": {
        "filePath": 
"viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1361195/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
        "stateSize": 123456
    },
    "size": 2000,
    "subtaskIndex":0,
    "channels": [ // One InputChannel per element
        {
            "info": {
                "gateIdx": 0,
                "inputChannelIdx": 0
            },
            "offsets": [
                100,200,300,400
            ],
            "size": 1400
        },
        {
            "info": {
                "gateIdx": 0,
                "inputChannelIdx": 1
            },
            "offsets": [
                500,600
            ],
            "size": 600
        }
    ]
}
 {code}
MergedResultSubpartitionStateHandle is similar.

 

 

WDYT [~roman] , [~pnowojski] , [~fanrui] ?
        Summary: Merge unaligned checkpoint state handle  (was: Merge unaligned 
checkpoint )

> Merge unaligned checkpoint state handle
> ---------------------------------------
>
>                 Key: FLINK-33734
>                 URL: https://issues.apache.org/jira/browse/FLINK-33734
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Priority: Major
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds.Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmangager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1361195/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to