This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4a852fee28f2d87529dc05f5ba2e79202a0e00b6 Author: Rui Fan <[email protected]> AuthorDate: Mon Dec 25 15:50:33 2023 +0800 [FLINK-33935][checkpoint] Improve the default value logic related to `state.backend.type` --- .../generated/common_state_backends_section.html | 2 +- .../generated/state_backend_configuration.html | 2 +- .../flink/configuration/StateBackendOptions.java | 2 +- .../flink/runtime/checkpoint/Checkpoints.java | 19 ++-------------- .../flink/runtime/state/StateBackendLoader.java | 25 ++++------------------ .../runtime/state/StateBackendLoadingTest.java | 4 ++-- .../environment/StreamExecutionEnvironment.java | 6 ++++-- 7 files changed, 15 insertions(+), 45 deletions(-) diff --git a/docs/layouts/shortcodes/generated/common_state_backends_section.html b/docs/layouts/shortcodes/generated/common_state_backends_section.html index 383bb165c7a..157cb75c81f 100644 --- a/docs/layouts/shortcodes/generated/common_state_backends_section.html +++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html @@ -10,7 +10,7 @@ <tbody> <tr> <td><h5>state.backend.type</h5></td> - <td style="word-wrap: break-word;">(none)</td> + <td style="word-wrap: break-word;">"hashmap"</td> <td>String</td> <td>The state backend to be used to store state.<br />The implementation can be specified either via their shortcut name, or via the class name of a <code class="highlighter-rouge">StateBackendFactory</code>. If a factory is specified it is instantiated via its zero argument constructor and its <code class="highlighter-rouge">StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)</code> method is called.<br />Recognized shortcut names are 'hashmap' and 'rocksdb'.</td> </tr> diff --git a/docs/layouts/shortcodes/generated/state_backend_configuration.html b/docs/layouts/shortcodes/generated/state_backend_configuration.html index 677cedb90ba..10e9e45a762 100644 --- a/docs/layouts/shortcodes/generated/state_backend_configuration.html +++ b/docs/layouts/shortcodes/generated/state_backend_configuration.html @@ -34,7 +34,7 @@ </tr> <tr> <td><h5>state.backend.type</h5></td> - <td style="word-wrap: break-word;">(none)</td> + <td style="word-wrap: break-word;">"hashmap"</td> <td>String</td> <td>The state backend to be used to store state.<br />The implementation can be specified either via their shortcut name, or via the class name of a <code class="highlighter-rouge">StateBackendFactory</code>. If a factory is specified it is instantiated via its zero argument constructor and its <code class="highlighter-rouge">StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)</code> method is called.<br />Recognized shortcut names are 'hashmap' and 'rocksdb'.</td> </tr> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java index 1ff9487f2a6..2778c0f59bd 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java @@ -44,7 +44,7 @@ public class StateBackendOptions { public static final ConfigOption<String> STATE_BACKEND = ConfigOptions.key("state.backend.type") .stringType() - .noDefaultValue() + .defaultValue("hashmap") .withDeprecatedKeys("state.backend") .withDescription( Description.builder() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java index a748a62d5b0..97d378be02c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java @@ -319,31 +319,16 @@ public class Checkpoints { logger.info("Attempting to load configured state backend for savepoint disposal"); } - StateBackend backend = null; try { - backend = - StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null); - - if (backend == null && logger != null) { - logger.debug( - "No state backend configured, attempting to dispose savepoint " - + "with configured checkpoint storage"); - } + return StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null); } catch (Throwable t) { // catches exceptions and errors (like linking errors) if (logger != null) { logger.info("Could not load configured state backend."); logger.debug("Detailed exception:", t); } + return new HashMapStateBackend(); } - - if (backend == null) { - // We use the hashmap state backend by default. This will - // force the checkpoint storage loader to load - // the configured storage backend. - backend = new HashMapStateBackend(); - } - return backend; } @Nonnull diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java index 6f8ef3263e4..eb5de95aee8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java @@ -36,6 +36,7 @@ import org.apache.flink.util.TernaryBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -110,6 +111,7 @@ public class StateBackendLoader { * @throws IOException May be thrown by the StateBackendFactory when instantiating the state * backend */ + @Nonnull public static StateBackend loadStateBackendFromConfig( ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException { @@ -118,9 +120,6 @@ public class StateBackendLoader { checkNotNull(classLoader, "classLoader"); final String backendName = config.get(StateBackendOptions.STATE_BACKEND); - if (backendName == null) { - return null; - } // by default the factory class is the backend name String factoryClassName = backendName; @@ -253,18 +252,7 @@ public class StateBackendLoader { } } else { // (2) check if the config defines a state backend - final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger); - if (fromConfig != null) { - backend = fromConfig; - } else { - // (3) use the default - backend = new HashMapStateBackendFactory().createFromConfig(config, classLoader); - if (logger != null) { - logger.info( - "No state backend has been configured, using default (HashMap) {}", - backend); - } - } + backend = loadStateBackendFromConfig(config, classLoader, logger); } return backend; @@ -350,18 +338,13 @@ public class StateBackendLoader { // (2) check if the config defines a state backend try { final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, LOG); - if (fromConfig != null) { - return fromConfig.useManagedMemory(); - } + return fromConfig.useManagedMemory(); } catch (IllegalConfigurationException | DynamicCodeLoadingException | IOException e) { LOG.warn( "Cannot decide whether state backend uses managed memory. Will reserve managed memory by default.", e); return true; } - - // (3) use the default MemoryStateBackend - return false; } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java index 17cc2bd8206..96d6c909378 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java @@ -59,9 +59,9 @@ class StateBackendLoadingTest { // ------------------------------------------------------------------------ @Test - void testNoStateBackendDefined() throws Exception { + void testDefaultStateBackend() throws Exception { assertThat(StateBackendLoader.loadStateBackendFromConfig(new Configuration(), cl, null)) - .isNull(); + .isInstanceOf(HashMapStateBackend.class); } @Test diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 4b116805166..dff237cce20 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -62,6 +62,7 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.StateChangelogOptions; import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; import org.apache.flink.connector.datagen.source.DataGeneratorSource; @@ -1040,8 +1041,9 @@ public class StreamExecutionEnvironment implements AutoCloseable { configuration .getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG) .ifPresent(this::enableChangelogStateBackend); - Optional.ofNullable(loadStateBackend(configuration, classLoader)) - .ifPresent(this::setStateBackend); + configuration + .getOptional(StateBackendOptions.STATE_BACKEND) + .ifPresent(conf -> setStateBackend(loadStateBackend(configuration, classLoader))); configuration .getOptional(PipelineOptions.OPERATOR_CHAINING) .ifPresent(c -> this.isChainingEnabled = c);
