[
https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522304#comment-17522304
]
fanrui commented on FLINK-26803:
--------------------------------
Hi [~akalashnikov] , thanks for your reply.
In fact, there is only one immediate benefit: reducing the number of small
files.
If we need to understand the advantages, I think we can sort it out: What are
the problems with too many small files?
# High pressure on hdfs NN
# If UC (Unaligned Checkpoint) is enabled for all flink jobs, when external
components such as Kafka slow down, a large number of flink jobs write Kafka
slow. It will lead to severe back pressure, and a large number of jobs may
write a large number of small files at this time. It may cause hdfs NN
avalanche.
Some of our important jobs hope that when the back pressure is severe, the CP
can be successful. After research, we found that UC can solve this problem.
However, due to the small file problem, we dare not use it in a large number of
jobs. So I think solving the small file problem is mainly used to improve the
availability and stability of the job. Of course, I'd love to hear more from
other flink users.
By the way, I know the
[state.storage.fs.memory-threshold|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-storage-fs-memory-threshold],
but I think the size of input + output channel state is always much large
than 20KB. For large parallelism flink jobs, this size is usually more than 1M.
Regarding the code implementation, I have some ideas, and I'd like to discuss
with you.
Adding the ChannelStateFileManager:
* During the initialization of all Tasks, all Tasks need to be registered with
ChannelStateFileManager.
* When checkpointing, all tasks write files to ChannelStateFileManager, and
ChannelStateFileManager returns path, offset and length.
* Since ChannelStateFileManager knows all the tasks, it knows whether all the
tasks of the current checkpointId are over.
Please correct me if any wrong, I look forward to discussing with you in depth.
Thanks.
> Merge small ChannelState file for Unaligned Checkpoint
> ------------------------------------------------------
>
> Key: FLINK-26803
> URL: https://issues.apache.org/jira/browse/FLINK-26803
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / Network
> Reporter: fanrui
> Priority: Major
>
> When making an unaligned checkpoint, the number of ChannelState files is
> TaskNumber * subtaskNumber. For high parallelism job, it writes too many
> small files. It causes high load for hdfs NN.
>
> In our production, a job writes more than 50K small files for each Unaligned
> Checkpoint. Could we merge these files before write FileSystem? We can
> configure the maximum number of files each TM can write in a single Unaligned
> Checkpoint.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)