[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220213#comment-16220213 ]

ASF GitHub Bot commented on FLINK-5823:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4907#discussion_r147089325
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---
    @@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph(
                                        metrics);
     
                        // The default directory for externalized checkpoints
    -                   String externalizedCheckpointsDir = jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
    +                   String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
     
    -                   // load the state backend for checkpoint metadata.
    -                   // if specified in the application, use from there, otherwise load from configuration
    -                   final StateBackend metadataBackend;
    +                   // load the state backend from the application settings
    +                   final StateBackend applicationConfiguredBackend;
    +                   final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
     
    -                   final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
    -                   if (applicationConfiguredBackend != null) {
    +                   if (serializedAppConfigured == null) {
    +                           applicationConfiguredBackend = null;
    +                   }
    +                   else {
                                try {
    -                                   metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader);
    +                                   applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
                                } catch (IOException | ClassNotFoundException e) {
    -                                   throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e);
    +                                   throw new JobExecutionException(jobId, 
    +                                                   "Could not deserialize application-defined state backend.", e);
                                }
    +                   }
     
    -                           log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
    -                                   metadataBackend);
    -                   } else {
    -                           try {
    -                                   metadataBackend = AbstractStateBackend
    -                                                   .loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
    -                           } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
    -                                   throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
    -                           }
    +                   final StateBackend rootBackend;
    +                   try {
    +                           rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
    --- End diff --
    
    True, the commits are not self-contained. I tried as much as I could, but since it all comes from a single original commit, it would have been crazy time-intensive to make every commit self-contained.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> -----------------------------------------------------------------------
>
>                 Key: FLINK-5823
>                 URL: https://issues.apache.org/jira/browse/FLINK-5823
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)