fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r854060011
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
final CheckpointCoordinator checkpointCoordinator =
executionGraphToRestore.getCheckpointCoordinator();
if (checkpointCoordinator != null) {
+ Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+ boolean changelogEnabled =
+ stateBackendName.isPresent()
+ && "ChangelogStateBackend".equals(stateBackendName.get());
Review Comment:
> will this work if flink-conf.yaml contains
> state.backend.changelog.enabled=true and job doesn't specify changelog or even
> any backend?

Yes, the test results show that it works.
I set `state.backend.changelog.enabled: true` in flink-conf.yaml and ran
examples/SocketWindowWordCount.jar. Here is the JM log; the last entry shows
that changelog is enabled.
```
2022-04-20 18:52:46,580 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'Socket Window WordCount' (23bf9f9518357ade7bec63df910fae98).
2022-04-20 18:52:46,585 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for Socket Window WordCount (23bf9f9518357ade7bec63df910fae98).
2022-04-20 18:52:46,595 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job Socket Window WordCount (23bf9f9518357ade7bec63df910fae98).
2022-04-20 18:52:46,595 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2022-04-20 18:52:46,600 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
2022-04-20 18:52:46,600 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@37e3b93e
2022-04-20 18:52:46,612 INFO org.apache.flink.state.changelog.ChangelogStateBackend [] - ChangelogStateBackend is used, delegating HashMapStateBackend.
```
Meanwhile, the `StateBackendName` is set in
`DefaultExecutionGraph.enableCheckpointing()`, and `enableCheckpointing()` is
called by `DefaultExecutionGraphBuilder.buildGraph()`:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java#L307
So the `StateBackendName` has already been set by the time we call
`executionGraphToRestore.getStateBackendName()`:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L149
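
For illustration, the name-based check from the diff can be exercised in isolation. This is only a minimal sketch, not the PR's code: the class `ChangelogCheckSketch` and the method `isChangelogEnabled` are hypothetical names, and the `"HashMapStateBackend"` input is just a sample string. The only thing taken from the diff is the comparison of an `Optional<String>` against `"ChangelogStateBackend"`.

```java
import java.util.Optional;

/** Hypothetical stand-alone sketch of the name-based check shown in the diff. */
public class ChangelogCheckSketch {

    // The literal the diff compares the state backend name against.
    private static final String CHANGELOG_BACKEND_NAME = "ChangelogStateBackend";

    /**
     * Returns true only if a name is present and equals the changelog backend name.
     * Equivalent to: name.isPresent() && CHANGELOG_BACKEND_NAME.equals(name.get()).
     */
    public static boolean isChangelogEnabled(Optional<String> stateBackendName) {
        return stateBackendName.map(CHANGELOG_BACKEND_NAME::equals).orElse(false);
    }

    public static void main(String[] args) {
        // Name already set to the changelog wrapper, as in the JM log above.
        System.out.println(isChangelogEnabled(Optional.of("ChangelogStateBackend")));
        // A different backend name (sample input).
        System.out.println(isChangelogEnabled(Optional.of("HashMapStateBackend")));
        // Name not set yet, e.g. if checkpointing had not been enabled.
        System.out.println(isChangelogEnabled(Optional.empty()));
    }
}
```

The empty-`Optional` case is why the ordering matters: the check only gives a meaningful answer once the name has been set on the execution graph.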