This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4a852fee28f2d87529dc05f5ba2e79202a0e00b6
Author: Rui Fan <[email protected]>
AuthorDate: Mon Dec 25 15:50:33 2023 +0800

    [FLINK-33935][checkpoint] Improve the default value logic related to `state.backend.type`
---
 .../generated/common_state_backends_section.html   |  2 +-
 .../generated/state_backend_configuration.html     |  2 +-
 .../flink/configuration/StateBackendOptions.java   |  2 +-
 .../flink/runtime/checkpoint/Checkpoints.java      | 19 ++--------------
 .../flink/runtime/state/StateBackendLoader.java    | 25 ++++------------------
 .../runtime/state/StateBackendLoadingTest.java     |  4 ++--
 .../environment/StreamExecutionEnvironment.java    |  6 ++++--
 7 files changed, 15 insertions(+), 45 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/common_state_backends_section.html b/docs/layouts/shortcodes/generated/common_state_backends_section.html
index 383bb165c7a..157cb75c81f 100644
--- a/docs/layouts/shortcodes/generated/common_state_backends_section.html
+++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html
@@ -10,7 +10,7 @@
     <tbody>
         <tr>
             <td><h5>state.backend.type</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">"hashmap"</td>
             <td>String</td>
             <td>The state backend to be used to store state.<br />The 
implementation can be specified either via their shortcut  name, or via the 
class name of a <code class="highlighter-rouge">StateBackendFactory</code>. If 
a factory is specified it is instantiated via its zero argument constructor and 
its <code 
class="highlighter-rouge">StateBackendFactory#createFromConfig(ReadableConfig, 
ClassLoader)</code> method is called.<br />Recognized shortcut names are 
'hashmap' and 'rocksdb'.</td>
         </tr>
diff --git a/docs/layouts/shortcodes/generated/state_backend_configuration.html b/docs/layouts/shortcodes/generated/state_backend_configuration.html
index 677cedb90ba..10e9e45a762 100644
--- a/docs/layouts/shortcodes/generated/state_backend_configuration.html
+++ b/docs/layouts/shortcodes/generated/state_backend_configuration.html
@@ -34,7 +34,7 @@
         </tr>
         <tr>
             <td><h5>state.backend.type</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">"hashmap"</td>
             <td>String</td>
             <td>The state backend to be used to store state.<br />The 
implementation can be specified either via their shortcut  name, or via the 
class name of a <code class="highlighter-rouge">StateBackendFactory</code>. If 
a factory is specified it is instantiated via its zero argument constructor and 
its <code 
class="highlighter-rouge">StateBackendFactory#createFromConfig(ReadableConfig, 
ClassLoader)</code> method is called.<br />Recognized shortcut names are 
'hashmap' and 'rocksdb'.</td>
         </tr>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
index 1ff9487f2a6..2778c0f59bd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
@@ -44,7 +44,7 @@ public class StateBackendOptions {
     public static final ConfigOption<String> STATE_BACKEND =
             ConfigOptions.key("state.backend.type")
                     .stringType()
-                    .noDefaultValue()
+                    .defaultValue("hashmap")
                     .withDeprecatedKeys("state.backend")
                     .withDescription(
                             Description.builder()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index a748a62d5b0..97d378be02c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -319,31 +319,16 @@ public class Checkpoints {
             logger.info("Attempting to load configured state backend for 
savepoint disposal");
         }
 
-        StateBackend backend = null;
         try {
-            backend =
-                    
StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null);
-
-            if (backend == null && logger != null) {
-                logger.debug(
-                        "No state backend configured, attempting to dispose 
savepoint "
-                                + "with configured checkpoint storage");
-            }
+            return 
StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null);
         } catch (Throwable t) {
             // catches exceptions and errors (like linking errors)
             if (logger != null) {
                 logger.info("Could not load configured state backend.");
                 logger.debug("Detailed exception:", t);
             }
+            return new HashMapStateBackend();
         }
-
-        if (backend == null) {
-            // We use the hashmap state backend by default. This will
-            // force the checkpoint storage loader to load
-            // the configured storage backend.
-            backend = new HashMapStateBackend();
-        }
-        return backend;
     }
 
     @Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
index 6f8ef3263e4..eb5de95aee8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.TernaryBoolean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -110,6 +111,7 @@ public class StateBackendLoader {
      * @throws IOException May be thrown by the StateBackendFactory when 
instantiating the state
      *     backend
      */
+    @Nonnull
     public static StateBackend loadStateBackendFromConfig(
             ReadableConfig config, ClassLoader classLoader, @Nullable Logger 
logger)
             throws IllegalConfigurationException, DynamicCodeLoadingException, 
IOException {
@@ -118,9 +120,6 @@ public class StateBackendLoader {
         checkNotNull(classLoader, "classLoader");
 
         final String backendName = 
config.get(StateBackendOptions.STATE_BACKEND);
-        if (backendName == null) {
-            return null;
-        }
 
         // by default the factory class is the backend name
         String factoryClassName = backendName;
@@ -253,18 +252,7 @@ public class StateBackendLoader {
             }
         } else {
             // (2) check if the config defines a state backend
-            final StateBackend fromConfig = loadStateBackendFromConfig(config, 
classLoader, logger);
-            if (fromConfig != null) {
-                backend = fromConfig;
-            } else {
-                // (3) use the default
-                backend = new 
HashMapStateBackendFactory().createFromConfig(config, classLoader);
-                if (logger != null) {
-                    logger.info(
-                            "No state backend has been configured, using 
default (HashMap) {}",
-                            backend);
-                }
-            }
+            backend = loadStateBackendFromConfig(config, classLoader, logger);
         }
 
         return backend;
@@ -350,18 +338,13 @@ public class StateBackendLoader {
         // (2) check if the config defines a state backend
         try {
             final StateBackend fromConfig = loadStateBackendFromConfig(config, 
classLoader, LOG);
-            if (fromConfig != null) {
-                return fromConfig.useManagedMemory();
-            }
+            return fromConfig.useManagedMemory();
         } catch (IllegalConfigurationException | DynamicCodeLoadingException | 
IOException e) {
             LOG.warn(
                     "Cannot decide whether state backend uses managed memory. 
Will reserve managed memory by default.",
                     e);
             return true;
         }
-
-        // (3) use the default MemoryStateBackend
-        return false;
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
index 17cc2bd8206..96d6c909378 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -59,9 +59,9 @@ class StateBackendLoadingTest {
     // ------------------------------------------------------------------------
 
     @Test
-    void testNoStateBackendDefined() throws Exception {
+    void testDefaultStateBackend() throws Exception {
         assertThat(StateBackendLoader.loadStateBackendFromConfig(new 
Configuration(), cl, null))
-                .isNull();
+                .isInstanceOf(HashMapStateBackend.class);
     }
 
     @Test
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4b116805166..dff237cce20 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -62,6 +62,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.StateChangelogOptions;
 import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
@@ -1040,8 +1041,9 @@ public class StreamExecutionEnvironment implements AutoCloseable {
         configuration
                 .getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)
                 .ifPresent(this::enableChangelogStateBackend);
-        Optional.ofNullable(loadStateBackend(configuration, classLoader))
-                .ifPresent(this::setStateBackend);
+        configuration
+                .getOptional(StateBackendOptions.STATE_BACKEND)
+                .ifPresent(conf -> 
setStateBackend(loadStateBackend(configuration, classLoader)));
         configuration
                 .getOptional(PipelineOptions.OPERATOR_CHAINING)
                 .ifPresent(c -> this.isChainingEnabled = c);

Reply via email to