[ https://issues.apache.org/jira/browse/FLINK-24506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452861#comment-17452861 ]
Chesnay Schepler edited comment on FLINK-24506 at 12/4/21, 1:40 PM:
--------------------------------------------------------------------

master: 83cad724890e4904758b46f0e292a64fc1a5d1d4
1.13: 98c49d63efa7723ed5394e503f7f0c3f54061456


was (Author: zentol):
    master: 83cad724890e4904758b46f0e292a64fc1a5d1d4

> checkpoint directory is not configurable through the Flink configuration
> passed into the StreamExecutionEnvironment
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24506
>                 URL: https://issues.apache.org/jira/browse/FLINK-24506
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration, Runtime / State Backends
>    Affects Versions: 1.14.0, 1.13.2
>            Reporter: Matthias
>            Assignee: Matthias
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> FLINK-19463 introduced the separation of {{StateBackend}} and
> {{CheckpointStorage}}. Before that, both were included in the same
> interface implementation
> [AbstractFileStateBackend|https://github.com/apache/flink/blob/0a76daba0a428a322f0273d7dc6a70966f62bf26/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java].
> {{FsStateBackend}} was used as a default implementation pre-1.13.
> Versions before {{1.13}} initialized the checkpoint directory when
> instantiating the state backend (see
> [FsStateBackendFactory|https://github.com/apache/flink/blob/release-1.12/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java#L46]).
> Starting from {{1.13}}, loading the {{CheckpointStorage}} is done by the
> {{CheckpointStorageLoader.load}} method, which is called in various places:
> * Savepoint disposal (through {{Checkpoints.loadCheckpointStorage}}),
> where it relies only on the cluster configuration (no application
> checkpoint storage is passed)
> * {{SchedulerBase}} initialization (through
> {{DefaultExecutionGraphBuilder}}), where it is based on the cluster’s
> configuration but also on the application configuration (i.e. the
> {{JobGraph}}’s setting), which would be considered if
> {{CheckpointConfig#configure}} included the checkpoint storage
> * {{StreamTask}} on the {{TaskManager}}’s side, where it is based on the
> configuration passed in by the {{JobVertex}} for the application’s
> {{CheckpointStorage}} and on the {{TaskManager}}’s configuration (coming
> from the session cluster) for the fallback {{CheckpointStorage}}
> The issue is that we don't set the checkpoint directory in the
> {{CheckpointConfig}}. Hence, it is not picked up as a job-related
> property, and Flink always uses the fallback provided by the session
> cluster configuration.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
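
To illustrate the fallback behaviour described in the issue, here is a minimal, self-contained sketch. It is not Flink code: the class {{JobCheckpointSettings}} and the methods {{configureWithoutStorage}}, {{configureWithStorage}} and {{resolveCheckpointDir}} are hypothetical stand-ins, and plain maps take the place of Flink's configuration objects. Only the option key {{state.checkpoints.dir}} is taken from Flink itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Simplified, hypothetical model of the lookup described in the issue; not Flink code. */
public class CheckpointDirFallbackSketch {

    // Flink's option key for the checkpoint directory.
    static final String CHECKPOINTS_DIR = "state.checkpoints.dir";

    /** Hypothetical stand-in for the job-level checkpoint settings. */
    static final class JobCheckpointSettings {
        private String checkpointDir; // null means "not set by the job"

        Optional<String> getCheckpointDir() {
            return Optional.ofNullable(checkpointDir);
        }

        /** Reported behaviour: the directory is never read from the configuration. */
        void configureWithoutStorage(Map<String, String> envConfig) {
            // Other checkpointing options would be copied here; the directory is skipped.
        }

        /** Intended behaviour: the directory is copied when present. */
        void configureWithStorage(Map<String, String> envConfig) {
            checkpointDir = envConfig.get(CHECKPOINTS_DIR);
        }
    }

    /** The job-level setting wins; otherwise fall back to the cluster configuration. */
    static String resolveCheckpointDir(JobCheckpointSettings job,
                                       Map<String, String> clusterConfig) {
        return job.getCheckpointDir().orElseGet(() -> clusterConfig.get(CHECKPOINTS_DIR));
    }

    public static void main(String[] args) {
        Map<String, String> clusterConfig = new HashMap<>();
        clusterConfig.put(CHECKPOINTS_DIR, "file:///cluster/checkpoints");

        // Configuration as it would be passed into the execution environment.
        Map<String, String> envConfig = new HashMap<>();
        envConfig.put(CHECKPOINTS_DIR, "file:///job/checkpoints");

        JobCheckpointSettings reported = new JobCheckpointSettings();
        reported.configureWithoutStorage(envConfig);
        System.out.println(resolveCheckpointDir(reported, clusterConfig));

        JobCheckpointSettings intended = new JobCheckpointSettings();
        intended.configureWithStorage(envConfig);
        System.out.println(resolveCheckpointDir(intended, clusterConfig));
    }
}
```

In this model the first call resolves to the cluster directory even though the environment configuration names a job directory, which mirrors the reported problem. The second call resolves to the job directory, which is the behaviour the issue asks for.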