[ 
https://issues.apache.org/jira/browse/FLINK-24506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17451622#comment-17451622
 ] 

Matthias commented on FLINK-24506:
----------------------------------

Both {{StreamPlanEnvironment}} and {{StreamContextEnvironment}} are initialized 
using the Flink configuration (which includes the checkpoint directory). Both 
classes derive from {{StreamExecutionEnvironment}}. The 
{{StreamExecutionEnvironment}} initializes the {{CheckpointConfig}} (see 
[StreamExecutionEnvironment:975|https://github.com/apache/flink/blob/b4c385e41832f16e39d5cbe4fb69ead9bbe077b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L975]).
 But it doesn’t set the checkpoint directory in 
[CheckpointConfig#configure|https://github.com/apache/flink/blob/cd01d4c02793d1b29618093f730b3bc521152b62/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java#L753].
 This was previously set when initializing the {{StateBackend}} in 
[StreamExecutionEnvironment#configure|https://github.com/apache/flink/blob/b4c385e41832f16e39d5cbe4fb69ead9bbe077b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L919]
 which calls 
[StateBackendLoader#loadStateBackendFromConfig|https://github.com/apache/flink/blob/641c31e92fd8ff3702d2ac3510a63b0653802a2e/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L146].

{{FsStateBackend}} was initialized using the checkpoint directory. 
{{HashMapStateBackend}} does not include this pointer anymore but relies 
instead on the {{CheckpointStorage}}, which is loaded through 
[CheckpointStorageLoader.load|https://github.com/apache/flink/blob/658fac3736b73adf54b629242ede91313947e7e1/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java#L158]
 which is called when creating the ExecutionGraph in 
[DefaultExecutionGraphBuilder:269|https://github.com/apache/flink/blob/eba8f574c550123004ed4f557cef28ff557cd88e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java#L269].
 The {{CheckpointStorage}} is either loaded from the JobManager configuration 
(i.e. the Session cluster’s configuration) or from the application (i.e. the 
{{JobGraph}}). But the {{JobGraph}} does not carry this setting because it is 
never set in 
[CheckpointConfig#configure|https://github.com/apache/flink/blob/cd01d4c02793d1b29618093f730b3bc521152b62/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java#L753],
 as mentioned above.
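
The lookup order described above can be modeled roughly as follows. This is a simplified sketch in plain Java, not Flink's actual {{CheckpointStorageLoader}} code: the class name, the method name and the placeholder default value are assumptions made purely for illustration.

```java
import java.util.Optional;

/** Simplified model of the lookup order; all names here are hypothetical. */
public class StorageLookupSketch {

    // Placeholder for whatever storage is used when nothing is configured (assumption).
    static final String DEFAULT_STORAGE = "<default storage>";

    /**
     * The job-level value (shipped with the JobGraph) wins. If it is absent,
     * the cluster configuration is used as the fallback.
     */
    static String resolveCheckpointDir(Optional<String> fromJobGraph, Optional<String> fromCluster) {
        if (fromJobGraph.isPresent()) {
            return fromJobGraph.get();
        }
        return fromCluster.orElse(DEFAULT_STORAGE);
    }

    public static void main(String[] args) {
        Optional<String> cluster = Optional.of("file:///cluster/checkpoints");

        // The situation in this issue: the directory from the job's configuration
        // never reaches the JobGraph, so the session cluster's value is used.
        System.out.println(resolveCheckpointDir(Optional.empty(), cluster));

        // What one would expect once the job-level value is propagated.
        System.out.println(resolveCheckpointDir(Optional.of("file:///job/checkpoints"), cluster));
    }
}
```

In this model the bug corresponds to the first call: the job-level argument is always empty, so the cluster's directory is the only one that can be selected.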

The {{CheckpointStorage}} is loaded in three locations:
 * Savepoint Disposal (through 
[Checkpoints.loadCheckpointStorage|https://github.com/apache/flink/blob/4597d5557c640e0ef5a526cbb6d46686be5dd813/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L351])
 where it relies only on the cluster configuration (no application 
checkpoint storage is passed)

 * Scheduler initialization (through 
[DefaultExecutionGraphBuilder|https://github.com/apache/flink/blob/f7bedb0603c33cb4e25c62c9899edb709b264371/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java#L268])
 where it’s based on the cluster’s configuration but also the application 
configuration (i.e. the JobGraph’s setting), which would be considered if 
[CheckpointConfig#configure|https://github.com/apache/flink/blob/cd01d4c02793d1b29618093f730b3bc521152b62/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java#L753]
 included the checkpoint storage

 * 
[StreamTask|https://github.com/apache/flink/blob/79a801a7bf669813d88784fd642d724d6dab69f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1505]
 on the TaskManager’s side where it’s based on the configuration passed in by 
the {{JobVertex}} for the application’s {{CheckpointStorage}} and the 
TaskManager’s configuration (coming from the session cluster) for the fallback 
{{CheckpointStorage}}

For the latter two, we could fix the checkpoint directory not being loaded 
from the job spec by handling the option in 
[CheckpointConfig#configure|https://github.com/apache/flink/blob/cd01d4c02793d1b29618093f730b3bc521152b62/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java#L753].
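
A minimal sketch of what handling the option could look like, written in plain Java rather than against the real {{CheckpointConfig}} class. The {{JobCheckpointSettings}} type and both {{configure...}} methods are hypothetical stand-ins. Only the {{state.checkpoints.dir}} key is taken from Flink's configuration options.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the proposed change; this is not Flink's CheckpointConfig. */
public class ConfigureSketch {

    // Flink's configuration key for the checkpoint directory.
    static final String CHECKPOINTS_DIR_KEY = "state.checkpoints.dir";

    /** Stand-in for the job-level checkpoint settings that travel with the job. */
    static final class JobCheckpointSettings {
        String checkpointDir; // null means "not set at job level"
    }

    /** Behavior as described in this issue: the directory option is not read. */
    static void configureWithoutFix(Map<String, String> conf, JobCheckpointSettings settings) {
        // Other checkpoint options would be applied here; the directory is skipped.
    }

    /** Proposed behavior: copy the directory into the job-level settings if present. */
    static void configureWithFix(Map<String, String> conf, JobCheckpointSettings settings) {
        String dir = conf.get(CHECKPOINTS_DIR_KEY);
        if (dir != null) {
            settings.checkpointDir = dir;
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(CHECKPOINTS_DIR_KEY, "file:///job/checkpoints");

        JobCheckpointSettings before = new JobCheckpointSettings();
        configureWithoutFix(conf, before);
        System.out.println("without fix: " + before.checkpointDir);

        JobCheckpointSettings after = new JobCheckpointSettings();
        configureWithFix(conf, after);
        System.out.println("with fix: " + after.checkpointDir);
    }
}
```

With the option handled, the job-level setting is no longer empty, so both the scheduler and the {{StreamTask}} would see an application-level value instead of falling back to the session cluster's configuration.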

The Savepoint Disposal is safe even with this change because it relies on an 
external pointer for the savepoint and does not use the checkpoint directory at 
all.

> checkpoint directory is not configurable through the Flink configuration 
> passed into the StreamExecutionEnvironment
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24506
>                 URL: https://issues.apache.org/jira/browse/FLINK-24506
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration, Runtime / State Backends
>    Affects Versions: 1.14.0, 1.13.2
>            Reporter: Matthias
>            Assignee: Matthias
>            Priority: Major
>              Labels: pull-request-available
>
> FLINK-19463 introduced the separation of {{StateBackend}} and 
> {{CheckpointStorage}}. Before that, both were included in the same 
> interface implementation 
> [AbstractFileStateBackend|https://github.com/apache/flink/blob/0a76daba0a428a322f0273d7dc6a70966f62bf26/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java].
>  {{FsStateBackend}} was used as a default implementation pre-1.13.
> Versions before {{1.13}} initialized the checkpoint directory when 
> instantiating the state backend (see 
> [FsStateBackendFactory|https://github.com/apache/flink/blob/release-1.12/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java#L46]).
>  Starting from {{1.13}} loading the {{CheckpointStorage}} is done by the 
> {{CheckpointStorageLoader.load}} method that is called in various places:
>  * Savepoint Disposal (through {{Checkpoints.loadCheckpointStorage}}) 
> where it relies only on the cluster configuration (no application 
> checkpoint storage is passed)
>  * {{SchedulerBase}} initialization (through {{DefaultExecutionGraphBuilder}}) 
> where it’s based on the cluster’s configuration but also the application 
> configuration (i.e. the {{JobGraph}}’s setting), which would be considered 
> if {{CheckpointConfig#configure}} included the checkpoint storage
>  * {{StreamTask}} on the {{TaskManager}}’s side where it’s based on the 
> configuration passed in by the {{JobVertex}} for the application’s 
> {{CheckpointStorage}} and the {{TaskManager}}’s configuration (coming 
> from the session cluster) for the fallback {{CheckpointStorage}}
>
> The issue is that we don't set the checkpoint directory in the 
> {{CheckpointConfig}}. Hence, it's not going to get picked up as a 
> job-related property. Flink always uses the fallback provided by the session 
> cluster configuration.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
