This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c0176fd6794 [FLINK-33677][core] Remove flink-conf.yaml from flink dist.
c0176fd6794 is described below

commit c0176fd6794295dc3706ed7f536f3f70b6110923
Author: JunRuiLee <[email protected]>
AuthorDate: Thu Aug 22 11:18:27 2024 +0800

    [FLINK-33677][core] Remove flink-conf.yaml from flink dist.
    
    This closes #25234.
---
 .../common/serialization/SerializerConfigImpl.java |   7 -
 .../apache/flink/configuration/Configuration.java  |  44 ++---
 .../ConfigurationFileMigrationUtils.java           | 173 +++++++++++++++++++
 .../flink/configuration/ConfigurationUtils.java    |  83 +++-------
 .../configuration/DelegatingConfiguration.java     |  18 +-
 .../flink/configuration/GlobalConfiguration.java   | 128 ++------------
 .../serialization/SerializerConfigImplTest.java    |  16 --
 .../ConfigurationFileMigrationUtilsTest.java       |  93 +++++++++++
 .../flink/configuration/ConfigurationTest.java     | 146 +++++++---------
 .../configuration/ConfigurationUtilsTest.java      | 112 ++++---------
 .../configuration/GlobalConfigurationTest.java     |  58 -------
 .../ReadableWritableConfigurationTest.java         |  62 +++----
 .../src/main/flink-bin/bin/bash-java-utils.sh      |   5 +-
 .../src/main/flink-bin/bin/config-parser-utils.sh  |   9 +-
 flink-dist/src/main/flink-bin/bin/config.sh        |   2 +-
 .../src/main/flink-bin/bin/migrate-config-file.sh  |   2 +-
 flink-dist/src/test/bin/runBashJavaUtilsCmd.sh     |   4 +-
 flink-end-to-end-tests/test-scripts/common.sh      |   3 +-
 .../reference/pyflink.table/table_environment.rst  |   2 +-
 flink-python/pyflink/common/configuration.py       |  12 +-
 flink-python/pyflink/pyflink_gateway_server.py     |  36 +---
 .../runtime/util/ConfigurationParserUtils.java     |  19 +--
 .../clusterframework/BootstrapToolsTest.java       |  18 +-
 .../runtime/util/bash/FlinkConfigLoaderTest.java   | 183 ++++++---------------
 .../state/forst/ForStStateBackendConfigTest.java   |   3 -
 .../state/RocksDBStateBackendConfigTest.java       |   3 -
 26 files changed, 517 insertions(+), 724 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
index 1ffde12a61b..9a281e4cd4d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.Preconditions;
@@ -389,12 +388,6 @@ public final class SerializerConfigImpl implements 
SerializerConfig {
                     .getOptional(PipelineOptions.SERIALIZATION_CONFIG)
                     .ifPresent(c -> 
parseSerializationConfigWithExceptionHandling(classLoader, c));
         } catch (Exception e) {
-            if (!GlobalConfiguration.isStandardYaml()) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "%s is only supported with the standard YAML 
config parser, please use \"config.yaml\" as the config file.",
-                                PipelineOptions.SERIALIZATION_CONFIG.key()));
-            }
             throw e;
         }
     }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java 
b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 94e77fbd2a0..23f337681ac 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -21,7 +21,6 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -79,21 +78,11 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
      */
     protected final HashMap<String, Object> confData;
 
-    protected final boolean standardYaml;
-
     // 
--------------------------------------------------------------------------------------------
 
     /** Creates a new empty configuration. */
     public Configuration() {
         this.confData = new HashMap<>();
-        this.standardYaml = GlobalConfiguration.isStandardYaml();
-    }
-
-    @VisibleForTesting
-    @Internal
-    public Configuration(boolean standardYaml) {
-        this.confData = new HashMap<>();
-        this.standardYaml = standardYaml;
     }
 
     /**
@@ -103,7 +92,6 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
      */
     public Configuration(Configuration other) {
         this.confData = new HashMap<>(other.confData);
-        this.standardYaml = other.standardYaml;
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -175,7 +163,7 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
      */
     public String getString(String key, String defaultValue) {
         return getRawValue(key)
-                .map(o -> ConfigurationUtils.convertToString(o, standardYaml))
+                .map(o -> ConfigurationUtils.convertToString(o))
                 .orElse(defaultValue);
     }
 
@@ -818,9 +806,9 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
 
         try {
             if (option.isList()) {
-                return rawValue.map(v -> ConfigurationUtils.convertToList(v, 
clazz, standardYaml));
+                return rawValue.map(v -> ConfigurationUtils.convertToList(v, 
clazz));
             } else {
-                return rawValue.map(v -> ConfigurationUtils.convertValue(v, 
clazz, standardYaml));
+                return rawValue.map(v -> ConfigurationUtils.convertValue(v, 
clazz));
             }
         } catch (Exception e) {
             throw new IllegalArgumentException(
@@ -848,9 +836,7 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
             Map<String, String> ret =
                     
CollectionUtil.newHashMapWithExpectedSize(this.confData.size());
             for (Map.Entry<String, Object> entry : confData.entrySet()) {
-                ret.put(
-                        entry.getKey(),
-                        ConfigurationUtils.convertToString(entry.getValue(), 
standardYaml));
+                ret.put(entry.getKey(), 
ConfigurationUtils.convertToString(entry.getValue()));
             }
             return ret;
         }
@@ -867,19 +853,15 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
      */
     @Internal
     public Map<String, String> toFileWritableMap() {
-        if (standardYaml) {
-            synchronized (this.confData) {
-                Map<String, String> ret =
-                        
CollectionUtil.newHashMapWithExpectedSize(this.confData.size());
-                for (Map.Entry<String, Object> entry : confData.entrySet()) {
-                    // Because some character in standard yaml should be 
escaped by quotes, such as
-                    // '*', here we should wrap the value by Yaml pattern
-                    ret.put(entry.getKey(), 
YamlParserUtils.toYAMLString(entry.getValue()));
-                }
-                return ret;
+        synchronized (this.confData) {
+            Map<String, String> ret =
+                    
CollectionUtil.newHashMapWithExpectedSize(this.confData.size());
+            for (Map.Entry<String, Object> entry : confData.entrySet()) {
+                // Because some character in standard yaml should be escaped 
by quotes, such as
+                // '*', here we should wrap the value by Yaml pattern
+                ret.put(entry.getKey(), 
YamlParserUtils.toYAMLString(entry.getValue()));
             }
-        } else {
-            return toMap();
+            return ret;
         }
     }
 
@@ -955,7 +937,7 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                 return Optional.ofNullable(valueFromExactKey);
             }
             final Map<String, String> valueFromPrefixMap =
-                    convertToPropertiesPrefixed(confData, key, standardYaml);
+                    convertToPropertiesPrefixed(confData, key);
             if (valueFromPrefixMap.isEmpty()) {
                 return Optional.empty();
             }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationFileMigrationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationFileMigrationUtils.java
new file mode 100644
index 00000000000..869e0019e86
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationFileMigrationUtils.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class for migrating legacy Flink configuration file {@code 
flink-conf.yaml} to the new
+ * format starting from Flink 2.0. This class provides methods to load legacy 
configuration files
+ * and convert them into the new configuration format.
+ */
+public class ConfigurationFileMigrationUtils {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ConfigurationFileMigrationUtils.class);
+
+    /**
+     * This file is only used to help users migrate their legacy configuration 
files to the new
+     * configuration file `config.yaml` starting from Flink 2.0.
+     */
+    @VisibleForTesting public static final String LEGACY_FLINK_CONF_FILENAME = 
"flink-conf.yaml";
+
+    /**
+     * Migrates the legacy Flink configuration from the specified directory to 
a standard YAML
+     * format representation.
+     *
+     * <p>This method loads the legacy configuration file named {@code 
flink-conf.yaml} from the
+     * specified directory. If the file is found, it converts the legacy 
format into a standard
+     * {@link Configuration} object in YAML format.
+     *
+     * @param configDir the directory where the legacy configuration file is 
located
+     * @return a {@link Configuration} object in standard YAML format
+     */
+    public static Configuration migrateLegacyToStandardYamlConfig(final String 
configDir) {
+        if (configDir == null) {
+            throw new IllegalArgumentException(
+                    "Given configuration directory is null, cannot load 
configuration");
+        }
+
+        final File confDirFile = new File(configDir);
+        if (!(confDirFile.exists())) {
+            throw new IllegalConfigurationException(
+                    "The given configuration directory name '"
+                            + configDir
+                            + "' ("
+                            + confDirFile.getAbsolutePath()
+                            + ") does not describe an existing directory.");
+        }
+
+        // get Flink yaml configuration file
+        Map<String, String> configuration;
+        File yamlConfigFile = new File(confDirFile, 
LEGACY_FLINK_CONF_FILENAME);
+        if (!yamlConfigFile.exists()) {
+            throw new IllegalConfigurationException(
+                    "The Flink config file '"
+                            + yamlConfigFile
+                            + "' ("
+                            + yamlConfigFile.getAbsolutePath()
+                            + ") does not exist.");
+        } else {
+            LOG.info(
+                    "Using legacy YAML parser to load flink configuration file 
from {}.",
+                    yamlConfigFile.getAbsolutePath());
+            configuration = loadLegacyYAMLResource(yamlConfigFile);
+        }
+
+        Configuration standardYamlConfig = new Configuration();
+        configuration.forEach(standardYamlConfig::setString);
+        return standardYamlConfig;
+    }
+
+    /**
+     * Loads a YAML-file of key-value pairs.
+     *
+     * <p>Colon and whitespace ": " separate key and value (one per line). The 
hash tag "#" starts a
+     * single-line comment.
+     *
+     * <p>Example:
+     *
+     * <pre>
+     * jobmanager.rpc.address: localhost # network address for communication 
with the job manager
+     * jobmanager.rpc.port   : 6123      # network port to connect to for 
communication with the job manager
+     * taskmanager.rpc.port  : 6122      # network port the task manager 
expects incoming IPC connections
+     * </pre>
+     *
+     * <p>This does not span the whole YAML specification, but only the 
*syntax* of simple YAML
+     * key-value pairs (see issue #113 on GitHub). If at any point in time, 
there is a need to go
+     * beyond simple key-value pairs syntax compatibility will allow to 
introduce a YAML parser
+     * library.
+     *
+     * @param file the YAML file to read from
+     * @see <a href="http://www.yaml.org/spec/1.2/spec.html";>YAML 1.2 
specification</a>
+     */
+    @VisibleForTesting
+    public static Map<String, String> loadLegacyYAMLResource(File file) {
+        Map<String, String> config = new HashMap<>();
+        try (BufferedReader reader =
+                new BufferedReader(new InputStreamReader(new 
FileInputStream(file)))) {
+
+            String line;
+            int lineNo = 0;
+            while ((line = reader.readLine()) != null) {
+                lineNo++;
+                // 1. check for comments
+                String[] comments = line.split("#", 2);
+                String conf = comments[0].trim();
+
+                // 2. get key and value
+                if (conf.length() > 0) {
+                    String[] kv = conf.split(": ", 2);
+
+                    // skip line with no valid key-value pair
+                    if (kv.length == 1) {
+                        LOG.warn(
+                                "Error while trying to split key and value in 
configuration file "
+                                        + file
+                                        + ":"
+                                        + lineNo
+                                        + ": Line is not a key-value pair 
(missing space after ':'?)");
+                        continue;
+                    }
+
+                    String key = kv[0].trim();
+                    String value = kv[1].trim();
+
+                    // sanity check
+                    if (key.length() == 0 || value.length() == 0) {
+                        LOG.warn(
+                                "Error after splitting key and value in 
configuration file "
+                                        + file
+                                        + ":"
+                                        + lineNo
+                                        + ": Key or value was empty");
+                        continue;
+                    }
+
+                    config.put(key, value);
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error parsing YAML configuration.", e);
+        }
+
+        return config;
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 9cbf8810023..1802253474d 100755
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -41,7 +41,6 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
 import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
-import static 
org.apache.flink.configuration.StructuredOptionsSplitter.escapeWithSingleQuote;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Utility class for {@link Configuration} related helper functions. */
@@ -131,11 +130,11 @@ public class ConfigurationUtils {
      * @return parsed map
      */
     public static Map<String, String> parseStringToMap(String 
stringSerializedMap) {
-        return convertToProperties(stringSerializedMap, 
GlobalConfiguration.isStandardYaml());
+        return convertToProperties(stringSerializedMap);
     }
 
     public static String parseMapToString(Map<String, String> map) {
-        return convertToString(map, GlobalConfiguration.isStandardYaml());
+        return convertToString(map);
     }
 
     public static Time getStandaloneClusterStartupPeriodTime(Configuration 
configuration) {
@@ -247,7 +246,7 @@ public class ConfigurationUtils {
      */
     public static List<String> convertConfigToWritableLines(
             Configuration configuration, boolean flattenYaml) {
-        if (configuration.standardYaml && !flattenYaml) {
+        if (!flattenYaml) {
             return YamlParserUtils.convertAndDumpYamlFromFlatMap(
                     Collections.unmodifiableMap(configuration.confData));
         } else {
@@ -372,11 +371,6 @@ public class ConfigurationUtils {
      */
     @SuppressWarnings("unchecked")
     public static <T> T convertValue(Object rawValue, Class<?> clazz) {
-        return convertValue(rawValue, clazz, 
GlobalConfiguration.isStandardYaml());
-    }
-
-    @SuppressWarnings("unchecked")
-    public static <T> T convertValue(Object rawValue, Class<?> clazz, boolean 
standardYaml) {
         if (Integer.class.equals(clazz)) {
             return (T) convertToInt(rawValue);
         } else if (Long.class.equals(clazz)) {
@@ -388,7 +382,7 @@ public class ConfigurationUtils {
         } else if (Double.class.equals(clazz)) {
             return (T) convertToDouble(rawValue);
         } else if (String.class.equals(clazz)) {
-            return (T) convertToString(rawValue, standardYaml);
+            return (T) convertToString(rawValue);
         } else if (clazz.isEnum()) {
             return (T) convertToEnum(rawValue, (Class<? extends Enum<?>>) 
clazz);
         } else if (clazz == Duration.class) {
@@ -396,17 +390,17 @@ public class ConfigurationUtils {
         } else if (clazz == MemorySize.class) {
             return (T) convertToMemorySize(rawValue);
         } else if (clazz == Map.class) {
-            return (T) convertToProperties(rawValue, standardYaml);
+            return (T) convertToProperties(rawValue);
         }
 
         throw new IllegalArgumentException("Unsupported type: " + clazz);
     }
 
     @SuppressWarnings("unchecked")
-    static <T> T convertToList(Object rawValue, Class<?> atomicClass, boolean 
standardYaml) {
+    public static <T> T convertToList(Object rawValue, Class<?> atomicClass) {
         if (rawValue instanceof List) {
             return (T) rawValue;
-        } else if (standardYaml) {
+        } else {
             try {
                 List<Object> data =
                         YamlParserUtils.convertToObject(rawValue.toString(), 
List.class);
@@ -417,20 +411,18 @@ public class ConfigurationUtils {
                 if (atomicClass == Map.class) {
                     return (T)
                             data.stream()
-                                    .map(map -> 
convertToStringMap((Map<Object, Object>) map, true))
+                                    .map(map -> 
convertToStringMap((Map<Object, Object>) map))
                                     .collect(Collectors.toList());
                 }
 
                 return (T)
                         data.stream()
-                                .map(s -> convertValue(s, atomicClass, true))
+                                .map(s -> convertValue(s, atomicClass))
                                 .collect(Collectors.toList());
             } catch (Exception e) {
                 // Fallback to legacy pattern
                 return convertToListWithLegacyProperties(rawValue, 
atomicClass);
             }
-        } else {
-            return convertToListWithLegacyProperties(rawValue, atomicClass);
         }
     }
 
@@ -438,24 +430,22 @@ public class ConfigurationUtils {
     private static <T> T convertToListWithLegacyProperties(Object rawValue, 
Class<?> atomicClass) {
         return (T)
                 StructuredOptionsSplitter.splitEscaped(rawValue.toString(), 
';').stream()
-                        .map(s -> convertValue(s, atomicClass, false))
+                        .map(s -> convertValue(s, atomicClass))
                         .collect(Collectors.toList());
     }
 
     @SuppressWarnings("unchecked")
-    static Map<String, String> convertToProperties(Object o, boolean 
standardYaml) {
+    static Map<String, String> convertToProperties(Object o) {
         if (o instanceof Map) {
             return (Map<String, String>) o;
-        } else if (standardYaml) {
+        } else {
             try {
                 Map<Object, Object> map = 
YamlParserUtils.convertToObject(o.toString(), Map.class);
-                return convertToStringMap(map, true);
+                return convertToStringMap(map);
             } catch (Exception e) {
                 // Fallback to legacy pattern
                 return convertToPropertiesWithLegacyPattern(o);
             }
-        } else {
-            return convertToPropertiesWithLegacyPattern(o);
         }
     }
 
@@ -475,13 +465,12 @@ public class ConfigurationUtils {
                 .collect(Collectors.toMap(a -> a.get(0), a -> a.get(1)));
     }
 
-    private static Map<String, String> convertToStringMap(
-            Map<Object, Object> map, boolean standardYaml) {
+    private static Map<String, String> convertToStringMap(Map<Object, Object> 
map) {
         return map.entrySet().stream()
                 .collect(
                         Collectors.toMap(
-                                entry -> convertToString(entry.getKey(), 
standardYaml),
-                                entry -> convertToString(entry.getValue(), 
standardYaml)));
+                                entry -> convertToString(entry.getKey()),
+                                entry -> convertToString(entry.getValue())));
     }
 
     @SuppressWarnings("unchecked")
@@ -521,42 +510,12 @@ public class ConfigurationUtils {
         return MemorySize.parse(o.toString());
     }
 
-    static String convertToString(Object o, boolean standardYaml) {
-        if (standardYaml) {
-            if (o.getClass() == String.class) {
-                return (String) o;
-            } else {
-                return YamlParserUtils.toYAMLString(o);
-            }
-        }
-
+    static String convertToString(Object o) {
         if (o.getClass() == String.class) {
             return (String) o;
-        } else if (o.getClass() == Duration.class) {
-            Duration duration = (Duration) o;
-            return TimeUtils.formatWithHighestUnit(duration);
-        } else if (o instanceof List) {
-            return ((List<?>) o)
-                    .stream()
-                            .map(e -> escapeWithSingleQuote(convertToString(e, 
false), ";"))
-                            .collect(Collectors.joining(";"));
-        } else if (o instanceof Map) {
-            return ((Map<?, ?>) o)
-                    .entrySet().stream()
-                            .map(
-                                    e -> {
-                                        String escapedKey =
-                                                
escapeWithSingleQuote(e.getKey().toString(), ":");
-                                        String escapedValue =
-                                                
escapeWithSingleQuote(e.getValue().toString(), ":");
-
-                                        return escapeWithSingleQuote(
-                                                escapedKey + ":" + 
escapedValue, ",");
-                                    })
-                            .collect(Collectors.joining(","));
+        } else {
+            return YamlParserUtils.toYAMLString(o);
         }
-
-        return o.toString();
     }
 
     static Integer convertToInt(Object o) {
@@ -666,14 +625,14 @@ public class ConfigurationUtils {
     }
 
     static Map<String, String> convertToPropertiesPrefixed(
-            Map<String, Object> confData, String key, boolean standardYaml) {
+            Map<String, Object> confData, String key) {
         final String prefixKey = key + ".";
         return confData.keySet().stream()
                 .filter(k -> k.startsWith(prefixKey))
                 .collect(
                         Collectors.toMap(
                                 k -> k.substring(prefixKey.length()),
-                                k -> convertToString(confData.get(k), 
standardYaml)));
+                                k -> convertToString(confData.get(k))));
     }
 
     static boolean containsPrefixMap(Map<String, Object> confData, String key) 
{
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index f0f147b884f..0f3748f62f5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -322,19 +322,15 @@ public final class DelegatingConfiguration extends 
Configuration {
 
     @Override
     public Map<String, String> toFileWritableMap() {
-        if (backingConfig.standardYaml) {
-            Map<String, String> map = backingConfig.toFileWritableMap();
-            Map<String, String> prefixed = new HashMap<>();
-            for (Map.Entry<String, String> entry : map.entrySet()) {
-                if (entry.getKey().startsWith(prefix)) {
-                    String keyWithoutPrefix = 
entry.getKey().substring(prefix.length());
-                    prefixed.put(keyWithoutPrefix, 
YamlParserUtils.toYAMLString(entry.getValue()));
-                }
+        Map<String, String> map = backingConfig.toFileWritableMap();
+        Map<String, String> prefixed = new HashMap<>();
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+            if (entry.getKey().startsWith(prefix)) {
+                String keyWithoutPrefix = 
entry.getKey().substring(prefix.length());
+                prefixed.put(keyWithoutPrefix, 
YamlParserUtils.toYAMLString(entry.getValue()));
             }
-            return prefixed;
-        } else {
-            return toMap();
         }
+        return prefixed;
     }
 
     @Override
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 8e680de327b..66225307287 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -19,7 +19,6 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -27,11 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,8 +40,6 @@ public final class GlobalConfiguration {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(GlobalConfiguration.class);
 
-    public static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml";
-
     public static final String FLINK_CONF_FILENAME = "config.yaml";
 
     // key separator character
@@ -70,8 +63,6 @@ public final class GlobalConfiguration {
     // the hidden content to be displayed
     public static final String HIDDEN_CONTENT = "******";
 
-    private static boolean standardYaml = true;
-
     // 
--------------------------------------------------------------------------------------------
 
     private GlobalConfiguration() {}
@@ -143,31 +134,20 @@ public final class GlobalConfiguration {
         }
 
         // get Flink yaml configuration file
-        File yamlConfigFile = new File(confDirFile, 
LEGACY_FLINK_CONF_FILENAME);
         Configuration configuration;
-
+        File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
         if (!yamlConfigFile.exists()) {
-            yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
-            if (!yamlConfigFile.exists()) {
-                throw new IllegalConfigurationException(
-                        "The Flink config file '"
-                                + yamlConfigFile
-                                + "' ("
-                                + yamlConfigFile.getAbsolutePath()
-                                + ") does not exist.");
-            } else {
-                standardYaml = true;
-                LOG.info(
-                        "Using standard YAML parser to load flink 
configuration file from {}.",
-                        yamlConfigFile.getAbsolutePath());
-                configuration = loadYAMLResource(yamlConfigFile);
-            }
+            throw new IllegalConfigurationException(
+                    "The Flink config file '"
+                            + yamlConfigFile
+                            + "' ("
+                            + yamlConfigFile.getAbsolutePath()
+                            + ") does not exist.");
         } else {
-            standardYaml = false;
             LOG.info(
-                    "Using legacy YAML parser to load flink configuration file 
from {}.",
+                    "Using standard YAML parser to load flink configuration 
file from {}.",
                     yamlConfigFile.getAbsolutePath());
-            configuration = loadLegacyYAMLResource(yamlConfigFile);
+            configuration = loadYAMLResource(yamlConfigFile);
         }
 
         logConfiguration("Loading", configuration);
@@ -190,81 +170,6 @@ public final class GlobalConfiguration {
                                 isSensitive(key) ? HIDDEN_CONTENT : value));
     }
 
-    /**
-     * Loads a YAML-file of key-value pairs.
-     *
-     * <p>Colon and whitespace ": " separate key and value (one per line). The 
hash tag "#" starts a
-     * single-line comment.
-     *
-     * <p>Example:
-     *
-     * <pre>
-     * jobmanager.rpc.address: localhost # network address for communication 
with the job manager
-     * jobmanager.rpc.port   : 6123      # network port to connect to for 
communication with the job manager
-     * taskmanager.rpc.port  : 6122      # network port the task manager 
expects incoming IPC connections
-     * </pre>
-     *
-     * <p>This does not span the whole YAML specification, but only the 
*syntax* of simple YAML
-     * key-value pairs (see issue #113 on GitHub). If at any point in time, 
there is a need to go
-     * beyond simple key-value pairs syntax compatibility will allow to 
introduce a YAML parser
-     * library.
-     *
-     * @param file the YAML file to read from
-     * @see <a href="http://www.yaml.org/spec/1.2/spec.html";>YAML 1.2 
specification</a>
-     */
-    private static Configuration loadLegacyYAMLResource(File file) {
-        final Configuration config = new Configuration();
-
-        try (BufferedReader reader =
-                new BufferedReader(new InputStreamReader(new 
FileInputStream(file)))) {
-
-            String line;
-            int lineNo = 0;
-            while ((line = reader.readLine()) != null) {
-                lineNo++;
-                // 1. check for comments
-                String[] comments = line.split("#", 2);
-                String conf = comments[0].trim();
-
-                // 2. get key and value
-                if (conf.length() > 0) {
-                    String[] kv = conf.split(": ", 2);
-
-                    // skip line with no valid key-value pair
-                    if (kv.length == 1) {
-                        LOG.warn(
-                                "Error while trying to split key and value in 
configuration file "
-                                        + file
-                                        + ":"
-                                        + lineNo
-                                        + ": Line is not a key-value pair 
(missing space after ':'?)");
-                        continue;
-                    }
-
-                    String key = kv[0].trim();
-                    String value = kv[1].trim();
-
-                    // sanity check
-                    if (key.length() == 0 || value.length() == 0) {
-                        LOG.warn(
-                                "Error after splitting key and value in 
configuration file "
-                                        + file
-                                        + ":"
-                                        + lineNo
-                                        + ": Key or value was empty");
-                        continue;
-                    }
-
-                    config.setString(key, value);
-                }
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Error parsing YAML configuration.", e);
-        }
-
-        return config;
-    }
-
     /**
      * Flattens a nested configuration map to be only one level deep.
      *
@@ -370,19 +275,6 @@ public final class GlobalConfiguration {
     }
 
     public static String getFlinkConfFilename() {
-        if (isStandardYaml()) {
-            return FLINK_CONF_FILENAME;
-        } else {
-            return LEGACY_FLINK_CONF_FILENAME;
-        }
-    }
-
-    public static boolean isStandardYaml() {
-        return standardYaml;
-    }
-
-    @VisibleForTesting
-    public static void setStandardYaml(boolean standardYaml) {
-        GlobalConfiguration.standardYaml = standardYaml;
+        return FLINK_CONF_FILENAME;
     }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
index e3b361eee23..96799e42f51 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
@@ -271,21 +270,6 @@ class SerializerConfigImplTest {
                                 SerializerConfigImplTest.class, 
TestTypeInfoFactory.class));
     }
 
-    @Test
-    void testLoadingSerializationConfigWithLegacyParser() {
-        GlobalConfiguration.setStandardYaml(false);
-        String serializationConfigStr =
-                
"{org.apache.flink.api.common.serialization.SerializerConfigImplTest:"
-                        + " {type: pojo},"
-                        + " 
org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1:"
-                        + " {type: pojo}}";
-        assertThatThrownBy(() -> 
getConfiguredSerializerConfig(serializationConfigStr))
-                .isInstanceOf(UnsupportedOperationException.class);
-
-        // Clear the standard yaml flag to avoid impact to other cases.
-        GlobalConfiguration.setStandardYaml(true);
-    }
-
     @Test
     void testLoadingIllegalSerializationConfig() {
         String duplicateClassConfigStr =
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationFileMigrationUtilsTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationFileMigrationUtilsTest.java
new file mode 100644
index 00000000000..ff5e235fe49
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationFileMigrationUtilsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConfigurationFileMigrationUtilsTest {
+
+    @TempDir private File tmpDir;
+
+    @Test
+    void testConfigurationWithLegacyYAML() throws FileNotFoundException {
+        File confFile =
+                new File(tmpDir, 
ConfigurationFileMigrationUtils.LEGACY_FLINK_CONF_FILENAME);
+        try (PrintWriter pw = new PrintWriter(confFile)) {
+            pw.println("###########################"); // should be skipped
+            pw.println("# Some : comments : to skip"); // should be skipped
+            pw.println("###########################"); // should be skipped
+            pw.println("mykey1: myvalue1"); // OK, simple correct case
+            pw.println("mykey2       : myvalue2"); // OK, whitespace before 
colon is correct
+            pw.println("mykey3:myvalue3"); // SKIP, missing white space after 
colon
+            pw.println(" some nonsense without colon and whitespace 
separator"); // SKIP
+            pw.println(" :  "); // SKIP
+            pw.println("   "); // SKIP (silently)
+            pw.println(" "); // SKIP (silently)
+            pw.println("mykey4: myvalue4# some comments"); // OK, skip 
comments only
+            pw.println("   mykey5    :    myvalue5    "); // OK, trim 
unnecessary whitespace
+            pw.println("mykey6: my: value6"); // OK, only use first ': ' as 
separator
+            pw.println("mykey7: "); // SKIP, no value provided
+            pw.println(": myvalue8"); // SKIP, no key provided
+
+            pw.println("mykey9: myvalue9"); // OK
+            pw.println("mykey9: myvalue10"); // OK, overwrite last value
+        }
+
+        Map<String, String> conf = 
ConfigurationFileMigrationUtils.loadLegacyYAMLResource(confFile);
+
+        // only the 6 valid key-value pairs should be parsed from the file
+        assertThat(conf.keySet()).hasSize(6);
+
+        // keys 1, 2, 4, 5, 6, 9 must be present; malformed keys 3, 7, 8 must be absent
+        assertThat(conf.getOrDefault("mykey1", null)).isEqualTo("myvalue1");
+        assertThat(conf).doesNotContainKey("mykey3");
+        assertThat(conf.getOrDefault("mykey2", null)).isEqualTo("myvalue2");
+        assertThat(conf.getOrDefault("mykey3", "null")).isEqualTo("null");
+        assertThat(conf.getOrDefault("mykey4", null)).isEqualTo("myvalue4");
+        assertThat(conf.getOrDefault("mykey5", null)).isEqualTo("myvalue5");
+        assertThat(conf.getOrDefault("mykey6", null)).isEqualTo("my: value6");
+        assertThat(conf.getOrDefault("mykey7", "null")).isEqualTo("null");
+        assertThat(conf.getOrDefault("mykey8", "null")).isEqualTo("null");
+        assertThat(conf.getOrDefault("mykey9", null)).isEqualTo("myvalue10");
+    }
+
+    // We allow malformed YAML files if loading legacy flink conf
+    @Test
+    void testInvalidLegacyYamlFile() throws IOException {
+        final File confFile =
+                new File(
+                        tmpDir.getPath(),
+                        
ConfigurationFileMigrationUtils.LEGACY_FLINK_CONF_FILENAME);
+
+        try (PrintWriter pw = new PrintWriter(confFile)) {
+            pw.append("invalid");
+        }
+
+        
assertThat(ConfigurationFileMigrationUtils.loadLegacyYAMLResource(confFile)).isNotNull();
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index ff6d6bcf9db..7fde75c5b32 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -18,20 +18,15 @@
 
 package org.apache.flink.configuration;
 
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.assertj.core.data.Offset;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,17 +39,9 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
  * This class contains test for the configuration package. In particular, the 
serialization of
  * {@link Configuration} objects is tested.
  */
-@ExtendWith(ParameterizedTestExtension.class)
 @SuppressWarnings("deprecation")
 class ConfigurationTest {
 
-    @Parameter private boolean standardYaml;
-
-    @Parameters(name = "standardYaml: {0}")
-    private static Collection<Boolean> parameters() {
-        return Arrays.asList(true, false);
-    }
-
     private static final ConfigOption<String> STRING_OPTION =
             ConfigOptions.key("test-string-key").stringType().noDefaultValue();
 
@@ -79,9 +66,9 @@ class ConfigurationTest {
     private static final String MAP_PROPERTY_2 = MAP_OPTION.key() + ".prop2";
 
     /** This test checks the serialization/deserialization of configuration 
objects. */
-    @TestTemplate
+    @Test
     void testConfigurationSerializationAndGetters() throws 
ClassNotFoundException, IOException {
-        final Configuration orig = new Configuration(standardYaml);
+        final Configuration orig = new Configuration();
         orig.setString("mykey", "myvalue");
         orig.setInteger("mynumber", 100);
         orig.setLong("longvalue", 478236947162389746L);
@@ -107,11 +94,11 @@ class ConfigurationTest {
         assertThat(copy).hasSameHashCodeAs(orig);
     }
 
-    @TestTemplate
+    @Test
     void testCopyConstructor() {
         final String key = "theKey";
 
-        Configuration cfg1 = new Configuration(standardYaml);
+        Configuration cfg1 = new Configuration();
         cfg1.setString(key, "value");
 
         Configuration cfg2 = new Configuration(cfg1);
@@ -120,9 +107,9 @@ class ConfigurationTest {
         assertThat(cfg1.getString(key, "")).isEqualTo("value");
     }
 
-    @TestTemplate
+    @Test
     void testOptionWithDefault() {
-        Configuration cfg = new Configuration(standardYaml);
+        Configuration cfg = new Configuration();
         cfg.setInteger("int-key", 11);
         cfg.setString("string-key", "abc");
 
@@ -155,9 +142,9 @@ class ConfigurationTest {
         assertThat(cfg.getValue(intOption)).isEqualTo("87");
     }
 
-    @TestTemplate
+    @Test
     void testOptionWithNoDefault() {
-        Configuration cfg = new Configuration(standardYaml);
+        Configuration cfg = new Configuration();
         cfg.setInteger("int-key", 11);
         cfg.setString("string-key", "abc");
 
@@ -179,9 +166,9 @@ class ConfigurationTest {
         assertThat(cfg.getString(stringOption, 
"override")).isEqualTo("override");
     }
 
-    @TestTemplate
+    @Test
     void testDeprecatedKeys() {
-        Configuration cfg = new Configuration(standardYaml);
+        Configuration cfg = new Configuration();
         cfg.setInteger("the-key", 11);
         cfg.setInteger("old-key", 12);
         cfg.setInteger("older-key", 13);
@@ -216,9 +203,9 @@ class ConfigurationTest {
         assertThat(cfg.getInteger(notContained)).isEqualTo(-1);
     }
 
-    @TestTemplate
+    @Test
     void testFallbackKeys() {
-        Configuration cfg = new Configuration(standardYaml);
+        Configuration cfg = new Configuration();
         cfg.setInteger("the-key", 11);
         cfg.setInteger("old-key", 12);
         cfg.setInteger("older-key", 13);
@@ -253,7 +240,7 @@ class ConfigurationTest {
         assertThat(cfg.getInteger(notContained)).isEqualTo(-1);
     }
 
-    @TestTemplate
+    @Test
     void testFallbackAndDeprecatedKeys() {
         final ConfigOption<Integer> fallback =
                 ConfigOptions.key("fallback").intType().defaultValue(-1);
@@ -268,11 +255,11 @@ class ConfigurationTest {
                         .withFallbackKeys(fallback.key())
                         .withDeprecatedKeys(deprecated.key());
 
-        final Configuration fallbackCfg = new Configuration(standardYaml);
+        final Configuration fallbackCfg = new Configuration();
         fallbackCfg.setInteger(fallback, 1);
         assertThat(fallbackCfg.getInteger(mainOption)).isOne();
 
-        final Configuration deprecatedCfg = new Configuration(standardYaml);
+        final Configuration deprecatedCfg = new Configuration();
         deprecatedCfg.setInteger(deprecated, 2);
         assertThat(deprecatedCfg.getInteger(mainOption)).isEqualTo(2);
 
@@ -285,16 +272,16 @@ class ConfigurationTest {
                         .withDeprecatedKeys(deprecated.key())
                         .withFallbackKeys(fallback.key());
 
-        final Configuration deprecatedAndFallBackConfig = new 
Configuration(standardYaml);
+        final Configuration deprecatedAndFallBackConfig = new Configuration();
         deprecatedAndFallBackConfig.setInteger(fallback, 1);
         deprecatedAndFallBackConfig.setInteger(deprecated, 2);
         assertThat(deprecatedAndFallBackConfig.getInteger(mainOption)).isOne();
         
assertThat(deprecatedAndFallBackConfig.getInteger(reversedMainOption)).isOne();
     }
 
-    @TestTemplate
+    @Test
     void testRemove() {
-        Configuration cfg = new Configuration(standardYaml);
+        Configuration cfg = new Configuration();
         cfg.setInteger("a", 1);
         cfg.setInteger("b", 2);
 
@@ -318,9 +305,9 @@ class ConfigurationTest {
                 .as("Expected 'unexistedOption' is not removed");
     }
 
-    @TestTemplate
+    @Test
     void testRemoveKey() {
-        Configuration cfg = new Configuration(standardYaml);
+        Configuration cfg = new Configuration();
         String key1 = "a.b";
         String key2 = "c.d";
         cfg.setInteger(key1, 42);
@@ -337,27 +324,27 @@ class ConfigurationTest {
         assertThat(cfg.keySet()).containsExactlyInAnyOrder("e.f");
     }
 
-    @TestTemplate
+    @Test
     void testShouldParseValidStringToEnum() {
-        final Configuration configuration = new Configuration(standardYaml);
+        final Configuration configuration = new Configuration();
         configuration.setString(STRING_OPTION.key(), 
TestEnum.VALUE1.toString());
 
         final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, 
STRING_OPTION);
         assertThat(TestEnum.VALUE1).isEqualTo(parsedEnumValue);
     }
 
-    @TestTemplate
+    @Test
     void testShouldParseValidStringToEnumIgnoringCase() {
-        final Configuration configuration = new Configuration(standardYaml);
+        final Configuration configuration = new Configuration();
         configuration.setString(STRING_OPTION.key(), 
TestEnum.VALUE1.toString().toLowerCase());
 
         final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, 
STRING_OPTION);
         assertThat(TestEnum.VALUE1).isEqualTo(parsedEnumValue);
     }
 
-    @TestTemplate
+    @Test
     void testThrowsExceptionIfTryingToParseInvalidStringForEnum() {
-        final Configuration configuration = new Configuration(standardYaml);
+        final Configuration configuration = new Configuration();
         final String invalidValueForTestEnum = "InvalidValueForTestEnum";
         configuration.setString(STRING_OPTION.key(), invalidValueForTestEnum);
 
@@ -371,9 +358,9 @@ class ConfigurationTest {
                                 + ")");
     }
 
-    @TestTemplate
+    @Test
     void testToMap() {
-        final Configuration configuration = new Configuration(standardYaml);
+        final Configuration configuration = new Configuration();
         final String listValues = "value1;value2;value3";
         final String yamlListValues = "[value1, value2, value3]";
         configuration.set(LIST_STRING_OPTION, 
Arrays.asList(listValues.split(";")));
@@ -388,20 +375,14 @@ class ConfigurationTest {
         final Duration duration = Duration.ofMillis(3000);
         configuration.set(DURATION_OPTION, duration);
 
-        if (standardYaml) {
-            assertThat(yamlListValues)
-                    
.isEqualTo(configuration.toMap().get(LIST_STRING_OPTION.key()));
-            
assertThat(yamlMapValues).isEqualTo(configuration.toMap().get(MAP_OPTION.key()));
-        } else {
-            
assertThat(listValues).isEqualTo(configuration.toMap().get(LIST_STRING_OPTION.key()));
-            
assertThat(mapValues).isEqualTo(configuration.toMap().get(MAP_OPTION.key()));
-        }
+        
assertThat(yamlListValues).isEqualTo(configuration.toMap().get(LIST_STRING_OPTION.key()));
+        
assertThat(yamlMapValues).isEqualTo(configuration.toMap().get(MAP_OPTION.key()));
         assertThat("3 
s").isEqualTo(configuration.toMap().get(DURATION_OPTION.key()));
     }
 
-    @TestTemplate
+    @Test
     void testToFileWritableMap() {
-        final Configuration configuration = new Configuration(standardYaml);
+        final Configuration configuration = new Configuration();
         final String listValues = "value1;value2;value3";
         final String yamlListValues = "[value1, value2, value3]";
         configuration.set(LIST_STRING_OPTION, 
Arrays.asList(listValues.split(";")));
@@ -420,35 +401,26 @@ class ConfigurationTest {
         final String yamlStrValues = "'*'";
         configuration.set(STRING_OPTION, strValues);
 
-        if (standardYaml) {
-            
assertThat(configuration.toFileWritableMap().get(LIST_STRING_OPTION.key()))
-                    .isEqualTo(yamlListValues);
-            assertThat(configuration.toFileWritableMap().get(MAP_OPTION.key()))
-                    .isEqualTo(yamlMapValues);
-            
assertThat(configuration.toFileWritableMap().get(STRING_OPTION.key()))
-                    .isEqualTo(yamlStrValues);
-        } else {
-            
assertThat(configuration.toFileWritableMap().get(LIST_STRING_OPTION.key()))
-                    .isEqualTo(listValues);
-            assertThat(configuration.toFileWritableMap().get(MAP_OPTION.key()))
-                    .isEqualTo(mapValues);
-            
assertThat(configuration.toFileWritableMap().get(STRING_OPTION.key()))
-                    .isEqualTo(strValues);
-        }
+        
assertThat(configuration.toFileWritableMap().get(LIST_STRING_OPTION.key()))
+                .isEqualTo(yamlListValues);
+        assertThat(configuration.toFileWritableMap().get(MAP_OPTION.key()))
+                .isEqualTo(yamlMapValues);
+        assertThat(configuration.toFileWritableMap().get(STRING_OPTION.key()))
+                .isEqualTo(yamlStrValues);
         
assertThat(configuration.toMap().get(DURATION_OPTION.key())).isEqualTo("3 s");
     }
 
-    @TestTemplate
+    @Test
     void testMapNotContained() {
-        final Configuration cfg = new Configuration(standardYaml);
+        final Configuration cfg = new Configuration();
 
         assertThat(cfg.getOptional(MAP_OPTION)).isNotPresent();
         assertThat(cfg.contains(MAP_OPTION)).isFalse();
     }
 
-    @TestTemplate
+    @Test
     void testMapWithPrefix() {
-        final Configuration cfg = new Configuration(standardYaml);
+        final Configuration cfg = new Configuration();
         cfg.setString(MAP_PROPERTY_1, "value1");
         cfg.setInteger(MAP_PROPERTY_2, 12);
 
@@ -456,18 +428,18 @@ class ConfigurationTest {
         assertThat(cfg.contains(MAP_OPTION)).isTrue();
     }
 
-    @TestTemplate
+    @Test
     void testMapWithoutPrefix() {
-        final Configuration cfg = new Configuration(standardYaml);
+        final Configuration cfg = new Configuration();
         cfg.set(MAP_OPTION, PROPERTIES_MAP);
 
         assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP);
         assertThat(cfg.contains(MAP_OPTION)).isTrue();
     }
 
-    @TestTemplate
+    @Test
     void testMapNonPrefixHasPrecedence() {
-        final Configuration cfg = new Configuration(standardYaml);
+        final Configuration cfg = new Configuration();
         cfg.set(MAP_OPTION, PROPERTIES_MAP);
         cfg.setString(MAP_PROPERTY_1, "value1");
         cfg.setInteger(MAP_PROPERTY_2, 99999);
@@ -477,9 +449,9 @@ class ConfigurationTest {
         assertThat(cfg.containsKey(MAP_PROPERTY_1)).isTrue();
     }
 
-    @TestTemplate
+    @Test
     void testMapThatOverwritesPrefix() {
-        final Configuration cfg = new Configuration(standardYaml);
+        final Configuration cfg = new Configuration();
         cfg.setString(MAP_PROPERTY_1, "value1");
         cfg.setInteger(MAP_PROPERTY_2, 99999);
         cfg.set(MAP_OPTION, PROPERTIES_MAP);
@@ -489,9 +461,9 @@ class ConfigurationTest {
         assertThat(cfg.containsKey(MAP_PROPERTY_1)).isFalse();
     }
 
-    @TestTemplate
+    @Test
     void testMapRemovePrefix() {
-        final Configuration cfg = new Configuration(standardYaml);
+        final Configuration cfg = new Configuration();
         cfg.setString(MAP_PROPERTY_1, "value1");
         cfg.setInteger(MAP_PROPERTY_2, 99999);
         cfg.removeConfig(MAP_OPTION);
@@ -501,14 +473,14 @@ class ConfigurationTest {
         assertThat(cfg.containsKey(MAP_PROPERTY_2)).isFalse();
     }
 
-    @TestTemplate
+    @Test
     void testListParserErrorDoesNotLeakSensitiveData() {
         ConfigOption<List<String>> secret =
                 
ConfigOptions.key("secret").stringType().asList().noDefaultValue();
 
         assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
 
-        final Configuration cfg = new Configuration(standardYaml);
+        final Configuration cfg = new Configuration();
         // missing closing quote
         cfg.setString(secret.key(), "'secret_value");
 
@@ -520,14 +492,14 @@ class ConfigurationTest {
                                         .doesNotContain("secret_value"));
     }
 
-    @TestTemplate
+    @Test
     void testMapParserErrorDoesNotLeakSensitiveData() {
         ConfigOption<Map<String, String>> secret =
                 ConfigOptions.key("secret").mapType().noDefaultValue();
 
         assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
 
-        final Configuration cfg = new Configuration(standardYaml);
+        final Configuration cfg = new Configuration();
         // malformed map representation
         cfg.setString(secret.key(), "secret_value");
 
@@ -539,22 +511,22 @@ class ConfigurationTest {
                                         .doesNotContain("secret_value"));
     }
 
-    @TestTemplate
+    @Test
     void testToStringDoesNotLeakSensitiveData() {
         ConfigOption<Map<String, String>> secret =
                 ConfigOptions.key("secret").mapType().noDefaultValue();
 
         assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
 
-        final Configuration cfg = new Configuration(standardYaml);
+        final Configuration cfg = new Configuration();
         cfg.setString(secret.key(), "secret_value");
 
         assertThat(cfg.toString()).doesNotContain("secret_value");
     }
 
-    @TestTemplate
+    @Test
     void testGetWithOverrideDefault() {
-        final Configuration conf = new Configuration(standardYaml);
+        final Configuration conf = new Configuration();
 
         // Test for integer without default value.
         ConfigOption<Integer> integerOption0 =
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
index 547b91f9a92..12c1dab1def 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
@@ -18,20 +18,14 @@
 
 package org.apache.flink.configuration;
 
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.TimeUtils;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -44,17 +38,9 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ConfigurationUtils}. */
-@ExtendWith(ParameterizedTestExtension.class)
 class ConfigurationUtilsTest {
 
-    @Parameter private boolean standardYaml;
-
-    @Parameters(name = "standardYaml: {0}")
-    private static Collection<Boolean> parameters() {
-        return Arrays.asList(true, false);
-    }
-
-    @TestTemplate
+    @Test
     void testPropertiesToConfiguration() {
         final Properties properties = new Properties();
         final int entries = 10;
@@ -82,7 +68,7 @@ class ConfigurationUtilsTest {
         expectedMap.put("k2", "v2");
         String legacyMapPattern = "k1:v1,k2:v2";
 
-        Configuration configuration = new Configuration(true);
+        Configuration configuration = new Configuration();
         configuration.setString("listKey", legacyListPattern);
         configuration.setString("mapKey", legacyMapPattern);
 
@@ -97,18 +83,18 @@ class ConfigurationUtilsTest {
                 .isEqualTo(expectedMap);
     }
 
-    @TestTemplate
+    @Test
     void testConvertConfigToWritableLinesAndFlattenYaml() {
         testConvertConfigToWritableLines(true);
     }
 
-    @TestTemplate
+    @Test
     void testConvertConfigToWritableLinesAndNoFlattenYaml() {
         testConvertConfigToWritableLines(false);
     }
 
     private void testConvertConfigToWritableLines(boolean flattenYaml) {
-        final Configuration configuration = new Configuration(standardYaml);
+        final Configuration configuration = new Configuration();
         ConfigOption<List<String>> nestedListOption =
                 
ConfigOptions.key("nested.test-list-key").stringType().asList().noDefaultValue();
         final String listValues = "value1;value2;value3";
@@ -143,47 +129,35 @@ class ConfigurationUtilsTest {
         List<String> actualData =
                 ConfigurationUtils.convertConfigToWritableLines(configuration, 
flattenYaml);
         List<String> expected;
-        if (standardYaml) {
-            if (flattenYaml) {
-                expected =
-                        Arrays.asList(
-                                nestedListOption.key() + ": " + yamlListValues,
-                                nestedMapOption.key() + ": " + yamlMapValues,
-                                nestedDurationOption.key()
-                                        + ": "
-                                        + 
TimeUtils.formatWithHighestUnit(duration),
-                                nestedStringOption.key() + ": " + 
yamlStrValues,
-                                intOption.key() + ": " + intValue);
-            } else {
-                expected =
-                        Arrays.asList(
-                                "nested:",
-                                "  test-list-key:",
-                                "  - value1",
-                                "  - value2",
-                                "  - value3",
-                                "  test-map-key:",
-                                "    key1: value1",
-                                "    key2: value2",
-                                "  test-duration-key: 3 s",
-                                "  test-string-key: '*'",
-                                "test-int-key: 1");
-            }
-        } else {
+        if (flattenYaml) {
             expected =
                     Arrays.asList(
-                            nestedListOption.key() + ": " + listValues,
-                            nestedMapOption.key() + ": " + mapValues,
+                            nestedListOption.key() + ": " + yamlListValues,
+                            nestedMapOption.key() + ": " + yamlMapValues,
                             nestedDurationOption.key()
                                     + ": "
                                     + 
TimeUtils.formatWithHighestUnit(duration),
-                            nestedStringOption.key() + ": " + strValues,
+                            nestedStringOption.key() + ": " + yamlStrValues,
                             intOption.key() + ": " + intValue);
+        } else {
+            expected =
+                    Arrays.asList(
+                            "nested:",
+                            "  test-list-key:",
+                            "  - value1",
+                            "  - value2",
+                            "  - value3",
+                            "  test-map-key:",
+                            "    key1: value1",
+                            "    key2: value2",
+                            "  test-duration-key: 3 s",
+                            "  test-string-key: '*'",
+                            "test-int-key: 1");
         }
         assertThat(expected).containsExactlyInAnyOrderElementsOf(actualData);
     }
 
-    @TestTemplate
+    @Test
     void testHideSensitiveValues() {
         final Map<String, String> keyValuePairs = new HashMap<>();
         keyValuePairs.put("foobar", "barfoo");
@@ -204,7 +178,7 @@ class ConfigurationUtilsTest {
         assertThat(hiddenSensitiveValues).isEqualTo(expectedKeyValuePairs);
     }
 
-    @TestTemplate
+    @Test
     void testGetPrefixedKeyValuePairs() {
         final String prefix = "test.prefix.";
         final Map<String, String> expectedKeyValuePairs =
@@ -224,48 +198,34 @@ class ConfigurationUtilsTest {
         assertThat(resultKeyValuePairs).isEqualTo(expectedKeyValuePairs);
     }
 
-    @TestTemplate
+    @Test
     void testConvertToString() {
         // String
-        assertThat(ConfigurationUtils.convertToString("Simple String", 
standardYaml))
-                .isEqualTo("Simple String");
+        assertThat(ConfigurationUtils.convertToString("Simple 
String")).isEqualTo("Simple String");
 
         // Duration
-        assertThat(ConfigurationUtils.convertToString(Duration.ZERO, 
standardYaml))
-                .isEqualTo("0 ms");
-        assertThat(ConfigurationUtils.convertToString(Duration.ofMillis(123L), 
standardYaml))
-                .isEqualTo("123 ms");
-        
assertThat(ConfigurationUtils.convertToString(Duration.ofMillis(1_234_000L), 
standardYaml))
+        
assertThat(ConfigurationUtils.convertToString(Duration.ZERO)).isEqualTo("0 ms");
+        
assertThat(ConfigurationUtils.convertToString(Duration.ofMillis(123L))).isEqualTo("123
 ms");
+        
assertThat(ConfigurationUtils.convertToString(Duration.ofMillis(1_234_000L)))
                 .isEqualTo("1234 s");
-        assertThat(ConfigurationUtils.convertToString(Duration.ofHours(25L), 
standardYaml))
-                .isEqualTo("25 h");
+        
assertThat(ConfigurationUtils.convertToString(Duration.ofHours(25L))).isEqualTo("25
 h");
 
         // List
         List<Object> listElements = new ArrayList<>();
         listElements.add("Test;String");
         listElements.add(Duration.ZERO);
         listElements.add(42);
-        if (standardYaml) {
-            assertThat("[Test;String, 0 ms, 42]")
-                    
.isEqualTo(ConfigurationUtils.convertToString(listElements, true));
-        } else {
-            assertThat("'Test;String';0 ms;42")
-                    
.isEqualTo(ConfigurationUtils.convertToString(listElements, false));
-        }
+        assertThat("[Test;String, 0 ms, 42]")
+                .isEqualTo(ConfigurationUtils.convertToString(listElements));
         // Map
         Map<Object, Object> mapElements = new HashMap<>();
         mapElements.put("A:,B", "C:,D");
         mapElements.put(10, 20);
-        if (standardYaml) {
-            assertThat("{'A:,B': 'C:,D', 10: 20}")
-                    .isEqualTo(ConfigurationUtils.convertToString(mapElements, 
true));
-        } else {
-            assertThat("'''A:,B'':''C:,D''',10:20")
-                    .isEqualTo(ConfigurationUtils.convertToString(mapElements, 
false));
-        }
+        assertThat("{'A:,B': 'C:,D', 10: 20}")
+                .isEqualTo(ConfigurationUtils.convertToString(mapElements));
     }
 
-    @TestTemplate
+    @Test
     void testRandomTempDirectorySelection() {
         final Configuration configuration = new Configuration();
         final StringBuilder tempDirectories = new StringBuilder();
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index 603527ad8f7..5f57777d264 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -41,49 +41,6 @@ class GlobalConfigurationTest {
 
     @TempDir private File tmpDir;
 
-    @Test
-    void testConfigurationWithLegacyYAML() throws FileNotFoundException {
-        File confFile = new File(tmpDir, 
GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
-        try (PrintWriter pw = new PrintWriter(confFile)) {
-            pw.println("###########################"); // should be skipped
-            pw.println("# Some : comments : to skip"); // should be skipped
-            pw.println("###########################"); // should be skipped
-            pw.println("mykey1: myvalue1"); // OK, simple correct case
-            pw.println("mykey2       : myvalue2"); // OK, whitespace before 
colon is correct
-            pw.println("mykey3:myvalue3"); // SKIP, missing white space after 
colon
-            pw.println(" some nonsense without colon and whitespace 
separator"); // SKIP
-            pw.println(" :  "); // SKIP
-            pw.println("   "); // SKIP (silently)
-            pw.println(" "); // SKIP (silently)
-            pw.println("mykey4: myvalue4# some comments"); // OK, skip 
comments only
-            pw.println("   mykey5    :    myvalue5    "); // OK, trim 
unnecessary whitespace
-            pw.println("mykey6: my: value6"); // OK, only use first ': ' as 
separator
-            pw.println("mykey7: "); // SKIP, no value provided
-            pw.println(": myvalue8"); // SKIP, no key provided
-
-            pw.println("mykey9: myvalue9"); // OK
-            pw.println("mykey9: myvalue10"); // OK, overwrite last value
-        }
-        Configuration conf = 
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-
-        // all distinct keys from confFile1 + confFile2 key
-        assertThat(conf.keySet()).hasSize(6);
-
-        // keys 1, 2, 4, 5, 6, 7, 8 should be OK and match the expected values
-        assertThat(conf.getString("mykey1", null)).isEqualTo("myvalue1");
-        assertThat(conf.getString("mykey1", null)).isEqualTo("myvalue1");
-        assertThat(conf.getString("mykey2", null)).isEqualTo("myvalue2");
-        assertThat(conf.getString("mykey3", "null")).isEqualTo("null");
-        assertThat(conf.getString("mykey4", null)).isEqualTo("myvalue4");
-        assertThat(conf.getString("mykey5", null)).isEqualTo("myvalue5");
-        assertThat(conf.getString("mykey6", null)).isEqualTo("my: value6");
-        assertThat(conf.getString("mykey7", "null")).isEqualTo("null");
-        assertThat(conf.getString("mykey8", "null")).isEqualTo("null");
-        assertThat(conf.getString("mykey9", null)).isEqualTo("myvalue10");
-        // Clear the standard yaml flag to avoid impact to other cases.
-        GlobalConfiguration.setStandardYaml(true);
-    }
-
     @Test
     void testConfigurationWithStandardYAML() throws FileNotFoundException {
         File confFile = new File(tmpDir, 
GlobalConfiguration.FLINK_CONF_FILENAME);
@@ -154,21 +111,6 @@ class GlobalConfigurationTest {
                 .isInstanceOf(IllegalConfigurationException.class);
     }
 
-    @Test
-    // We allow malformed YAML files if loaded legacy flink conf
-    void testInvalidLegacyYamlFile() throws IOException {
-        final File confFile =
-                new File(tmpDir.getPath(), 
GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
-
-        try (PrintWriter pw = new PrintWriter(confFile)) {
-            pw.append("invalid");
-        }
-
-        
assertThat(GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath())).isNotNull();
-        // Clear the standard yaml flag to avoid impact to other cases.
-        GlobalConfiguration.setStandardYaml(true);
-    }
-
     @Test
     // We do not allow malformed YAML files if loaded standard yaml
     void testInvalidStandardYamlFile() throws IOException {
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java
index e5af433a566..c94814450cc 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -57,56 +56,53 @@ import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith(ParameterizedTestExtension.class)
 class ReadableWritableConfigurationTest {
 
-    @Parameters(name = "testSpec = {0}, standardYaml = {1}")
-    private static Collection<Object[]> parameters() {
+    @Parameters(name = "testSpec = {0}")
+    private static Collection<TestSpec<?>> parameters() {
         List<TestSpec<?>> testSpecs =
                 Arrays.asList(
                         new 
TestSpec<>(ConfigOptions.key("int").intType().defaultValue(-1))
-                                .valueEquals(12345, "12345", "12345")
+                                .valueEquals(12345, "12345")
                                 .checkDefaultOverride(5),
                         new 
TestSpec<>(ConfigOptions.key("long").longType().defaultValue(-1L))
-                                .valueEquals(12345L, "12345", "12345")
+                                .valueEquals(12345L, "12345")
                                 .checkDefaultOverride(5L),
                         new 
TestSpec<>(ConfigOptions.key("float").floatType().defaultValue(0.01F))
-                                .valueEquals(0.003F, "0.003", "0.003")
+                                .valueEquals(0.003F, "0.003")
                                 .checkDefaultOverride(1.23F),
                         new 
TestSpec<>(ConfigOptions.key("double").doubleType().defaultValue(0.01D))
-                                .valueEquals(0.003D, "0.003", "0.003")
+                                .valueEquals(0.003D, "0.003")
                                 .checkDefaultOverride(1.23D),
                         new TestSpec<>(
                                         ConfigOptions.key("boolean")
                                                 .booleanType()
                                                 .defaultValue(false))
-                                .valueEquals(true, "true", "true")
+                                .valueEquals(true, "true")
                                 .checkDefaultOverride(true),
                         new TestSpec<>(
                                         ConfigOptions.key("list<int>")
                                                 .intType()
                                                 .asList()
                                                 .defaultValues(-1, 2, 3))
-                                .valueEquals(
-                                        Arrays.asList(1, 2, 3, 4, 5),
-                                        "1;2;3;4;5",
-                                        "[1, 2, 3, 4, 5]")
+                                .valueEquals(Arrays.asList(1, 2, 3, 4, 5), 
"[1, 2, 3, 4, 5]")
                                 .checkDefaultOverride(Arrays.asList(1, 2)),
                         new TestSpec<>(
                                         ConfigOptions.key("list<string>")
                                                 .stringType()
                                                 .asList()
                                                 .defaultValues("A", "B", "C"))
-                                .valueEquals(Arrays.asList("A;B", "C"), 
"'A;B';C", "['A;B', C]")
+                                .valueEquals(Arrays.asList("A;B", "C"), 
"['A;B', C]")
                                 
.checkDefaultOverride(Collections.singletonList("C")),
                         new TestSpec<>(
                                         ConfigOptions.key("interval")
                                                 .durationType()
                                                 
.defaultValue(Duration.ofHours(3)))
-                                .valueEquals(Duration.ofMinutes(3), "3 min", 
"3 min")
+                                .valueEquals(Duration.ofMinutes(3), "3 min")
                                 .checkDefaultOverride(Duration.ofSeconds(1)),
                         new TestSpec<>(
                                         ConfigOptions.key("memory")
                                                 .memoryType()
                                                 .defaultValue(new 
MemorySize(1024)))
-                                .valueEquals(new MemorySize(1024 * 1024 * 
1024), "1g", "1g")
+                                .valueEquals(new MemorySize(1024 * 1024 * 
1024), "1g")
                                 .checkDefaultOverride(new MemorySize(2048)),
                         new TestSpec<>(
                                         ConfigOptions.key("properties")
@@ -122,7 +118,6 @@ class ReadableWritableConfigurationTest {
                                                 Arrays.asList(
                                                         Tuple2.of("key1", 
"value1"),
                                                         Tuple2.of("key2", 
"value2"))),
-                                        "key1:value1,key2:value2",
                                         "{key1: value1, key2: value2}")
                                 .checkDefaultOverride(Collections.emptyMap()),
                         new TestSpec<>(
@@ -144,15 +139,9 @@ class ReadableWritableConfigurationTest {
                                                 asMap(
                                                         
Collections.singletonList(
                                                                 
Tuple2.of("key3", "value3")))),
-                                        "key1:value1,key2:value2;key3:value3",
                                         "[{key1: value1, key2: value2}, {key3: 
value3}]")
                                 
.checkDefaultOverride(Collections.emptyList()));
-        List<Object[]> list = new ArrayList<>();
-        for (TestSpec<?> testSpec : testSpecs) {
-            list.add(new Object[] {testSpec, true});
-            list.add(new Object[] {testSpec, false});
-        }
-        return list;
+        return testSpecs;
     }
 
     private static Map<String, String> asMap(List<Tuple2<String, String>> 
entries) {
@@ -161,12 +150,9 @@ class ReadableWritableConfigurationTest {
 
     @Parameter TestSpec<?> testSpec;
 
-    @Parameter(value = 1)
-    boolean standardYaml;
-
     @TestTemplate
     void testGetOptionalFromObject() {
-        Configuration configuration = new Configuration(standardYaml);
+        Configuration configuration = new Configuration();
         testSpec.setValue(configuration);
 
         Optional<?> optional = configuration.getOptional(testSpec.getOption());
@@ -176,8 +162,8 @@ class ReadableWritableConfigurationTest {
     @TestTemplate
     void testGetOptionalFromString() {
         ConfigOption<?> option = testSpec.getOption();
-        Configuration configuration = new Configuration(standardYaml);
-        configuration.setString(option.key(), 
testSpec.getStringValue(standardYaml));
+        Configuration configuration = new Configuration();
+        configuration.setString(option.key(), testSpec.getStringValue());
 
         Optional<?> optional = configuration.getOptional(option);
         assertThat(optional).isPresent().get().isEqualTo(testSpec.getValue());
@@ -185,7 +171,7 @@ class ReadableWritableConfigurationTest {
 
     @TestTemplate
     void testGetDefaultValue() {
-        Configuration configuration = new Configuration(standardYaml);
+        Configuration configuration = new Configuration();
 
         ConfigOption<?> option = testSpec.getOption();
         Object value = configuration.get(option);
@@ -195,7 +181,7 @@ class ReadableWritableConfigurationTest {
     @TestTemplate
     @SuppressWarnings("unchecked")
     void testGetOptionalDefaultValueOverride() {
-        ReadableConfig configuration = new Configuration(standardYaml);
+        ReadableConfig configuration = new Configuration();
 
         ConfigOption<?> option = testSpec.getOption();
         Object value =
@@ -207,7 +193,6 @@ class ReadableWritableConfigurationTest {
     private static class TestSpec<T> {
         private final ConfigOption<T> option;
         private T value;
-        private String stringValue;
         private String yamlStringValue;
         private T defaultValueOverride;
 
@@ -215,9 +200,8 @@ class ReadableWritableConfigurationTest {
             this.option = option;
         }
 
-        TestSpec<T> valueEquals(T objectValue, String stringValue, String 
yamlStringValue) {
+        TestSpec<T> valueEquals(T objectValue, String yamlStringValue) {
             this.value = objectValue;
-            this.stringValue = stringValue;
             this.yamlStringValue = yamlStringValue;
             return this;
         }
@@ -238,12 +222,8 @@ class ReadableWritableConfigurationTest {
             return value;
         }
 
-        String getStringValue(boolean standardYaml) {
-            if (standardYaml) {
-                return yamlStringValue;
-            } else {
-                return stringValue;
-            }
+        String getStringValue() {
+            return yamlStringValue;
         }
 
         T getDefaultValueOverride() {
@@ -265,8 +245,6 @@ class ReadableWritableConfigurationTest {
                     + option
                     + ", value="
                     + value
-                    + ", stringValue='"
-                    + stringValue
                     + ", yamlStringValue='"
                     + yamlStringValue
                     + '\''
diff --git a/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh 
b/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
index 4be918ff139..594ff7e361c 100755
--- a/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
+++ b/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
@@ -35,10 +35,7 @@ setJavaHome() {
     # NOTE: we need to obtain JAVA_HOME before using BashJavaUtils, so the 
value for env.java.home must
     # be in a flattened format, rather than nested, allowing us to retrieve 
the corresponding value via
     # shell script.
-    CONF_FILE="$1/flink-conf.yaml"
-    if [ ! -e "$1/flink-conf.yaml" ]; then
-        CONF_FILE="$1/config.yaml"
-    fi;
+    CONF_FILE="$1"
     
     KEY_ENV_JAVA_HOME="env.java.home"
     MY_JAVA_HOME=$(readFromConfigFile ${KEY_ENV_JAVA_HOME} "" "${CONF_FILE}")
diff --git a/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh 
b/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
index e714428583a..22095aa1fcb 100755
--- a/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
+++ b/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
@@ -25,7 +25,9 @@ if [ "$#" -lt 3 ]; then
 fi
 
 source "$2"/bash-java-utils.sh
-setJavaRun "$1"
+
+CONF_FILE="$1/config.yaml"
+setJavaRun "$CONF_FILE"
 
 ARGS=("${@:1}")
 result=$(updateAndGetFlinkConfiguration "${ARGS[@]}")
@@ -36,10 +38,5 @@ if [[ $? -ne 0 ]]; then
   exit 1
 fi
 
-CONF_FILE="$1/flink-conf.yaml"
-if [ ! -e "$1/flink-conf.yaml" ]; then
-  CONF_FILE="$1/config.yaml"
-fi;
-
 # Output the result
 echo "${result}" > "$CONF_FILE";
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index e45e1fb4f36..c5859ad8382 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -213,7 +213,7 @@ export FLINK_LIB_DIR
 export FLINK_OPT_DIR
 
 source "${FLINK_BIN_DIR}/bash-java-utils.sh"
-setJavaRun "$FLINK_CONF_DIR"
+setJavaRun "$FLINK_CONF_DIR/config.yaml"
 YAML_CONF=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}" 
"${FLINK_BIN_DIR}" ${FLINK_LIB_DIR} -flatten)
 
 
########################################################################################################################
diff --git a/flink-dist/src/main/flink-bin/bin/migrate-config-file.sh 
b/flink-dist/src/main/flink-bin/bin/migrate-config-file.sh
index a82d1725e55..a797a814afc 100755
--- a/flink-dist/src/main/flink-bin/bin/migrate-config-file.sh
+++ b/flink-dist/src/main/flink-bin/bin/migrate-config-file.sh
@@ -26,7 +26,7 @@ FLINK_BIN_DIR=`cd "$FLINK_BIN_DIR"; pwd`
   
 FLINK_CONF_DIR="$FLINK_BIN_DIR"/../conf
 echo "Using Flink configuration directory: $FLINK_CONF_DIR"
-setJavaRun "$FLINK_CONF_DIR"
+setJavaRun "$FLINK_CONF_DIR/flink-conf.yaml"
 
 FLINK_LIB_DIR="$FLINK_BIN_DIR"/../lib
 echo "Using Flink library directory: $FLINK_LIB_DIR"
diff --git a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh 
b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
index 8b1f43225ca..77c14f9f5c1 100755
--- a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
+++ b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
@@ -34,15 +34,17 @@ bin=`cd "$bin"; pwd`
   
 if [ "$COMMAND" = "MIGRATE_LEGACY_FLINK_CONFIGURATION_TO_STANDARD_YAML" ]; then
   FLINK_CONF_DIR="${bin}/../resources"
+  FLINK_CONF="$FLINK_CONF_DIR/flink-conf.yaml"
 else
   FLINK_CONF_DIR="${bin}/../../main/resources"
+  FLINK_CONF="$FLINK_CONF_DIR/config.yaml"
 fi
 FLINK_TARGET_DIR=${bin}/../../../target
 FLINK_DIST_JARS=(`find ${FLINK_TARGET_DIR} -maxdepth 1 -name 
'flink-dist*.jar'`)
 FLINK_DIST_CLASSPATH=`echo ${FLINK_DIST_JARS[@]} | tr ' ' ':'`
 
 . ${bin}/../../main/flink-bin/bin/bash-java-utils.sh > /dev/null
-setJavaRun "$FLINK_CONF_DIR"
+setJavaRun "$FLINK_CONF"
 
 output=$(runBashJavaUtilsCmd ${COMMAND} ${FLINK_CONF_DIR} 
"$FLINK_TARGET_DIR/bash-java-utils.jar:${FLINK_DIST_CLASSPATH}" $DYNAMIC_OPTS)
 extractExecutionResults "${output}" ${EXPECTED_LINES}
diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index a36cbbbbab1..65ca25ebf4f 100755
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -55,8 +55,9 @@ source "${FLINK_DIR}/bin/bash-java-utils.sh"
 if [[ -z "${FLINK_CONF_DIR:-}" ]]; then
     FLINK_CONF_DIR="$FLINK_DIR/conf"
 fi
-setJavaRun "$FLINK_CONF_DIR"
 FLINK_CONF=${FLINK_CONF_DIR}/config.yaml
+setJavaRun "$FLINK_CONF"
+
 # Flatten the configuration file config.yaml to enable end-to-end test cases 
which will modify 
 # it directly through shell scripts.
 output=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}" "${FLINK_DIR}/bin" 
"${FLINK_DIR}/lib" -flatten)
diff --git a/flink-python/docs/reference/pyflink.table/table_environment.rst 
b/flink-python/docs/reference/pyflink.table/table_environment.rst
index 302dc3d5743..1039f5b2d33 100644
--- a/flink-python/docs/reference/pyflink.table/table_environment.rst
+++ b/flink-python/docs/reference/pyflink.table/table_environment.rst
@@ -72,7 +72,7 @@ programs.
 This class is a pure API class that abstracts configuration from various 
sources. Currently,
 configuration can be set in any of the following layers (in the given order):
 
-- flink-conf.yaml
+- config.yaml
 - CLI parameters
 - :class:`~pyflink.datastream.StreamExecutionEnvironment` when bridging to 
DataStream API
 - :func:`~EnvironmentSettings.Builder.with_configuration`
diff --git a/flink-python/pyflink/common/configuration.py 
b/flink-python/pyflink/common/configuration.py
index 779568da151..d833f83ba02 100644
--- a/flink-python/pyflink/common/configuration.py
+++ b/flink-python/pyflink/common/configuration.py
@@ -76,13 +76,11 @@ class Configuration:
 
     @staticmethod
     def parse_jars_value(value: str, jvm):
-        is_standard_yaml = 
jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
-        if is_standard_yaml:
-            from ruamel.yaml import YAML
-            yaml = YAML(typ='safe')
-            jar_urls_list = yaml.load(value)
-            if isinstance(jar_urls_list, list):
-                return jar_urls_list
+        from ruamel.yaml import YAML
+        yaml = YAML(typ='safe')
+        jar_urls_list = yaml.load(value)
+        if isinstance(jar_urls_list, list):
+            return jar_urls_list
         return value.split(";")
 
     def get_integer(self, key: str, default_value: int) -> int:
diff --git a/flink-python/pyflink/pyflink_gateway_server.py 
b/flink-python/pyflink/pyflink_gateway_server.py
index 0eae1e961ee..a2c90797151 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -46,34 +46,14 @@ def on_windows():
 def read_from_config(key, default_value, flink_conf_directory):
     from ruamel.yaml import YAML
     yaml = YAML(typ='safe')
-    # try to find flink-conf.yaml file in flink_conf_directory
-    flink_conf_file = os.path.join(flink_conf_directory, "flink-conf.yaml")
-    if os.path.isfile(flink_conf_file):
-        # If flink-conf.yaml exists, use the old parsing logic to read the 
value
-        # get the realpath of tainted path value to avoid CWE22 problem that 
constructs a path
-        # or URI using the tainted value and might allow an attacker to 
access, modify, or test
-        # the existence of critical or sensitive files.
-        with open(os.path.realpath(flink_conf_file), "r") as f:
-            while True:
-                line = f.readline()
-                if not line:
-                    break
-                if line.startswith("#") or len(line.strip()) == 0:
-                    continue
-                k, v = line.split(":", 1)
-                if k.strip() == key:
-                    return v.strip()
-    else:
-        # If flink-conf.yaml does not exist, try to find config.yaml instead
-        config_file = os.path.join(flink_conf_directory, "config.yaml")
-        if os.path.isfile(config_file):
-            # If config.yaml exists, use YAML parser to read the value
-            with open(os.path.realpath(config_file), "r") as f:
-                config = yaml.load(f)
-                flat_config = flatten_config(config)
-                return flat_config.get(key, default_value)
-
-    # If neither file exists, return the default value
+    config_file = os.path.join(flink_conf_directory, "config.yaml")
+    if os.path.isfile(config_file):
+        # If config.yaml exists, use YAML parser to read the value
+        with open(os.path.realpath(config_file), "r") as f:
+            config = yaml.load(f)
+            flat_config = flatten_config(config)
+            return flat_config.get(key, default_value)
+
     return default_value
 
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
index 0e92e238a84..7d645dcf605 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationFileMigrationUtils;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -37,13 +38,11 @@ import org.apache.flink.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.flink.util.MathUtils.checkedDownCast;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Utility class to extract related parameters from {@link Configuration} and 
to sanity check them.
@@ -230,17 +229,9 @@ public class ConfigurationParserUtils {
             throw e;
         }
 
-        checkState(
-                new File(
-                                clusterConfiguration.getConfigDir(),
-                                GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME)
-                        .exists());
-        Configuration configuration =
-                
GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), 
null);
-
-        Configuration standardYamlConfig = new Configuration(true);
-        standardYamlConfig.addAll(configuration);
-
-        return 
ConfigurationUtils.convertConfigToWritableLines(standardYamlConfig, false);
+        return ConfigurationUtils.convertConfigToWritableLines(
+                
ConfigurationFileMigrationUtils.migrateLegacyToStandardYamlConfig(
+                        clusterConfiguration.getConfigDir()),
+                false);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 26c832eefc5..ceb2177cff9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -225,16 +224,6 @@ class BootstrapToolsTest {
 
     @Test
     void testWriteConfigurationAndReloadWithStandardYaml() throws Exception {
-        testWriteConfigurationAndReloadInternal(true);
-    }
-
-    @Test
-    void testWriteConfigurationAndReloadWithLegacyYaml() throws Exception {
-        testWriteConfigurationAndReloadInternal(false);
-    }
-
-    private void testWriteConfigurationAndReloadInternal(boolean standardYaml) 
throws IOException {
-        GlobalConfiguration.setStandardYaml(standardYaml);
         final File flinkConfDir = 
TempDirUtils.newFolder(temporaryFolder).getAbsoluteFile();
         final Configuration flinkConfig = new Configuration();
 
@@ -298,16 +287,13 @@ class BootstrapToolsTest {
 
         BootstrapTools.writeConfiguration(
                 flinkConfig, new File(flinkConfDir, 
GlobalConfiguration.getFlinkConfFilename()));
-        final Configuration loadedFlinkConfig =
-                
GlobalConfiguration.loadConfiguration(flinkConfDir.getAbsolutePath());
+        final Configuration loadedFlinkConfig;
+        loadedFlinkConfig = 
GlobalConfiguration.loadConfiguration(flinkConfDir.getAbsolutePath());
         assertThat(loadedFlinkConfig.get(listStringConfigOption))
                 .containsExactlyInAnyOrderElementsOf(list);
         assertThat(loadedFlinkConfig.get(listDurationConfigOption))
                 .containsExactlyInAnyOrderElementsOf(durationList);
         
assertThat(loadedFlinkConfig.get(mapConfigOption)).containsAllEntriesOf(map);
         
assertThat(loadedFlinkConfig.get(durationConfigOption)).isEqualTo(duration);
-
-        // clean the standard yaml flag to avoid impact to other cases.
-        GlobalConfiguration.setStandardYaml(true);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
index 9c72922895f..5312db4ba9b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
@@ -21,43 +21,29 @@ package org.apache.flink.runtime.util.bash;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationFileMigrationUtils;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
-import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FlinkConfigLoader}. */
-@ExtendWith(ParameterizedTestExtension.class)
 public class FlinkConfigLoaderTest {
 
-    @Parameter public boolean standardYaml;
-
-    @Parameters(name = "standardYaml: {0}")
-    public static Collection<Boolean> parameters() {
-        return Arrays.asList(true, false);
-    }
-
     private static final String TEST_CONFIG_KEY = "test.key";
     private static final String TEST_CONFIG_VALUE = "test_value";
 
@@ -65,105 +51,71 @@ public class FlinkConfigLoaderTest {
 
     @BeforeEach
     void setUp() throws IOException {
-        File flinkConfFile;
-        if (standardYaml) {
-            flinkConfFile =
-                    TempDirUtils.newFile(
-                            confDir.toAbsolutePath(), 
GlobalConfiguration.FLINK_CONF_FILENAME);
-        } else {
-            flinkConfFile =
-                    TempDirUtils.newFile(
-                            confDir.toAbsolutePath(),
-                            GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
-        }
+        File flinkConfFile =
+                TempDirUtils.newFile(
+                        confDir.toAbsolutePath(), 
GlobalConfiguration.FLINK_CONF_FILENAME);
         FileWriter fw = new FileWriter(flinkConfFile);
         fw.write(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE + "\n");
         fw.close();
     }
 
-    @AfterAll
-    static void after() {
-        // clean the standard yaml flag to avoid impact to other cases.
-        GlobalConfiguration.setStandardYaml(true);
-    }
-
-    @TestTemplate
+    @Test
     void testLoadConfigurationConfigDirLongOpt() throws Exception {
         String[] args = {"--configDir", confDir.toFile().getAbsolutePath()};
         Configuration configuration = 
FlinkConfigLoader.loadConfiguration(args);
         verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationConfigDirLongOpt() throws Exception {
         String[] args = {"--configDir", confDir.toFile().getAbsolutePath()};
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
-        } else {
-            assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " + 
TEST_CONFIG_VALUE);
-        }
+        assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
     }
 
-    @TestTemplate
+    @Test
     void testLoadConfigurationConfigDirShortOpt() throws Exception {
         String[] args = {"-c", confDir.toFile().getAbsolutePath()};
         Configuration configuration = 
FlinkConfigLoader.loadConfiguration(args);
         verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationConfigDirShortOpt() throws Exception {
         String[] args = {"-c", confDir.toFile().getAbsolutePath()};
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
-        } else {
-            assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " + 
TEST_CONFIG_VALUE);
-        }
+        assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
     }
 
-    @TestTemplate
+    @Test
     void testLoadConfigurationDynamicPropertyWithSpace() throws Exception {
         String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), 
"-D", "key=value"};
         Configuration configuration = 
FlinkConfigLoader.loadConfiguration(args);
         verifyConfiguration(configuration, "key", "value");
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationDynamicPropertyWithSpace() throws 
Exception {
         String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), 
"-D", "key=value"};
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE, "key: value");
-        } else {
-            assertThat(list)
-                    .containsExactlyInAnyOrder(
-                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key: 
value");
-        }
+        assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE, "key: value");
     }
 
-    @TestTemplate
+    @Test
     void testLoadConfigurationDynamicPropertyWithoutSpace() throws Exception {
         String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), 
"-Dkey=value"};
         Configuration configuration = 
FlinkConfigLoader.loadConfiguration(args);
         verifyConfiguration(configuration, "key", "value");
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationDynamicPropertyWithoutSpace() throws 
Exception {
         String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), 
"-Dkey=value"};
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE, "key: value");
-        } else {
-            assertThat(list)
-                    .containsExactlyInAnyOrder(
-                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key: 
value");
-        }
+        assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE, "key: value");
     }
 
-    @TestTemplate
+    @Test
     void testLoadConfigurationIgnoreUnknownToken() throws Exception {
         String[] args = {
             "unknown",
@@ -178,7 +130,7 @@ public class FlinkConfigLoaderTest {
         verifyConfiguration(configuration, "key", "value");
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationIgnoreUnknownToken() throws Exception {
         String[] args = {
             "unknown",
@@ -189,16 +141,10 @@ public class FlinkConfigLoaderTest {
             "-Dkey=value"
         };
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE, "key: value");
-        } else {
-            assertThat(list)
-                    .containsExactlyInAnyOrder(
-                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key: 
value");
-        }
+        assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE, "key: value");
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationRemoveKeysMatched() throws Exception {
         String key = "key";
 
@@ -210,14 +156,10 @@ public class FlinkConfigLoaderTest {
             key
         };
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
-        } else {
-            assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " 
+ TEST_CONFIG_VALUE);
-        }
+        assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationRemoveKeysNotMatched() throws Exception 
{
         String key = "key";
         String value = "value";
@@ -231,17 +173,11 @@ public class FlinkConfigLoaderTest {
             removeKey
         };
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list)
-                    .containsExactly("test:", "  key: " + TEST_CONFIG_VALUE, 
key + ": " + value);
-        } else {
-            assertThat(list)
-                    .containsExactlyInAnyOrder(
-                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, key + 
": " + value);
-        }
+        assertThat(list)
+                .containsExactly("test:", "  key: " + TEST_CONFIG_VALUE, key + 
": " + value);
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationRemoveKeyValuesMatched() throws 
Exception {
         String removeKey = "removeKey";
         String removeValue = "removeValue";
@@ -254,14 +190,10 @@ public class FlinkConfigLoaderTest {
             String.format("%s=%s", removeKey, removeValue)
         };
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
-        } else {
-            assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " 
+ TEST_CONFIG_VALUE);
-        }
+        assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationRemoveKeyValuesNotMatched() throws 
Exception {
         String removeKey = "removeKey";
         String removeValue = "removeValue";
@@ -275,19 +207,12 @@ public class FlinkConfigLoaderTest {
             String.format("%s=%s", removeKey, nonExistentValue)
         };
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list)
-                    .containsExactlyInAnyOrder(
-                            "test:", "  key: " + TEST_CONFIG_VALUE, removeKey 
+ ": " + removeValue);
-        } else {
-            assertThat(list)
-                    .containsExactlyInAnyOrder(
-                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE,
-                            removeKey + ": " + removeValue);
-        }
+        assertThat(list)
+                .containsExactlyInAnyOrder(
+                        "test:", "  key: " + TEST_CONFIG_VALUE, removeKey + ": 
" + removeValue);
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationReplaceKeyValuesMatched() throws 
Exception {
         String newValue = "newValue";
 
@@ -298,14 +223,10 @@ public class FlinkConfigLoaderTest {
             String.format("%s,%s,%s", TEST_CONFIG_KEY, TEST_CONFIG_VALUE, 
newValue)
         };
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list).containsExactly("test:", "  key: " + newValue);
-        } else {
-            assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " 
+ newValue);
-        }
+        assertThat(list).containsExactly("test:", "  key: " + newValue);
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationReplaceKeyValuesNotMatched() throws 
Exception {
         String nonExistentValue = "nonExistentValue";
         String newValue = "newValue";
@@ -317,14 +238,10 @@ public class FlinkConfigLoaderTest {
             String.format("%s,%s,%s", TEST_CONFIG_KEY, nonExistentValue, 
newValue)
         };
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
-        if (standardYaml) {
-            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
-        } else {
-            assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " 
+ TEST_CONFIG_VALUE);
-        }
+        assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
     }
 
-    @TestTemplate
+    @Test
     void testloadAndModifyConfigurationWithFlatten() throws Exception {
         String[] args = {"-c", confDir.toFile().getAbsolutePath(), "-flatten"};
         List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
@@ -333,11 +250,11 @@ public class FlinkConfigLoaderTest {
 
     @Test
     void testMigrateLegacyConfigToStandardYaml() throws Exception {
-        try (FileWriter fw =
-                new FileWriter(
-                        new File(
-                                confDir.toFile(),
-                                
GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME))) {
+        File file =
+                new File(
+                        confDir.toFile(),
+                        
ConfigurationFileMigrationUtils.LEGACY_FLINK_CONF_FILENAME);
+        try (FileWriter fw = new FileWriter(file)) {
             fw.write(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE + "\n");
             fw.write(
                     "pipeline.cached-files"
@@ -351,7 +268,8 @@ public class FlinkConfigLoaderTest {
                             + " 
class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"
                             + "\n");
         }
-        Configuration configuration = 
GlobalConfiguration.loadConfiguration(confDir.toString());
+        Map<String, String> configuration =
+                ConfigurationFileMigrationUtils.loadLegacyYAMLResource(file);
 
         File newConfigFolder = TempDirUtils.newFolder(confDir);
         try (FileWriter fw =
@@ -367,11 +285,15 @@ public class FlinkConfigLoaderTest {
 
         Configuration standardYamlConfig =
                 
GlobalConfiguration.loadConfiguration(newConfigFolder.toString());
-        assertThat(configuration.getString(TEST_CONFIG_KEY, null))
+        assertThat(configuration.getOrDefault(TEST_CONFIG_KEY, null))
                 .isEqualTo(standardYamlConfig.getString(TEST_CONFIG_KEY, 
null));
 
+        List<String> serializers =
+                ConfigurationUtils.convertToList(
+                        
configuration.get(PipelineOptions.KRYO_DEFAULT_SERIALIZERS.key()),
+                        String.class);
         assertThat(
-                        
configuration.get(PipelineOptions.KRYO_DEFAULT_SERIALIZERS).stream()
+                        serializers.stream()
                                 .map(ConfigurationUtils::parseStringToMap)
                                 .collect(Collectors.toList()))
                 .isEqualTo(
@@ -379,9 +301,10 @@ public class FlinkConfigLoaderTest {
                                 .map(ConfigurationUtils::parseStringToMap)
                                 .collect(Collectors.toList()));
 
-        assertThat(
-                        DistributedCache.parseCachedFilesFromString(
-                                
configuration.get(PipelineOptions.CACHED_FILES)))
+        List<String> cachedFiles =
+                ConfigurationUtils.convertToList(
+                        configuration.get(PipelineOptions.CACHED_FILES.key()), 
String.class);
+        assertThat(DistributedCache.parseCachedFilesFromString(cachedFiles))
                 .isEqualTo(
                         DistributedCache.parseCachedFilesFromString(
                                 
standardYamlConfig.get(PipelineOptions.CACHED_FILES)));
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
index 1ed5fa133c7..9fa6db466a1 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -163,7 +162,6 @@ public class ForStStateBackendConfigTest {
 
     @Test
     public void testConfigureForStCompressionPerLevel() throws Exception {
-        GlobalConfiguration.setStandardYaml(false);
         final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
         ForStStateBackend forStStateBackend = new ForStStateBackend();
         CompressionType[] compressionTypes = {
@@ -184,7 +182,6 @@ public class ForStStateBackendConfigTest {
 
         resourceContainer.close();
         env.close();
-        GlobalConfiguration.setStandardYaml(true);
     }
 
     @Test
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index bcd9023845c..b19c80eb6f5 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -291,7 +290,6 @@ public class RocksDBStateBackendConfigTest {
 
     @Test
     public void testConfigureRocksDBCompressionPerLevel() throws Exception {
-        GlobalConfiguration.setStandardYaml(false);
         final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
         EmbeddedRocksDBStateBackend rocksDbBackend = new 
EmbeddedRocksDBStateBackend();
         CompressionType[] compressionTypes = {
@@ -312,7 +310,6 @@ public class RocksDBStateBackendConfigTest {
 
         resourceContainer.close();
         env.close();
-        GlobalConfiguration.setStandardYaml(true);
     }
 
     @Test

Reply via email to