[
https://issues.apache.org/jira/browse/FLINK-24506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias reassigned FLINK-24506:
--------------------------------
Assignee: Matthias
> checkpoint directory is not configurable through the Flink configuration
> passed into the StreamExecutionEnvironment
> -------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24506
> URL: https://issues.apache.org/jira/browse/FLINK-24506
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Configuration, Runtime / State Backends
> Affects Versions: 1.14.0, 1.13.2
> Reporter: Matthias
> Assignee: Matthias
> Priority: Major
>
> FLINK-19463 introduced the separation of {{StateBackend}} and
> {{CheckpointStorage}}. Before that, both were included in the same
> interface implementation
> [AbstractFileStateBackend|https://github.com/apache/flink/blob/0a76daba0a428a322f0273d7dc6a70966f62bf26/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java].
> {{FsStateBackend}} was used as a default implementation pre-1.13.
> Versions before {{1.13}} initialized the checkpoint directory when instantiating the
> state backend (see
> [FsStateBackendFactory|https://github.com/apache/flink/blob/release-1.12/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java#L46]).
> Starting from {{1.13}}, the {{CheckpointStorage}} is loaded by the
> {{CheckpointStorageLoader.load}} method, which is called in various places:
> * Savepoint disposal (through {{Checkpoints.loadCheckpointStorage}}),
> where it relies only on the cluster configuration (no application
> checkpoint storage is passed)
> * {{SchedulerBase}} initialization (through {{DefaultExecutionGraphBuilder}}),
> where it is based on the cluster’s configuration and also on the application
> configuration (i.e. the {{JobGraph}}’s settings); the latter would be considered
> if {{CheckpointConfig#configure}} included the checkpoint storage
> * {{StreamTask}} on the {{TaskManager}}’s side, where it is based on the
> configuration passed in by the {{JobVertex}} for the application’s
> {{CheckpointStorage}} and on the {{TaskManager}}’s configuration (coming
> from the session cluster) for the fallback {{CheckpointStorage}}
> The issue is that the checkpoint directory is not set in the
> {{CheckpointConfig}}. Hence, it is not picked up as a job-related
> property, and Flink always uses the fallback provided by the session
> cluster configuration.
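
The lookup order described above can be sketched with a small, self-contained model. This is not Flink's actual code: the class and method names ({{CheckpointDirResolution}}, {{JobCheckpointSettings}}, {{configureAsReported}}, {{configureWithDirectory}}, {{resolve}}) are hypothetical and exist only for illustration. The only name taken from Flink is the configuration key {{state.checkpoints.dir}}. The sketch assumes that a job-level value, when present, takes precedence over the cluster-level fallback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Simplified, hypothetical model of the checkpoint directory lookup.
 * None of these types are Flink classes; only the key name is Flink's.
 */
final class CheckpointDirResolution {

    static final String CHECKPOINTS_DIR_KEY = "state.checkpoints.dir";

    /** Models the job-level settings that travel with the job. */
    static final class JobCheckpointSettings {
        // null means "not set by the application"
        private String checkpointDir;

        /** Models the reported behaviour: the directory key is never read. */
        void configureAsReported(Map<String, String> envConfig) {
            // Intentionally ignores CHECKPOINTS_DIR_KEY.
        }

        /** Models one possible fix: copy the directory into the job-level settings. */
        void configureWithDirectory(Map<String, String> envConfig) {
            checkpointDir = envConfig.get(CHECKPOINTS_DIR_KEY);
        }

        Optional<String> checkpointDir() {
            return Optional.ofNullable(checkpointDir);
        }
    }

    /** A job-level value wins; otherwise fall back to the cluster configuration. */
    static Optional<String> resolve(
            JobCheckpointSettings job, Map<String, String> clusterConfig) {
        if (job.checkpointDir().isPresent()) {
            return job.checkpointDir();
        }
        return Optional.ofNullable(clusterConfig.get(CHECKPOINTS_DIR_KEY));
    }
}
```

In this model, a directory set only in the environment configuration is lost when {{configureAsReported}} is used, so {{resolve}} returns the cluster value. With {{configureWithDirectory}}, the job-level value is returned instead.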
--
This message was sent by Atlassian Jira
(v8.20.1#820001)