[
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219562#comment-16219562
]
ASF GitHub Bot commented on FLINK-5823:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4907#discussion_r146987131
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
---
@@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph(
metrics);
// The default directory for externalized checkpoints
- String externalizedCheckpointsDir =
jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
+ String externalizedCheckpointsDir =
jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
- // load the state backend for checkpoint metadata.
- // if specified in the application, use from there,
otherwise load from configuration
- final StateBackend metadataBackend;
+ // load the state backend from the application settings
+ final StateBackend applicationConfiguredBackend;
+ final SerializedValue<StateBackend>
serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
- final SerializedValue<StateBackend>
applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
- if (applicationConfiguredBackend != null) {
+ if (serializedAppConfigured == null) {
+ applicationConfiguredBackend = null;
+ }
+ else {
try {
- metadataBackend =
applicationConfiguredBackend.deserializeValue(classLoader);
+ applicationConfiguredBackend =
serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException
e) {
- throw new JobExecutionException(jobId,
"Could not instantiate configured state backend.", e);
+ throw new JobExecutionException(jobId,
+ "Could not deserialize
application-defined state backend.", e);
}
+ }
- log.info("Using application-defined state
backend for checkpoint/savepoint metadata: {}.",
- metadataBackend);
- } else {
- try {
- metadataBackend = AbstractStateBackend
-
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
- } catch (IllegalConfigurationException |
IOException | DynamicCodeLoadingException e) {
- throw new JobExecutionException(jobId,
"Could not instantiate configured state backend", e);
- }
+ final StateBackend rootBackend;
+ try {
+ rootBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(
--- End diff --
nit: This is only defined in the later commit "[FLINK-5823] [checkpoints]
State backends define checkpoint and savepoint directories, improved
configuration", so technically the commit that adds this code is broken.
> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> -----------------------------------------------------------------------
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)