fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r854060011


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
             final CheckpointCoordinator checkpointCoordinator =
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
+                Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+                boolean changelogEnabled =
+                        stateBackendName.isPresent()
+                                && "ChangelogStateBackend".equals(stateBackendName.get());

Review Comment:
   > will this work if flink-conf.yaml contains state.backend.changelog.enabled=true and job doesn't specify changelog or even any backend?
   
   Yes, it works according to my test.
   I set `state.backend.changelog.enabled: true` in flink-conf.yaml and ran examples/SocketWindowWordCount.jar. Here is the JM log, which shows that changelog is enabled:
   ```
   2022-04-20 18:52:46,580 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job 'Socket Window WordCount' (23bf9f9518357ade7bec63df910fae98).
   2022-04-20 18:52:46,585 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for Socket Window WordCount (23bf9f9518357ade7bec63df910fae98).
   2022-04-20 18:52:46,595 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job Socket Window WordCount (23bf9f9518357ade7bec63df910fae98).
   2022-04-20 18:52:46,595 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
   2022-04-20 18:52:46,600 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
   2022-04-20 18:52:46,600 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@37e3b93e
   2022-04-20 18:52:46,612 INFO  org.apache.flink.state.changelog.ChangelogStateBackend       [] - ChangelogStateBackend is used, delegating HashMapStateBackend.
   ```
   Also, the state backend name is set in `DefaultExecutionGraph.enableCheckpointing()`, and `enableCheckpointing()` is called by `DefaultExecutionGraphBuilder.buildGraph()`:
   https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java#L307
   
   So, by the time we call `executionGraphToRestore.getStateBackendName()`, the state backend name has already been set:
   https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L149
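   
   To make the ordering argument concrete, here is a minimal, self-contained sketch. `ChangelogNameSketch`, `SketchGraph`, and the simplified `buildGraph(boolean)` are hypothetical stand-ins, not Flink classes or signatures; the sketch also assumes that the recorded name is the simple class name of the wrapping backend, which is what the check in the diff compares against.
   
   ```java
   import java.util.Optional;

   /** Hypothetical miniature of the build-then-restore ordering; not Flink code. */
   final class ChangelogNameSketch {

       /** Stand-in for the execution graph: it only tracks the backend name. */
       static final class SketchGraph {
           private String stateBackendName; // stays null until checkpointing is enabled

           // Plays the role of enableCheckpointing(): records the backend name.
           void enableCheckpointing(String backendSimpleName) {
               this.stateBackendName = backendSimpleName;
           }

           Optional<String> getStateBackendName() {
               return Optional.ofNullable(stateBackendName);
           }
       }

       // Plays the role of buildGraph(): the name is recorded while the graph is built.
       // Assumption: with changelog enabled, the wrapping backend's name is recorded.
       static SketchGraph buildGraph(boolean changelogEnabledInConfig) {
           SketchGraph graph = new SketchGraph();
           String name =
                   changelogEnabledInConfig ? "ChangelogStateBackend" : "HashMapStateBackend";
           graph.enableCheckpointing(name);
           return graph;
       }

       // Same shape as the check in the diff under review.
       static boolean isChangelogEnabled(SketchGraph graph) {
           Optional<String> stateBackendName = graph.getStateBackendName();
           return stateBackendName.isPresent()
                   && "ChangelogStateBackend".equals(stateBackendName.get());
       }

       public static void main(String[] args) {
           // Graph built with changelog enabled: the restore-time check sees the name.
           System.out.println(isChangelogEnabled(buildGraph(true)));
           // Graph built without changelog: the check is false.
           System.out.println(isChangelogEnabled(buildGraph(false)));
           // Graph on which checkpointing was never enabled: name is absent, check is false.
           System.out.println(isChangelogEnabled(new SketchGraph()));
       }
   }
   ```
   
   The last case shows why the `isPresent()` guard matters: if the name had not been set before the restore path runs, the check would quietly evaluate to false instead of failing.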
   


