[
https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531034#comment-17531034
]
fanrui commented on FLINK-26803:
--------------------------------
Hi [~pnowojski], thanks for your reply.
In fact, there is one file per Task, that is, one file per OperatorChain, so I
guess that was a typo. The count should be: `number of files = number of
chains * parallelism + state backend * parallelism`.
For unaligned checkpoints, could we reduce the number of files from the
number of Tasks (chains) to the number of TMs? After the optimization, `number
of files = number of TMs + state backend * parallelism`.
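To make the difference concrete, here is a small sketch of the two formulas
above. The class, method and parameter names are hypothetical, the job sizes
are made-up numbers, and "state backend" is read here as the number of chains
that write state backend files, which is an assumption.

```java
// Illustrative arithmetic only; none of these names exist in Flink.
final class ChannelStateFileCount {

    // Current behaviour: one channel state file per Task (chain) subtask,
    // plus the state backend files.
    static long perTask(long chains, long parallelism, long stateBackendChains) {
        return chains * parallelism + stateBackendChains * parallelism;
    }

    // Proposed behaviour: one channel state file per TM,
    // plus the unchanged state backend files.
    static long perTaskManager(long taskManagers, long stateBackendChains, long parallelism) {
        return taskManagers + stateBackendChains * parallelism;
    }

    public static void main(String[] args) {
        // Made-up job: 5 chains, parallelism 1000, 2 stateful chains, 100 TMs.
        long before = perTask(5, 1000, 2);
        long after = perTaskManager(100, 2, 1000);
        System.out.println("files per checkpoint before: " + before);
        System.out.println("files per checkpoint after:  " + after);
    }
}
```

Only the channel state term shrinks; the `state backend * parallelism` term
is the same in both formulas.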
On the implementation side, we only need to change how the
ChannelStateCheckpointWriter is managed. Currently, each (Task, checkpointId)
pair has its own ChannelStateCheckpointWriter, see
[code link|https://github.com/apache/flink/blob/f3d4cebb6516f1f35d4ff9fbb9d3ff941f92243e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java#L39].
Instead, all Tasks inside a TM could share one ChannelStateCheckpointWriter
for the same checkpointId.
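The sharing idea could look roughly like the sketch below: a TM-wide registry
hands out one writer per checkpointId and counts how many Tasks are still
using it. This is not Flink code. `SharedWriterRegistry`, `Writer`, `acquire`
and `release` are hypothetical names, and the real
ChannelStateCheckpointWriter has a different interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical TM-level registry; a sketch of the idea, not Flink's API.
final class SharedWriterRegistry {

    // Stand-in for one writer (and therefore one file) per checkpointId.
    static final class Writer {
        final long checkpointId;
        int users; // number of Tasks currently writing through this writer

        Writer(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final Map<Long, Writer> writers = new HashMap<>();
    private int writersCreated;

    // Called by each Task when it starts writing channel state for a checkpoint.
    // Tasks with the same checkpointId get the same Writer instance.
    synchronized Writer acquire(long checkpointId) {
        Writer writer = writers.get(checkpointId);
        if (writer == null) {
            writer = new Writer(checkpointId);
            writers.put(checkpointId, writer);
            writersCreated++;
        }
        writer.users++;
        return writer;
    }

    // Called by each Task when it has finished writing. Returns true for the
    // last Task, which is the point where the shared file could be closed.
    synchronized boolean release(long checkpointId) {
        Writer writer = writers.get(checkpointId);
        if (writer == null) {
            throw new IllegalStateException("No writer for checkpoint " + checkpointId);
        }
        writer.users--;
        if (writer.users == 0) {
            writers.remove(checkpointId);
            return true;
        }
        return false;
    }

    synchronized int writersCreated() {
        return writersCreated;
    }
}
```

With three Tasks in one TM taking part in the same checkpoint, `acquire` is
called three times but only one writer is created, so only one file would be
written for that TM.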
> Merge small ChannelState file for Unaligned Checkpoint
> ------------------------------------------------------
>
> Key: FLINK-26803
> URL: https://issues.apache.org/jira/browse/FLINK-26803
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / Network
> Reporter: fanrui
> Priority: Major
>
> When making an unaligned checkpoint, the number of ChannelState files is
> TaskNumber * subtaskNumber. For a high-parallelism job, this writes too many
> small files and causes high load on the HDFS NameNode (NN).
>
> In our production, a job writes more than 50K small files for each unaligned
> checkpoint. Could we merge these files before writing them to the FileSystem?
> We could configure the maximum number of files each TM can write in a single
> unaligned checkpoint.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)