Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4907#discussion_r146987131
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
---
@@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph(
metrics);
// The default directory for externalized checkpoints
- String externalizedCheckpointsDir =
jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
+ String externalizedCheckpointsDir =
jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
- // load the state backend for checkpoint metadata.
- // if specified in the application, use from there,
otherwise load from configuration
- final StateBackend metadataBackend;
+ // load the state backend from the application settings
+ final StateBackend applicationConfiguredBackend;
+ final SerializedValue<StateBackend>
serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
- final SerializedValue<StateBackend>
applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
- if (applicationConfiguredBackend != null) {
+ if (serializedAppConfigured == null) {
+ applicationConfiguredBackend = null;
+ }
+ else {
try {
- metadataBackend =
applicationConfiguredBackend.deserializeValue(classLoader);
+ applicationConfiguredBackend =
serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException
e) {
- throw new JobExecutionException(jobId,
"Could not instantiate configured state backend.", e);
+ throw new JobExecutionException(jobId,
+ "Could not deserialize
application-defined state backend.", e);
}
+ }
- log.info("Using application-defined state
backend for checkpoint/savepoint metadata: {}.",
- metadataBackend);
- } else {
- try {
- metadataBackend = AbstractStateBackend
-
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
- } catch (IllegalConfigurationException |
IOException | DynamicCodeLoadingException e) {
- throw new JobExecutionException(jobId,
"Could not instantiate configured state backend", e);
- }
+ final StateBackend rootBackend;
+ try {
+ rootBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(
--- End diff --
nit: This is only defined in the later commit "[FLINK-5823] [checkpoints]
State backends define checkpoint and savepoint directories, improved
configuration", so technically the commit that adds this code is broken.
---