[ https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522261#comment-17522261 ]
Anton Kalashnikov commented on FLINK-26803:
-------------------------------------------
[~fanrui], thanks for this idea. I did some research on this (extra thanks to
[~roman] for feedback and explanation). Some results:
Every parallel instance (operator chain) creates:
* one file for input + output channel state
* one file for each operator in the chain
So `number of files = number of chains * parallelism + number of operators * parallelism`,
where the second term counts the state backend files.
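The estimate above can be written as a small calculation. This is a hypothetical helper, not Flink code, and it assumes that all chains share the same parallelism, that every operator writes a state file, and that nothing is inlined below the memory threshold.

```java
/**
 * Hypothetical helper (not part of Flink) for the file-count estimate:
 * every parallel instance of a chain writes one channel-state file plus one
 * state-backend file per operator in the chain.
 * Assumptions: all chains share the same parallelism, every operator writes
 * a file, and no state is inlined below the memory threshold.
 */
final class ChannelStateFileEstimate {

    private ChannelStateFileEstimate() {}

    /**
     * @param operatorsPerChain number of operators in each chain, one entry per chain
     * @param parallelism       parallelism of every chain
     * @return estimated number of files for a single unaligned checkpoint
     */
    static long estimateFiles(int[] operatorsPerChain, int parallelism) {
        long chains = operatorsPerChain.length;
        long operators = 0;
        for (int n : operatorsPerChain) {
            operators += n;
        }
        // chains * parallelism channel-state files
        // + operators * parallelism state-backend files
        return chains * parallelism + operators * parallelism;
    }
}
```

For example, three chains with 2, 3 and 1 operators at parallelism 1000 give `estimateFiles(new int[] {2, 3, 1}, 1000)` = 3 * 1000 + 6 * 1000 = 9000 files per checkpoint.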
As I understand it, the main reasons Flink works this way are the simplicity of
the implementation and easy rescaling. Theoretically, the rescaling problem can
be solved by storing an offset for each subtaskIndex inside a shared file, but
there is still the problem of creating the `stream factory` on the
`TaskManager`, since Flink doesn't know in advance all the tasks that will be
executed on it. Reading would also have to change, to avoid loading one big
file multiple times.
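A minimal sketch of the offset idea, assuming a single shared local file and that all subtasks writing into it are known up front (which is exactly what the `stream factory` problem makes hard). `MergedChannelStateFile` and `Segment` are hypothetical names, not Flink classes. On restore, a subtask seeks to its own offset and reads only its slice, so the big file is never loaded as a whole.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink's API): several subtasks append their channel
 * state to one shared file, and an index of (offset, length) per subtaskIndex
 * lets a restoring subtask read only its own slice.
 */
final class MergedChannelStateFile {

    /** Position of one subtask's state inside the merged file. */
    static final class Segment {
        final long offset;
        final int length;

        Segment(long offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    private final Path file;
    private final Map<Integer, Segment> index = new HashMap<>();
    private long nextOffset = 0;

    MergedChannelStateFile(Path file) throws IOException {
        this.file = file;
        Files.write(file, new byte[0]); // create or truncate the shared file
    }

    /** Appends the state of one subtask and records where it starts. */
    void append(int subtaskIndex, byte[] state) throws IOException {
        if (index.containsKey(subtaskIndex)) {
            throw new IllegalStateException("subtask " + subtaskIndex + " already written");
        }
        Files.write(file, state, StandardOpenOption.APPEND);
        index.put(subtaskIndex, new Segment(nextOffset, state.length));
        nextOffset += state.length;
    }

    /** Seeks to the subtask's offset and reads only its bytes. */
    byte[] read(int subtaskIndex) throws IOException {
        Segment segment = index.get(subtaskIndex);
        if (segment == null) {
            throw new IllegalArgumentException("no state for subtask " + subtaskIndex);
        }
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(segment.offset);
            byte[] buffer = new byte[segment.length];
            raf.readFully(buffer);
            return buffer;
        }
    }
}
```

Rescaling in this model only means handing a new subtask the (file, offset, length) entries it should read; the harder parts mentioned above (knowing all writers on a TaskManager in advance, and sharing one open stream among them) are not addressed by the sketch.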
You can also check the configuration option
[state.storage.fs.memory-threshold|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-storage-fs-memory-threshold].
State chunks smaller than this threshold are stored inline in the root
checkpoint metadata file instead of in separate files, so it reduces the number
of files if states are small enough. Maybe it will help you.
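A simplified model of what the threshold does to the file count, assuming the size of each state chunk is known and ignoring write-buffer details. `MemoryThresholdModel` is a hypothetical name, not Flink code, and the exact boundary handling in Flink may differ from this model.

```java
import java.util.List;

/**
 * Simplified model (not Flink code) of state.storage.fs.memory-threshold:
 * a chunk smaller than the threshold is stored inline in the checkpoint
 * metadata file; any other chunk becomes a separate file.
 */
final class MemoryThresholdModel {

    private MemoryThresholdModel() {}

    /** Counts how many separate files the given chunks would produce. */
    static long filesWritten(List<Long> chunkSizesBytes, long thresholdBytes) {
        return chunkSizesBytes.stream()
                .filter(size -> size >= thresholdBytes) // smaller chunks are inlined
                .count();
    }
}
```

With a threshold of 20480 bytes, chunks of 100, 5000, 30000 and 2000000 bytes produce two files in this model; the two small chunks go into the metadata file.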
In conclusion, I'm not really sure that it makes sense to implement this
feature, since it looks too complicated while the advantages are not so
obvious. Of course, I may be wrong somewhere, and we can discuss it. In any
case, I think we first need to collect all the advantages and try to
understand what a possible implementation would look like.
> Merge small ChannelState file for Unaligned Checkpoint
> ------------------------------------------------------
>
> Key: FLINK-26803
> URL: https://issues.apache.org/jira/browse/FLINK-26803
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / Network
> Reporter: fanrui
> Priority: Major
>
> When making an unaligned checkpoint, the number of ChannelState files is
> TaskNumber * subtaskNumber. For a high-parallelism job, it writes too many
> small files, which causes high load on the HDFS NameNode (NN).
>
> In our production environment, a job writes more than 50K small files for
> each unaligned checkpoint. Could we merge these files before writing them to
> the FileSystem? We could make the maximum number of files each TM can write
> in a single unaligned checkpoint configurable.
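A sketch of the per-TaskManager limit proposed in the issue. `PerTaskManagerFileGrouping` and the `maxFiles` limit are hypothetical, not existing Flink classes or options. The sketch assumes the full list of subtasks on a TaskManager is known when the checkpoint is written, which, as noted in the comment, Flink does not know in advance.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the proposal: the channel state of all subtasks on
 * one TaskManager is spread over at most maxFiles shared files.
 * Neither the class nor the limit exists in Flink.
 */
final class PerTaskManagerFileGrouping {

    private PerTaskManagerFileGrouping() {}

    /**
     * Assigns subtasks round-robin to at most maxFiles groups;
     * each group would be written to one merged file.
     */
    static List<List<String>> group(List<String> subtasksOnTm, int maxFiles) {
        if (maxFiles < 1) {
            throw new IllegalArgumentException("maxFiles must be >= 1");
        }
        int fileCount = Math.min(maxFiles, subtasksOnTm.size());
        List<List<String>> files = new ArrayList<>();
        for (int i = 0; i < fileCount; i++) {
            files.add(new ArrayList<>());
        }
        for (int i = 0; i < subtasksOnTm.size(); i++) {
            files.get(i % fileCount).add(subtasksOnTm.get(i));
        }
        return files;
    }
}
```

Round-robin keeps the merged files roughly equal in the number of subtasks; grouping by expected state size would balance bytes instead, at the cost of needing size estimates before writing.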
--
This message was sent by Atlassian Jira
(v8.20.1#820001)