[ 
https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17532058#comment-17532058
 ] 

fanrui commented on FLINK-26803:
--------------------------------

Hi [~pnowojski] [~akalashnikov], I'd like to add some details about the code 
implementation.

I think the synchronization is easy for ChannelState files: we only need to 
modify the code related to ChannelStateWrite and can leave 
AsyncCheckpointRunnable unchanged. OperatorSnapshotFinalizer just gets the 
future in AsyncCheckpointRunnable#run.

Currently, ChannelStateWriterImpl creates a 
ChannelStateWriteRequestExecutorImpl for each task and subtask. Each 
ChannelStateWriteRequestExecutorImpl starts a thread to process the 
ChannelStateWriteRequests. After the write finishes, 
ChannelStateCheckpointWriter#finishWriteAndResult completes bufferFuture.

We could implement this feature by sharing one ChannelStateCheckpointWriter 
across tasks for the same checkpointId, right?
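
To make the idea concrete, here is a rough sketch of sharing one writer per 
checkpointId. It is only an illustration: Registry, SharedWriter and Segment 
are hypothetical names and not Flink's actual API, an in-memory buffer stands 
in for the real file stream, and it assumes that every subtask registers 
before any of them finishes writing.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch only; none of these classes exist in Flink.
class SharedChannelStateWriterSketch {

    /** Where one subtask's channel state lives inside the shared file. */
    static final class Segment {
        final String file;
        final long offset;
        final long length;

        Segment(String file, long offset, long length) {
            this.file = file;
            this.offset = offset;
            this.length = length;
        }
    }

    /** One writer per checkpointId, shared by all registered subtasks. */
    static final class SharedWriter {
        private final String file;
        // Stands in for the output stream of the real checkpoint file.
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private final Map<String, CompletableFuture<Segment>> futures = new HashMap<>();
        private final Map<String, Segment> segments = new HashMap<>();
        private int pending;

        SharedWriter(long checkpointId) {
            this.file = "channel-state-" + checkpointId;
        }

        /** Assumption: all subtasks register before any of them finishes. */
        synchronized CompletableFuture<Segment> register(String subtask) {
            pending++;
            CompletableFuture<Segment> future = new CompletableFuture<>();
            futures.put(subtask, future);
            return future;
        }

        /** Appends the subtask's data; the last subtask completes all futures. */
        synchronized void writeAndFinish(String subtask, byte[] data) {
            long offset = out.size();
            out.write(data, 0, data.length);
            segments.put(subtask, new Segment(file, offset, data.length));
            if (--pending == 0) {
                // The real file would be closed here, before handing out results.
                futures.forEach((name, future) -> future.complete(segments.get(name)));
            }
        }
    }

    /** Maps the same checkpointId to the same writer instance. */
    static final class Registry {
        private final Map<Long, SharedWriter> writers = new HashMap<>();

        synchronized SharedWriter forCheckpoint(long checkpointId) {
            return writers.computeIfAbsent(checkpointId, id -> new SharedWriter(id));
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        SharedWriter writer = registry.forCheckpoint(1L);
        CompletableFuture<Segment> a = writer.register("subtask-0");
        CompletableFuture<Segment> b = registry.forCheckpoint(1L).register("subtask-1");

        writer.writeAndFinish("subtask-0", new byte[] {1, 2, 3});
        System.out.println("subtask-0 done early: " + a.isDone());
        writer.writeAndFinish("subtask-1", new byte[] {4, 5});

        Segment s = b.join();
        System.out.println(s.file + " offset=" + s.offset + " length=" + s.length);
    }
}
```

In this sketch each subtask's future completes only after the last subtask 
has written, because the shared file is not final until then. Each subtask 
gets back the file name plus its own offset and length, so many subtasks end 
up in a single file.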

Could you please check this, and correct me if I'm wrong?

 

!image-2022-05-05-12-36-09-969.png!

> Merge small ChannelState file for Unaligned Checkpoint
> ------------------------------------------------------
>
>                 Key: FLINK-26803
>                 URL: https://issues.apache.org/jira/browse/FLINK-26803
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Network
>            Reporter: fanrui
>            Priority: Major
>         Attachments: image-2022-05-05-12-36-09-969.png
>
>
> When making an unaligned checkpoint, the number of ChannelState files is 
> TaskNumber * subtaskNumber. For a high-parallelism job, this writes too many 
> small files, which puts a high load on the HDFS NameNode.
>  
> In our production environment, a job writes more than 50K small files for 
> each unaligned checkpoint. Could we merge these files before writing them to 
> the FileSystem? We could make the maximum number of files each TM can write 
> in a single unaligned checkpoint configurable.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
