[ https://issues.apache.org/jira/browse/FLINK-30412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648317#comment-17648317 ]
xiaodao commented on FLINK-30412:
---------------------------------
[~xtsong] Thank you for your reply.
Supplementary note:
When the JobGraph is built in StreamingJobGraphGenerator#createJobGraph, it calls
configureCheckpointing();
which always sets
jobGraph.setSnapshotSettings(settings);
so the snapshotSettings field of the JobGraph is never null.
After the job is submitted to the JobManager, the ExecutionGraph is built in
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder#buildGraph.
This method sets up checkpointing whenever the JobGraph's snapshotSettings field
is not null:
{code:java}
// configure the state checkpointing
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
if (snapshotSettings != null) { ... }
{code}
It then builds the CheckpointCoordinator, which creates the (empty) checkpoint
directory in this code:
{code:java}
try {
    this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
    checkpointStorage.initializeBaseLocations();
} catch (IOException e) {
    throw new FlinkRuntimeException(
            "Failed to create checkpoint storage at checkpoint coordinator side.", e);
}
{code}
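To make the effect concrete, here is a minimal, simplified model of the chain described above: non-null settings are enough to initialize storage, even when the interval is Long.MAX_VALUE (checkpointing disabled). All names here (EmptyCheckpointDirs, SnapshotSettings, buildGraph, simulateDisabledJobs) are hypothetical stand-ins, not Flink classes, and the real storage may create more than one directory per job; this sketch creates exactly one under a local temp directory. It uses a record, so it assumes Java 16+.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical, simplified model of the current control flow (not Flink code).
public class EmptyCheckpointDirs {

    // Stand-in for JobCheckpointingSettings; only the interval is modeled.
    record SnapshotSettings(long checkpointInterval) {}

    // Stand-in for ExecutionGraphBuilder#buildGraph + CheckpointCoordinator:
    // any non-null settings trigger storage initialization, whatever the interval.
    static void buildGraph(String jobId, SnapshotSettings settings, Path root) {
        if (settings != null) {
            try {
                // Like initializeBaseLocations(): create a directory for this job.
                Files.createDirectories(root.resolve(jobId));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Submits `jobs` jobs whose checkpointing is disabled (interval already
    // normalized to Long.MAX_VALUE) and counts the job directories left behind.
    static long simulateDisabledJobs(int jobs) {
        try {
            Path root = Files.createTempDirectory("checkpoints");
            for (int i = 0; i < jobs; i++) {
                buildGraph("job-" + i, new SnapshotSettings(Long.MAX_VALUE), root);
            }
            try (Stream<Path> s = Files.list(root)) {
                return s.filter(Files::isDirectory).count();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(simulateDisabledJobs(5)); // 5
    }
}
```

On a long-running session cluster, every submitted job adds at least one such directory, which is how the count can eventually hit an HDFS limit.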
My question is: why do we need to set up checkpointing (and its storage
directories) when checkpointInterval is not set, i.e. when checkpointing is disabled?
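One possible direction, sketched only as a hypothetical illustration and not as Flink's actual code or fix: initialize coordinator-side storage only when the interval is not the Long.MAX_VALUE "disabled" marker. The class and method names (GuardedCheckpointStorage, checkpointingEnabled, maybeInitializeStorage, simulate) are invented for this sketch; it assumes Java 16+ for the record.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical guard (NOT Flink code): storage is initialized only when
// periodic checkpointing is actually enabled.
public class GuardedCheckpointStorage {

    record SnapshotSettings(long checkpointInterval) {
        // Long.MAX_VALUE is the "disabled" marker set by configureCheckpointing().
        boolean checkpointingEnabled() {
            return checkpointInterval > 0 && checkpointInterval != Long.MAX_VALUE;
        }
    }

    static void maybeInitializeStorage(String jobId, SnapshotSettings settings, Path root) {
        if (settings == null || !settings.checkpointingEnabled()) {
            return; // no coordinator-side storage, no empty directory
        }
        try {
            Files.createDirectories(root.resolve(jobId));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Submits `jobs` jobs with the given (already normalized) interval and
    // returns how many job directories were created.
    static long simulate(int jobs, long interval) {
        try {
            Path root = Files.createTempDirectory("checkpoints");
            for (int i = 0; i < jobs; i++) {
                maybeInitializeStorage("job-" + i, new SnapshotSettings(interval), root);
            }
            try (Stream<Path> s = Files.list(root)) {
                return s.filter(Files::isDirectory).count();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, Long.MAX_VALUE)); // 0
        System.out.println(simulate(5, 60_000L));        // 5
    }
}
```

A real change would likely need more care than this: the checkpoint coordinator may also be used for savepoints when periodic checkpointing is off, so storage might still have to be created lazily on first use rather than skipped entirely.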
> create many checkpoint empty dir when job not enable checkpoint
> ---------------------------------------------------------------
>
> Key: FLINK-30412
> URL: https://issues.apache.org/jira/browse/FLINK-30412
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.12.7, 1.13.6, 1.15.2
> Reporter: xiaodao
> Priority: Major
>
> When we submit jobs to a Flink session cluster, after a long time we find that
> too many empty checkpoint directories have been created, exceeding the HDFS
> maximum node limit.
> I found that StreamingJobGraphGenerator sets the snapshot settings regardless of
> whether the job has checkpointing enabled:
> jobGraph.setSnapshotSettings(settings)
> {code:java}
> private void configureCheckpointing() {
>     CheckpointConfig cfg = streamGraph.getCheckpointConfig();
>     long interval = cfg.getCheckpointInterval();
>     if (interval < MINIMAL_CHECKPOINT_TIME) {
>         // interval of max value means disable periodic checkpoint
>         interval = Long.MAX_VALUE;
>     }
>     // ...
> }
> {code}
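The quoted snippet means that a job without checkpointing is not represented by null settings but by an interval of Long.MAX_VALUE, and setSnapshotSettings is still called afterwards. A minimal sketch of that interval handling follows; the value 10 ms for MINIMAL_CHECKPOINT_TIME and the unset default of -1 are assumptions to verify against your Flink version, and isPeriodicCheckpointingEnabled is a hypothetical helper, not a Flink API.

```java
// Hypothetical sketch of the interval handling quoted above (not Flink code).
public class CheckpointIntervalSketch {

    // Assumed value; check MINIMAL_CHECKPOINT_TIME in your Flink version.
    static final long MINIMAL_CHECKPOINT_TIME = 10L;

    // Intervals below the minimum (including an unset default such as -1)
    // become Long.MAX_VALUE, i.e. "periodic checkpointing disabled".
    static long normalizeInterval(long configured) {
        return configured < MINIMAL_CHECKPOINT_TIME ? Long.MAX_VALUE : configured;
    }

    // Hypothetical helper: the only signal of "disabled" is the marker value,
    // because the settings object itself is never null.
    static boolean isPeriodicCheckpointingEnabled(long normalizedInterval) {
        return normalizedInterval != Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        long disabled = normalizeInterval(-1);
        long enabled = normalizeInterval(60_000L);
        System.out.println(disabled == Long.MAX_VALUE);               // true
        System.out.println(isPeriodicCheckpointingEnabled(disabled)); // false
        System.out.println(isPeriodicCheckpointingEnabled(enabled));  // true
    }
}
```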
--
This message was sent by Atlassian Jira
(v8.20.10#820010)