[
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220213#comment-16220213
]
ASF GitHub Bot commented on FLINK-5823:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/4907#discussion_r147089325
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
---
@@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph(
metrics);
// The default directory for externalized checkpoints
- String externalizedCheckpointsDir =
jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
+ String externalizedCheckpointsDir =
jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
- // load the state backend for checkpoint metadata.
- // if specified in the application, use from there,
otherwise load from configuration
- final StateBackend metadataBackend;
+ // load the state backend from the application settings
+ final StateBackend applicationConfiguredBackend;
+ final SerializedValue<StateBackend>
serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
- final SerializedValue<StateBackend>
applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
- if (applicationConfiguredBackend != null) {
+ if (serializedAppConfigured == null) {
+ applicationConfiguredBackend = null;
+ }
+ else {
try {
- metadataBackend =
applicationConfiguredBackend.deserializeValue(classLoader);
+ applicationConfiguredBackend =
serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException
e) {
- throw new JobExecutionException(jobId,
"Could not instantiate configured state backend.", e);
+ throw new JobExecutionException(jobId,
+ "Could not deserialize
application-defined state backend.", e);
}
+ }
- log.info("Using application-defined state
backend for checkpoint/savepoint metadata: {}.",
- metadataBackend);
- } else {
- try {
- metadataBackend = AbstractStateBackend
-
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
- } catch (IllegalConfigurationException |
IOException | DynamicCodeLoadingException e) {
- throw new JobExecutionException(jobId,
"Could not instantiate configured state backend", e);
- }
+ final StateBackend rootBackend;
+ try {
+ rootBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(
--- End diff --
True, commits are not self contained. I tries as much, but sich it comes
all from a single original commit, it would have been crazy time intensive to
make every commit self contained.
> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> -----------------------------------------------------------------------
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)