This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c0176fd6794 [FLINK-33677][core] Remove flink-conf.yaml from flink dist.
c0176fd6794 is described below
commit c0176fd6794295dc3706ed7f536f3f70b6110923
Author: JunRuiLee <[email protected]>
AuthorDate: Thu Aug 22 11:18:27 2024 +0800
[FLINK-33677][core] Remove flink-conf.yaml from flink dist.
This closes #25234.
---
.../common/serialization/SerializerConfigImpl.java | 7 -
.../apache/flink/configuration/Configuration.java | 44 ++---
.../ConfigurationFileMigrationUtils.java | 173 +++++++++++++++++++
.../flink/configuration/ConfigurationUtils.java | 83 +++-------
.../configuration/DelegatingConfiguration.java | 18 +-
.../flink/configuration/GlobalConfiguration.java | 128 ++------------
.../serialization/SerializerConfigImplTest.java | 16 --
.../ConfigurationFileMigrationUtilsTest.java | 93 +++++++++++
.../flink/configuration/ConfigurationTest.java | 146 +++++++---------
.../configuration/ConfigurationUtilsTest.java | 112 ++++---------
.../configuration/GlobalConfigurationTest.java | 58 -------
.../ReadableWritableConfigurationTest.java | 62 +++----
.../src/main/flink-bin/bin/bash-java-utils.sh | 5 +-
.../src/main/flink-bin/bin/config-parser-utils.sh | 9 +-
flink-dist/src/main/flink-bin/bin/config.sh | 2 +-
.../src/main/flink-bin/bin/migrate-config-file.sh | 2 +-
flink-dist/src/test/bin/runBashJavaUtilsCmd.sh | 4 +-
flink-end-to-end-tests/test-scripts/common.sh | 3 +-
.../reference/pyflink.table/table_environment.rst | 2 +-
flink-python/pyflink/common/configuration.py | 12 +-
flink-python/pyflink/pyflink_gateway_server.py | 36 +---
.../runtime/util/ConfigurationParserUtils.java | 19 +--
.../clusterframework/BootstrapToolsTest.java | 18 +-
.../runtime/util/bash/FlinkConfigLoaderTest.java | 183 ++++++---------------
.../state/forst/ForStStateBackendConfigTest.java | 3 -
.../state/RocksDBStateBackendConfigTest.java | 3 -
26 files changed, 517 insertions(+), 724 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
index 1ffde12a61b..9a281e4cd4d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
@@ -389,12 +388,6 @@ public final class SerializerConfigImpl implements SerializerConfig {
.getOptional(PipelineOptions.SERIALIZATION_CONFIG)
.ifPresent(c ->
parseSerializationConfigWithExceptionHandling(classLoader, c));
} catch (Exception e) {
- if (!GlobalConfiguration.isStandardYaml()) {
- throw new UnsupportedOperationException(
- String.format(
- "%s is only supported with the standard YAML
config parser, please use \"config.yaml\" as the config file.",
- PipelineOptions.SERIALIZATION_CONFIG.key()));
- }
throw e;
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 94e77fbd2a0..23f337681ac 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -21,7 +21,6 @@ package org.apache.flink.configuration;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
@@ -79,21 +78,11 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
*/
protected final HashMap<String, Object> confData;
- protected final boolean standardYaml;
-
//
--------------------------------------------------------------------------------------------
/** Creates a new empty configuration. */
public Configuration() {
this.confData = new HashMap<>();
- this.standardYaml = GlobalConfiguration.isStandardYaml();
- }
-
- @VisibleForTesting
- @Internal
- public Configuration(boolean standardYaml) {
- this.confData = new HashMap<>();
- this.standardYaml = standardYaml;
}
/**
@@ -103,7 +92,6 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
*/
public Configuration(Configuration other) {
this.confData = new HashMap<>(other.confData);
- this.standardYaml = other.standardYaml;
}
//
--------------------------------------------------------------------------------------------
@@ -175,7 +163,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
*/
public String getString(String key, String defaultValue) {
return getRawValue(key)
- .map(o -> ConfigurationUtils.convertToString(o, standardYaml))
+ .map(o -> ConfigurationUtils.convertToString(o))
.orElse(defaultValue);
}
@@ -818,9 +806,9 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
try {
if (option.isList()) {
- return rawValue.map(v -> ConfigurationUtils.convertToList(v,
clazz, standardYaml));
+ return rawValue.map(v -> ConfigurationUtils.convertToList(v,
clazz));
} else {
- return rawValue.map(v -> ConfigurationUtils.convertValue(v,
clazz, standardYaml));
+ return rawValue.map(v -> ConfigurationUtils.convertValue(v,
clazz));
}
} catch (Exception e) {
throw new IllegalArgumentException(
@@ -848,9 +836,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
Map<String, String> ret =
CollectionUtil.newHashMapWithExpectedSize(this.confData.size());
for (Map.Entry<String, Object> entry : confData.entrySet()) {
- ret.put(
- entry.getKey(),
- ConfigurationUtils.convertToString(entry.getValue(),
standardYaml));
+ ret.put(entry.getKey(),
ConfigurationUtils.convertToString(entry.getValue()));
}
return ret;
}
@@ -867,19 +853,15 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
*/
@Internal
public Map<String, String> toFileWritableMap() {
- if (standardYaml) {
- synchronized (this.confData) {
- Map<String, String> ret =
-
CollectionUtil.newHashMapWithExpectedSize(this.confData.size());
- for (Map.Entry<String, Object> entry : confData.entrySet()) {
- // Because some character in standard yaml should be
escaped by quotes, such as
- // '*', here we should wrap the value by Yaml pattern
- ret.put(entry.getKey(),
YamlParserUtils.toYAMLString(entry.getValue()));
- }
- return ret;
+ synchronized (this.confData) {
+ Map<String, String> ret =
+
CollectionUtil.newHashMapWithExpectedSize(this.confData.size());
+ for (Map.Entry<String, Object> entry : confData.entrySet()) {
+ // Because some character in standard yaml should be escaped
by quotes, such as
+ // '*', here we should wrap the value by Yaml pattern
+ ret.put(entry.getKey(),
YamlParserUtils.toYAMLString(entry.getValue()));
}
- } else {
- return toMap();
+ return ret;
}
}
@@ -955,7 +937,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
return Optional.ofNullable(valueFromExactKey);
}
final Map<String, String> valueFromPrefixMap =
- convertToPropertiesPrefixed(confData, key, standardYaml);
+ convertToPropertiesPrefixed(confData, key);
if (valueFromPrefixMap.isEmpty()) {
return Optional.empty();
}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationFileMigrationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationFileMigrationUtils.java
new file mode 100644
index 00000000000..869e0019e86
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationFileMigrationUtils.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class for migrating legacy Flink configuration file {@code flink-conf.yaml} to the new
+ * format starting from Flink 2.0. This class provides methods to load legacy configuration files
+ * and convert them into the new configuration format.
+ */
+public class ConfigurationFileMigrationUtils {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ConfigurationFileMigrationUtils.class);
+
+ /**
+ * This file is only used to help users migrate their legacy configuration files to the new
+ * configuration file `config.yaml` starting from Flink 2.0.
+ */
+ @VisibleForTesting public static final String LEGACY_FLINK_CONF_FILENAME =
"flink-conf.yaml";
+
+ /**
+ * Migrates the legacy Flink configuration from the specified directory to a standard YAML
+ * format representation.
+ *
+ * <p>This method loads the legacy configuration file named {@code flink-conf.yaml} from the
+ * specified directory. If the file is found, it converts the legacy format into a standard
+ * {@link Configuration} object in YAML format.
+ *
+ * @param configDir the directory where the legacy configuration file is located
+ * @return a {@link Configuration} object in standard YAML format
+ */
+ public static Configuration migrateLegacyToStandardYamlConfig(final String
configDir) {
+ if (configDir == null) {
+ throw new IllegalArgumentException(
+ "Given configuration directory is null, cannot load
configuration");
+ }
+
+ final File confDirFile = new File(configDir);
+ if (!(confDirFile.exists())) {
+ throw new IllegalConfigurationException(
+ "The given configuration directory name '"
+ + configDir
+ + "' ("
+ + confDirFile.getAbsolutePath()
+ + ") does not describe an existing directory.");
+ }
+
+ // get Flink yaml configuration file
+ Map<String, String> configuration;
+ File yamlConfigFile = new File(confDirFile,
LEGACY_FLINK_CONF_FILENAME);
+ if (!yamlConfigFile.exists()) {
+ throw new IllegalConfigurationException(
+ "The Flink config file '"
+ + yamlConfigFile
+ + "' ("
+ + yamlConfigFile.getAbsolutePath()
+ + ") does not exist.");
+ } else {
+ LOG.info(
+ "Using legacy YAML parser to load flink configuration file
from {}.",
+ yamlConfigFile.getAbsolutePath());
+ configuration = loadLegacyYAMLResource(yamlConfigFile);
+ }
+
+ Configuration standardYamlConfig = new Configuration();
+ configuration.forEach(standardYamlConfig::setString);
+ return standardYamlConfig;
+ }
+
+ /**
+ * Loads a YAML-file of key-value pairs.
+ *
+ * <p>Colon and whitespace ": " separate key and value (one per line). The hash tag "#" starts a
+ * single-line comment.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ * jobmanager.rpc.address: localhost # network address for communication with the job manager
+ * jobmanager.rpc.port : 6123 # network port to connect to for communication with the job manager
+ * taskmanager.rpc.port : 6122 # network port the task manager expects incoming IPC connections
+ * </pre>
+ *
+ * <p>This does not span the whole YAML specification, but only the *syntax* of simple YAML
+ * key-value pairs (see issue #113 on GitHub). If at any point in time, there is a need to go
+ * beyond simple key-value pairs syntax compatibility will allow to introduce a YAML parser
+ * library.
+ *
+ * @param file the YAML file to read from
+ * @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
+ */
+ @VisibleForTesting
+ public static Map<String, String> loadLegacyYAMLResource(File file) {
+ Map<String, String> config = new HashMap<>();
+ try (BufferedReader reader =
+ new BufferedReader(new InputStreamReader(new
FileInputStream(file)))) {
+
+ String line;
+ int lineNo = 0;
+ while ((line = reader.readLine()) != null) {
+ lineNo++;
+ // 1. check for comments
+ String[] comments = line.split("#", 2);
+ String conf = comments[0].trim();
+
+ // 2. get key and value
+ if (conf.length() > 0) {
+ String[] kv = conf.split(": ", 2);
+
+ // skip line with no valid key-value pair
+ if (kv.length == 1) {
+ LOG.warn(
+ "Error while trying to split key and value in
configuration file "
+ + file
+ + ":"
+ + lineNo
+ + ": Line is not a key-value pair
(missing space after ':'?)");
+ continue;
+ }
+
+ String key = kv[0].trim();
+ String value = kv[1].trim();
+
+ // sanity check
+ if (key.length() == 0 || value.length() == 0) {
+ LOG.warn(
+ "Error after splitting key and value in
configuration file "
+ + file
+ + ":"
+ + lineNo
+ + ": Key or value was empty");
+ continue;
+ }
+
+ config.put(key, value);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error parsing YAML configuration.", e);
+ }
+
+ return config;
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 9cbf8810023..1802253474d 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -41,7 +41,6 @@ import java.util.stream.Collectors;
import static
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
import static
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
-import static
org.apache.flink.configuration.StructuredOptionsSplitter.escapeWithSingleQuote;
import static org.apache.flink.util.Preconditions.checkArgument;
/** Utility class for {@link Configuration} related helper functions. */
@@ -131,11 +130,11 @@ public class ConfigurationUtils {
* @return parsed map
*/
public static Map<String, String> parseStringToMap(String
stringSerializedMap) {
- return convertToProperties(stringSerializedMap,
GlobalConfiguration.isStandardYaml());
+ return convertToProperties(stringSerializedMap);
}
public static String parseMapToString(Map<String, String> map) {
- return convertToString(map, GlobalConfiguration.isStandardYaml());
+ return convertToString(map);
}
public static Time getStandaloneClusterStartupPeriodTime(Configuration
configuration) {
@@ -247,7 +246,7 @@ public class ConfigurationUtils {
*/
public static List<String> convertConfigToWritableLines(
Configuration configuration, boolean flattenYaml) {
- if (configuration.standardYaml && !flattenYaml) {
+ if (!flattenYaml) {
return YamlParserUtils.convertAndDumpYamlFromFlatMap(
Collections.unmodifiableMap(configuration.confData));
} else {
@@ -372,11 +371,6 @@ public class ConfigurationUtils {
*/
@SuppressWarnings("unchecked")
public static <T> T convertValue(Object rawValue, Class<?> clazz) {
- return convertValue(rawValue, clazz,
GlobalConfiguration.isStandardYaml());
- }
-
- @SuppressWarnings("unchecked")
- public static <T> T convertValue(Object rawValue, Class<?> clazz, boolean
standardYaml) {
if (Integer.class.equals(clazz)) {
return (T) convertToInt(rawValue);
} else if (Long.class.equals(clazz)) {
@@ -388,7 +382,7 @@ public class ConfigurationUtils {
} else if (Double.class.equals(clazz)) {
return (T) convertToDouble(rawValue);
} else if (String.class.equals(clazz)) {
- return (T) convertToString(rawValue, standardYaml);
+ return (T) convertToString(rawValue);
} else if (clazz.isEnum()) {
return (T) convertToEnum(rawValue, (Class<? extends Enum<?>>)
clazz);
} else if (clazz == Duration.class) {
@@ -396,17 +390,17 @@ public class ConfigurationUtils {
} else if (clazz == MemorySize.class) {
return (T) convertToMemorySize(rawValue);
} else if (clazz == Map.class) {
- return (T) convertToProperties(rawValue, standardYaml);
+ return (T) convertToProperties(rawValue);
}
throw new IllegalArgumentException("Unsupported type: " + clazz);
}
@SuppressWarnings("unchecked")
- static <T> T convertToList(Object rawValue, Class<?> atomicClass, boolean
standardYaml) {
+ public static <T> T convertToList(Object rawValue, Class<?> atomicClass) {
if (rawValue instanceof List) {
return (T) rawValue;
- } else if (standardYaml) {
+ } else {
try {
List<Object> data =
YamlParserUtils.convertToObject(rawValue.toString(),
List.class);
@@ -417,20 +411,18 @@ public class ConfigurationUtils {
if (atomicClass == Map.class) {
return (T)
data.stream()
- .map(map ->
convertToStringMap((Map<Object, Object>) map, true))
+ .map(map ->
convertToStringMap((Map<Object, Object>) map))
.collect(Collectors.toList());
}
return (T)
data.stream()
- .map(s -> convertValue(s, atomicClass, true))
+ .map(s -> convertValue(s, atomicClass))
.collect(Collectors.toList());
} catch (Exception e) {
// Fallback to legacy pattern
return convertToListWithLegacyProperties(rawValue,
atomicClass);
}
- } else {
- return convertToListWithLegacyProperties(rawValue, atomicClass);
}
}
@@ -438,24 +430,22 @@ public class ConfigurationUtils {
private static <T> T convertToListWithLegacyProperties(Object rawValue,
Class<?> atomicClass) {
return (T)
StructuredOptionsSplitter.splitEscaped(rawValue.toString(),
';').stream()
- .map(s -> convertValue(s, atomicClass, false))
+ .map(s -> convertValue(s, atomicClass))
.collect(Collectors.toList());
}
@SuppressWarnings("unchecked")
- static Map<String, String> convertToProperties(Object o, boolean
standardYaml) {
+ static Map<String, String> convertToProperties(Object o) {
if (o instanceof Map) {
return (Map<String, String>) o;
- } else if (standardYaml) {
+ } else {
try {
Map<Object, Object> map =
YamlParserUtils.convertToObject(o.toString(), Map.class);
- return convertToStringMap(map, true);
+ return convertToStringMap(map);
} catch (Exception e) {
// Fallback to legacy pattern
return convertToPropertiesWithLegacyPattern(o);
}
- } else {
- return convertToPropertiesWithLegacyPattern(o);
}
}
@@ -475,13 +465,12 @@ public class ConfigurationUtils {
.collect(Collectors.toMap(a -> a.get(0), a -> a.get(1)));
}
- private static Map<String, String> convertToStringMap(
- Map<Object, Object> map, boolean standardYaml) {
+ private static Map<String, String> convertToStringMap(Map<Object, Object>
map) {
return map.entrySet().stream()
.collect(
Collectors.toMap(
- entry -> convertToString(entry.getKey(),
standardYaml),
- entry -> convertToString(entry.getValue(),
standardYaml)));
+ entry -> convertToString(entry.getKey()),
+ entry -> convertToString(entry.getValue())));
}
@SuppressWarnings("unchecked")
@@ -521,42 +510,12 @@ public class ConfigurationUtils {
return MemorySize.parse(o.toString());
}
- static String convertToString(Object o, boolean standardYaml) {
- if (standardYaml) {
- if (o.getClass() == String.class) {
- return (String) o;
- } else {
- return YamlParserUtils.toYAMLString(o);
- }
- }
-
+ static String convertToString(Object o) {
if (o.getClass() == String.class) {
return (String) o;
- } else if (o.getClass() == Duration.class) {
- Duration duration = (Duration) o;
- return TimeUtils.formatWithHighestUnit(duration);
- } else if (o instanceof List) {
- return ((List<?>) o)
- .stream()
- .map(e -> escapeWithSingleQuote(convertToString(e,
false), ";"))
- .collect(Collectors.joining(";"));
- } else if (o instanceof Map) {
- return ((Map<?, ?>) o)
- .entrySet().stream()
- .map(
- e -> {
- String escapedKey =
-
escapeWithSingleQuote(e.getKey().toString(), ":");
- String escapedValue =
-
escapeWithSingleQuote(e.getValue().toString(), ":");
-
- return escapeWithSingleQuote(
- escapedKey + ":" +
escapedValue, ",");
- })
- .collect(Collectors.joining(","));
+ } else {
+ return YamlParserUtils.toYAMLString(o);
}
-
- return o.toString();
}
static Integer convertToInt(Object o) {
@@ -666,14 +625,14 @@ public class ConfigurationUtils {
}
static Map<String, String> convertToPropertiesPrefixed(
- Map<String, Object> confData, String key, boolean standardYaml) {
+ Map<String, Object> confData, String key) {
final String prefixKey = key + ".";
return confData.keySet().stream()
.filter(k -> k.startsWith(prefixKey))
.collect(
Collectors.toMap(
k -> k.substring(prefixKey.length()),
- k -> convertToString(confData.get(k),
standardYaml)));
+ k -> convertToString(confData.get(k))));
}
static boolean containsPrefixMap(Map<String, Object> confData, String key)
{
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index f0f147b884f..0f3748f62f5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -322,19 +322,15 @@ public final class DelegatingConfiguration extends Configuration {
@Override
public Map<String, String> toFileWritableMap() {
- if (backingConfig.standardYaml) {
- Map<String, String> map = backingConfig.toFileWritableMap();
- Map<String, String> prefixed = new HashMap<>();
- for (Map.Entry<String, String> entry : map.entrySet()) {
- if (entry.getKey().startsWith(prefix)) {
- String keyWithoutPrefix =
entry.getKey().substring(prefix.length());
- prefixed.put(keyWithoutPrefix,
YamlParserUtils.toYAMLString(entry.getValue()));
- }
+ Map<String, String> map = backingConfig.toFileWritableMap();
+ Map<String, String> prefixed = new HashMap<>();
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ if (entry.getKey().startsWith(prefix)) {
+ String keyWithoutPrefix =
entry.getKey().substring(prefix.length());
+ prefixed.put(keyWithoutPrefix,
YamlParserUtils.toYAMLString(entry.getValue()));
}
- return prefixed;
- } else {
- return toMap();
}
+ return prefixed;
}
@Override
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 8e680de327b..66225307287 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -19,7 +19,6 @@
package org.apache.flink.configuration;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -27,11 +26,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,8 +40,6 @@ public final class GlobalConfiguration {
private static final Logger LOG =
LoggerFactory.getLogger(GlobalConfiguration.class);
- public static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml";
-
public static final String FLINK_CONF_FILENAME = "config.yaml";
// key separator character
@@ -70,8 +63,6 @@ public final class GlobalConfiguration {
// the hidden content to be displayed
public static final String HIDDEN_CONTENT = "******";
- private static boolean standardYaml = true;
-
//
--------------------------------------------------------------------------------------------
private GlobalConfiguration() {}
@@ -143,31 +134,20 @@ public final class GlobalConfiguration {
}
// get Flink yaml configuration file
- File yamlConfigFile = new File(confDirFile,
LEGACY_FLINK_CONF_FILENAME);
Configuration configuration;
-
+ File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
if (!yamlConfigFile.exists()) {
- yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
- if (!yamlConfigFile.exists()) {
- throw new IllegalConfigurationException(
- "The Flink config file '"
- + yamlConfigFile
- + "' ("
- + yamlConfigFile.getAbsolutePath()
- + ") does not exist.");
- } else {
- standardYaml = true;
- LOG.info(
- "Using standard YAML parser to load flink
configuration file from {}.",
- yamlConfigFile.getAbsolutePath());
- configuration = loadYAMLResource(yamlConfigFile);
- }
+ throw new IllegalConfigurationException(
+ "The Flink config file '"
+ + yamlConfigFile
+ + "' ("
+ + yamlConfigFile.getAbsolutePath()
+ + ") does not exist.");
} else {
- standardYaml = false;
LOG.info(
- "Using legacy YAML parser to load flink configuration file
from {}.",
+ "Using standard YAML parser to load flink configuration
file from {}.",
yamlConfigFile.getAbsolutePath());
- configuration = loadLegacyYAMLResource(yamlConfigFile);
+ configuration = loadYAMLResource(yamlConfigFile);
}
logConfiguration("Loading", configuration);
@@ -190,81 +170,6 @@ public final class GlobalConfiguration {
isSensitive(key) ? HIDDEN_CONTENT : value));
}
- /**
- * Loads a YAML-file of key-value pairs.
- *
- * <p>Colon and whitespace ": " separate key and value (one per line). The hash tag "#" starts a
- * single-line comment.
- *
- * <p>Example:
- *
- * <pre>
- * jobmanager.rpc.address: localhost # network address for communication with the job manager
- * jobmanager.rpc.port : 6123 # network port to connect to for communication with the job manager
- * taskmanager.rpc.port : 6122 # network port the task manager expects incoming IPC connections
- * </pre>
- *
- * <p>This does not span the whole YAML specification, but only the *syntax* of simple YAML
- * key-value pairs (see issue #113 on GitHub). If at any point in time, there is a need to go
- * beyond simple key-value pairs syntax compatibility will allow to introduce a YAML parser
- * library.
- *
- * @param file the YAML file to read from
- * @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
- */
- private static Configuration loadLegacyYAMLResource(File file) {
- final Configuration config = new Configuration();
-
- try (BufferedReader reader =
- new BufferedReader(new InputStreamReader(new
FileInputStream(file)))) {
-
- String line;
- int lineNo = 0;
- while ((line = reader.readLine()) != null) {
- lineNo++;
- // 1. check for comments
- String[] comments = line.split("#", 2);
- String conf = comments[0].trim();
-
- // 2. get key and value
- if (conf.length() > 0) {
- String[] kv = conf.split(": ", 2);
-
- // skip line with no valid key-value pair
- if (kv.length == 1) {
- LOG.warn(
- "Error while trying to split key and value in
configuration file "
- + file
- + ":"
- + lineNo
- + ": Line is not a key-value pair
(missing space after ':'?)");
- continue;
- }
-
- String key = kv[0].trim();
- String value = kv[1].trim();
-
- // sanity check
- if (key.length() == 0 || value.length() == 0) {
- LOG.warn(
- "Error after splitting key and value in
configuration file "
- + file
- + ":"
- + lineNo
- + ": Key or value was empty");
- continue;
- }
-
- config.setString(key, value);
- }
- }
- } catch (IOException e) {
- throw new RuntimeException("Error parsing YAML configuration.", e);
- }
-
- return config;
- }
-
/**
* Flattens a nested configuration map to be only one level deep.
*
@@ -370,19 +275,6 @@ public final class GlobalConfiguration {
}
public static String getFlinkConfFilename() {
- if (isStandardYaml()) {
- return FLINK_CONF_FILENAME;
- } else {
- return LEGACY_FLINK_CONF_FILENAME;
- }
- }
-
- public static boolean isStandardYaml() {
- return standardYaml;
- }
-
- @VisibleForTesting
- public static void setStandardYaml(boolean standardYaml) {
- GlobalConfiguration.standardYaml = standardYaml;
+ return FLINK_CONF_FILENAME;
}
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
index e3b361eee23..96799e42f51 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
@@ -271,21 +270,6 @@ class SerializerConfigImplTest {
SerializerConfigImplTest.class,
TestTypeInfoFactory.class));
}
- @Test
- void testLoadingSerializationConfigWithLegacyParser() {
- GlobalConfiguration.setStandardYaml(false);
- String serializationConfigStr =
-
"{org.apache.flink.api.common.serialization.SerializerConfigImplTest:"
- + " {type: pojo},"
- + "
org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1:"
- + " {type: pojo}}";
- assertThatThrownBy(() ->
getConfiguredSerializerConfig(serializationConfigStr))
- .isInstanceOf(UnsupportedOperationException.class);
-
- // Clear the standard yaml flag to avoid impact to other cases.
- GlobalConfiguration.setStandardYaml(true);
- }
-
@Test
void testLoadingIllegalSerializationConfig() {
String duplicateClassConfigStr =
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationFileMigrationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationFileMigrationUtilsTest.java
new file mode 100644
index 00000000000..ff5e235fe49
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationFileMigrationUtilsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConfigurationFileMigrationUtilsTest {
+
+ @TempDir private File tmpDir;
+
+ @Test
+ void testConfigurationWithLegacyYAML() throws FileNotFoundException {
+ File confFile =
+ new File(tmpDir,
ConfigurationFileMigrationUtils.LEGACY_FLINK_CONF_FILENAME);
+ try (PrintWriter pw = new PrintWriter(confFile)) {
+ pw.println("###########################"); // should be skipped
+ pw.println("# Some : comments : to skip"); // should be skipped
+ pw.println("###########################"); // should be skipped
+ pw.println("mykey1: myvalue1"); // OK, simple correct case
+ pw.println("mykey2 : myvalue2"); // OK, whitespace before
colon is correct
+ pw.println("mykey3:myvalue3"); // SKIP, missing white space after
colon
+ pw.println(" some nonsense without colon and whitespace
separator"); // SKIP
+ pw.println(" : "); // SKIP
+ pw.println(" "); // SKIP (silently)
+ pw.println(" "); // SKIP (silently)
+ pw.println("mykey4: myvalue4# some comments"); // OK, skip
comments only
+ pw.println(" mykey5 : myvalue5 "); // OK, trim
unnecessary whitespace
+ pw.println("mykey6: my: value6"); // OK, only use first ': ' as
separator
+ pw.println("mykey7: "); // SKIP, no value provided
+ pw.println(": myvalue8"); // SKIP, no key provided
+
+ pw.println("mykey9: myvalue9"); // OK
+ pw.println("mykey9: myvalue10"); // OK, overwrite last value
+ }
+
+ Map<String, String> conf =
ConfigurationFileMigrationUtils.loadLegacyYAMLResource(confFile);
+
+ // only the six distinct well-formed keys (1, 2, 4, 5, 6, 9) are loaded from confFile
+ assertThat(conf.keySet()).hasSize(6);
+
+ // keys 1, 2, 4, 5, 6, 9 should be OK and match the expected values; keys 3, 7, 8 are skipped
+ assertThat(conf.getOrDefault("mykey1", null)).isEqualTo("myvalue1");
+ assertThat(conf.getOrDefault("mykey1", null)).isEqualTo("myvalue1");
+ assertThat(conf.getOrDefault("mykey2", null)).isEqualTo("myvalue2");
+ assertThat(conf.getOrDefault("mykey3", "null")).isEqualTo("null");
+ assertThat(conf.getOrDefault("mykey4", null)).isEqualTo("myvalue4");
+ assertThat(conf.getOrDefault("mykey5", null)).isEqualTo("myvalue5");
+ assertThat(conf.getOrDefault("mykey6", null)).isEqualTo("my: value6");
+ assertThat(conf.getOrDefault("mykey7", "null")).isEqualTo("null");
+ assertThat(conf.getOrDefault("mykey8", "null")).isEqualTo("null");
+ assertThat(conf.getOrDefault("mykey9", null)).isEqualTo("myvalue10");
+ }
+
+ @Test
+ // We allow malformed YAML files if loading legacy flink conf
+ void testInvalidLegacyYamlFile() throws IOException {
+ final File confFile =
+ new File(
+ tmpDir.getPath(),
+
ConfigurationFileMigrationUtils.LEGACY_FLINK_CONF_FILENAME);
+
+ try (PrintWriter pw = new PrintWriter(confFile)) {
+ pw.append("invalid");
+ }
+
+
assertThat(ConfigurationFileMigrationUtils.loadLegacyYAMLResource(confFile)).isNotNull();
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index ff6d6bcf9db..7fde75c5b32 100644
---
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -18,20 +18,15 @@
package org.apache.flink.configuration;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.assertj.core.data.Offset;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -44,17 +39,9 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
* This class contains test for the configuration package. In particular, the
serialization of
* {@link Configuration} objects is tested.
*/
-@ExtendWith(ParameterizedTestExtension.class)
@SuppressWarnings("deprecation")
class ConfigurationTest {
- @Parameter private boolean standardYaml;
-
- @Parameters(name = "standardYaml: {0}")
- private static Collection<Boolean> parameters() {
- return Arrays.asList(true, false);
- }
-
private static final ConfigOption<String> STRING_OPTION =
ConfigOptions.key("test-string-key").stringType().noDefaultValue();
@@ -79,9 +66,9 @@ class ConfigurationTest {
private static final String MAP_PROPERTY_2 = MAP_OPTION.key() + ".prop2";
/** This test checks the serialization/deserialization of configuration
objects. */
- @TestTemplate
+ @Test
void testConfigurationSerializationAndGetters() throws
ClassNotFoundException, IOException {
- final Configuration orig = new Configuration(standardYaml);
+ final Configuration orig = new Configuration();
orig.setString("mykey", "myvalue");
orig.setInteger("mynumber", 100);
orig.setLong("longvalue", 478236947162389746L);
@@ -107,11 +94,11 @@ class ConfigurationTest {
assertThat(copy).hasSameHashCodeAs(orig);
}
- @TestTemplate
+ @Test
void testCopyConstructor() {
final String key = "theKey";
- Configuration cfg1 = new Configuration(standardYaml);
+ Configuration cfg1 = new Configuration();
cfg1.setString(key, "value");
Configuration cfg2 = new Configuration(cfg1);
@@ -120,9 +107,9 @@ class ConfigurationTest {
assertThat(cfg1.getString(key, "")).isEqualTo("value");
}
- @TestTemplate
+ @Test
void testOptionWithDefault() {
- Configuration cfg = new Configuration(standardYaml);
+ Configuration cfg = new Configuration();
cfg.setInteger("int-key", 11);
cfg.setString("string-key", "abc");
@@ -155,9 +142,9 @@ class ConfigurationTest {
assertThat(cfg.getValue(intOption)).isEqualTo("87");
}
- @TestTemplate
+ @Test
void testOptionWithNoDefault() {
- Configuration cfg = new Configuration(standardYaml);
+ Configuration cfg = new Configuration();
cfg.setInteger("int-key", 11);
cfg.setString("string-key", "abc");
@@ -179,9 +166,9 @@ class ConfigurationTest {
assertThat(cfg.getString(stringOption,
"override")).isEqualTo("override");
}
- @TestTemplate
+ @Test
void testDeprecatedKeys() {
- Configuration cfg = new Configuration(standardYaml);
+ Configuration cfg = new Configuration();
cfg.setInteger("the-key", 11);
cfg.setInteger("old-key", 12);
cfg.setInteger("older-key", 13);
@@ -216,9 +203,9 @@ class ConfigurationTest {
assertThat(cfg.getInteger(notContained)).isEqualTo(-1);
}
- @TestTemplate
+ @Test
void testFallbackKeys() {
- Configuration cfg = new Configuration(standardYaml);
+ Configuration cfg = new Configuration();
cfg.setInteger("the-key", 11);
cfg.setInteger("old-key", 12);
cfg.setInteger("older-key", 13);
@@ -253,7 +240,7 @@ class ConfigurationTest {
assertThat(cfg.getInteger(notContained)).isEqualTo(-1);
}
- @TestTemplate
+ @Test
void testFallbackAndDeprecatedKeys() {
final ConfigOption<Integer> fallback =
ConfigOptions.key("fallback").intType().defaultValue(-1);
@@ -268,11 +255,11 @@ class ConfigurationTest {
.withFallbackKeys(fallback.key())
.withDeprecatedKeys(deprecated.key());
- final Configuration fallbackCfg = new Configuration(standardYaml);
+ final Configuration fallbackCfg = new Configuration();
fallbackCfg.setInteger(fallback, 1);
assertThat(fallbackCfg.getInteger(mainOption)).isOne();
- final Configuration deprecatedCfg = new Configuration(standardYaml);
+ final Configuration deprecatedCfg = new Configuration();
deprecatedCfg.setInteger(deprecated, 2);
assertThat(deprecatedCfg.getInteger(mainOption)).isEqualTo(2);
@@ -285,16 +272,16 @@ class ConfigurationTest {
.withDeprecatedKeys(deprecated.key())
.withFallbackKeys(fallback.key());
- final Configuration deprecatedAndFallBackConfig = new
Configuration(standardYaml);
+ final Configuration deprecatedAndFallBackConfig = new Configuration();
deprecatedAndFallBackConfig.setInteger(fallback, 1);
deprecatedAndFallBackConfig.setInteger(deprecated, 2);
assertThat(deprecatedAndFallBackConfig.getInteger(mainOption)).isOne();
assertThat(deprecatedAndFallBackConfig.getInteger(reversedMainOption)).isOne();
}
- @TestTemplate
+ @Test
void testRemove() {
- Configuration cfg = new Configuration(standardYaml);
+ Configuration cfg = new Configuration();
cfg.setInteger("a", 1);
cfg.setInteger("b", 2);
@@ -318,9 +305,9 @@ class ConfigurationTest {
.as("Expected 'unexistedOption' is not removed");
}
- @TestTemplate
+ @Test
void testRemoveKey() {
- Configuration cfg = new Configuration(standardYaml);
+ Configuration cfg = new Configuration();
String key1 = "a.b";
String key2 = "c.d";
cfg.setInteger(key1, 42);
@@ -337,27 +324,27 @@ class ConfigurationTest {
assertThat(cfg.keySet()).containsExactlyInAnyOrder("e.f");
}
- @TestTemplate
+ @Test
void testShouldParseValidStringToEnum() {
- final Configuration configuration = new Configuration(standardYaml);
+ final Configuration configuration = new Configuration();
configuration.setString(STRING_OPTION.key(),
TestEnum.VALUE1.toString());
final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class,
STRING_OPTION);
assertThat(TestEnum.VALUE1).isEqualTo(parsedEnumValue);
}
- @TestTemplate
+ @Test
void testShouldParseValidStringToEnumIgnoringCase() {
- final Configuration configuration = new Configuration(standardYaml);
+ final Configuration configuration = new Configuration();
configuration.setString(STRING_OPTION.key(),
TestEnum.VALUE1.toString().toLowerCase());
final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class,
STRING_OPTION);
assertThat(TestEnum.VALUE1).isEqualTo(parsedEnumValue);
}
- @TestTemplate
+ @Test
void testThrowsExceptionIfTryingToParseInvalidStringForEnum() {
- final Configuration configuration = new Configuration(standardYaml);
+ final Configuration configuration = new Configuration();
final String invalidValueForTestEnum = "InvalidValueForTestEnum";
configuration.setString(STRING_OPTION.key(), invalidValueForTestEnum);
@@ -371,9 +358,9 @@ class ConfigurationTest {
+ ")");
}
- @TestTemplate
+ @Test
void testToMap() {
- final Configuration configuration = new Configuration(standardYaml);
+ final Configuration configuration = new Configuration();
final String listValues = "value1;value2;value3";
final String yamlListValues = "[value1, value2, value3]";
configuration.set(LIST_STRING_OPTION,
Arrays.asList(listValues.split(";")));
@@ -388,20 +375,14 @@ class ConfigurationTest {
final Duration duration = Duration.ofMillis(3000);
configuration.set(DURATION_OPTION, duration);
- if (standardYaml) {
- assertThat(yamlListValues)
-
.isEqualTo(configuration.toMap().get(LIST_STRING_OPTION.key()));
-
assertThat(yamlMapValues).isEqualTo(configuration.toMap().get(MAP_OPTION.key()));
- } else {
-
assertThat(listValues).isEqualTo(configuration.toMap().get(LIST_STRING_OPTION.key()));
-
assertThat(mapValues).isEqualTo(configuration.toMap().get(MAP_OPTION.key()));
- }
+
assertThat(yamlListValues).isEqualTo(configuration.toMap().get(LIST_STRING_OPTION.key()));
+
assertThat(yamlMapValues).isEqualTo(configuration.toMap().get(MAP_OPTION.key()));
assertThat("3
s").isEqualTo(configuration.toMap().get(DURATION_OPTION.key()));
}
- @TestTemplate
+ @Test
void testToFileWritableMap() {
- final Configuration configuration = new Configuration(standardYaml);
+ final Configuration configuration = new Configuration();
final String listValues = "value1;value2;value3";
final String yamlListValues = "[value1, value2, value3]";
configuration.set(LIST_STRING_OPTION,
Arrays.asList(listValues.split(";")));
@@ -420,35 +401,26 @@ class ConfigurationTest {
final String yamlStrValues = "'*'";
configuration.set(STRING_OPTION, strValues);
- if (standardYaml) {
-
assertThat(configuration.toFileWritableMap().get(LIST_STRING_OPTION.key()))
- .isEqualTo(yamlListValues);
- assertThat(configuration.toFileWritableMap().get(MAP_OPTION.key()))
- .isEqualTo(yamlMapValues);
-
assertThat(configuration.toFileWritableMap().get(STRING_OPTION.key()))
- .isEqualTo(yamlStrValues);
- } else {
-
assertThat(configuration.toFileWritableMap().get(LIST_STRING_OPTION.key()))
- .isEqualTo(listValues);
- assertThat(configuration.toFileWritableMap().get(MAP_OPTION.key()))
- .isEqualTo(mapValues);
-
assertThat(configuration.toFileWritableMap().get(STRING_OPTION.key()))
- .isEqualTo(strValues);
- }
+
assertThat(configuration.toFileWritableMap().get(LIST_STRING_OPTION.key()))
+ .isEqualTo(yamlListValues);
+ assertThat(configuration.toFileWritableMap().get(MAP_OPTION.key()))
+ .isEqualTo(yamlMapValues);
+ assertThat(configuration.toFileWritableMap().get(STRING_OPTION.key()))
+ .isEqualTo(yamlStrValues);
assertThat(configuration.toMap().get(DURATION_OPTION.key())).isEqualTo("3 s");
}
- @TestTemplate
+ @Test
void testMapNotContained() {
- final Configuration cfg = new Configuration(standardYaml);
+ final Configuration cfg = new Configuration();
assertThat(cfg.getOptional(MAP_OPTION)).isNotPresent();
assertThat(cfg.contains(MAP_OPTION)).isFalse();
}
- @TestTemplate
+ @Test
void testMapWithPrefix() {
- final Configuration cfg = new Configuration(standardYaml);
+ final Configuration cfg = new Configuration();
cfg.setString(MAP_PROPERTY_1, "value1");
cfg.setInteger(MAP_PROPERTY_2, 12);
@@ -456,18 +428,18 @@ class ConfigurationTest {
assertThat(cfg.contains(MAP_OPTION)).isTrue();
}
- @TestTemplate
+ @Test
void testMapWithoutPrefix() {
- final Configuration cfg = new Configuration(standardYaml);
+ final Configuration cfg = new Configuration();
cfg.set(MAP_OPTION, PROPERTIES_MAP);
assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP);
assertThat(cfg.contains(MAP_OPTION)).isTrue();
}
- @TestTemplate
+ @Test
void testMapNonPrefixHasPrecedence() {
- final Configuration cfg = new Configuration(standardYaml);
+ final Configuration cfg = new Configuration();
cfg.set(MAP_OPTION, PROPERTIES_MAP);
cfg.setString(MAP_PROPERTY_1, "value1");
cfg.setInteger(MAP_PROPERTY_2, 99999);
@@ -477,9 +449,9 @@ class ConfigurationTest {
assertThat(cfg.containsKey(MAP_PROPERTY_1)).isTrue();
}
- @TestTemplate
+ @Test
void testMapThatOverwritesPrefix() {
- final Configuration cfg = new Configuration(standardYaml);
+ final Configuration cfg = new Configuration();
cfg.setString(MAP_PROPERTY_1, "value1");
cfg.setInteger(MAP_PROPERTY_2, 99999);
cfg.set(MAP_OPTION, PROPERTIES_MAP);
@@ -489,9 +461,9 @@ class ConfigurationTest {
assertThat(cfg.containsKey(MAP_PROPERTY_1)).isFalse();
}
- @TestTemplate
+ @Test
void testMapRemovePrefix() {
- final Configuration cfg = new Configuration(standardYaml);
+ final Configuration cfg = new Configuration();
cfg.setString(MAP_PROPERTY_1, "value1");
cfg.setInteger(MAP_PROPERTY_2, 99999);
cfg.removeConfig(MAP_OPTION);
@@ -501,14 +473,14 @@ class ConfigurationTest {
assertThat(cfg.containsKey(MAP_PROPERTY_2)).isFalse();
}
- @TestTemplate
+ @Test
void testListParserErrorDoesNotLeakSensitiveData() {
ConfigOption<List<String>> secret =
ConfigOptions.key("secret").stringType().asList().noDefaultValue();
assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
- final Configuration cfg = new Configuration(standardYaml);
+ final Configuration cfg = new Configuration();
// missing closing quote
cfg.setString(secret.key(), "'secret_value");
@@ -520,14 +492,14 @@ class ConfigurationTest {
.doesNotContain("secret_value"));
}
- @TestTemplate
+ @Test
void testMapParserErrorDoesNotLeakSensitiveData() {
ConfigOption<Map<String, String>> secret =
ConfigOptions.key("secret").mapType().noDefaultValue();
assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
- final Configuration cfg = new Configuration(standardYaml);
+ final Configuration cfg = new Configuration();
// malformed map representation
cfg.setString(secret.key(), "secret_value");
@@ -539,22 +511,22 @@ class ConfigurationTest {
.doesNotContain("secret_value"));
}
- @TestTemplate
+ @Test
void testToStringDoesNotLeakSensitiveData() {
ConfigOption<Map<String, String>> secret =
ConfigOptions.key("secret").mapType().noDefaultValue();
assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
- final Configuration cfg = new Configuration(standardYaml);
+ final Configuration cfg = new Configuration();
cfg.setString(secret.key(), "secret_value");
assertThat(cfg.toString()).doesNotContain("secret_value");
}
- @TestTemplate
+ @Test
void testGetWithOverrideDefault() {
- final Configuration conf = new Configuration(standardYaml);
+ final Configuration conf = new Configuration();
// Test for integer without default value.
ConfigOption<Integer> integerOption0 =
diff --git
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
index 547b91f9a92..12c1dab1def 100644
---
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
+++
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
@@ -18,20 +18,14 @@
package org.apache.flink.configuration;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.TimeUtils;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -44,17 +38,9 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link ConfigurationUtils}. */
-@ExtendWith(ParameterizedTestExtension.class)
class ConfigurationUtilsTest {
- @Parameter private boolean standardYaml;
-
- @Parameters(name = "standardYaml: {0}")
- private static Collection<Boolean> parameters() {
- return Arrays.asList(true, false);
- }
-
- @TestTemplate
+ @Test
void testPropertiesToConfiguration() {
final Properties properties = new Properties();
final int entries = 10;
@@ -82,7 +68,7 @@ class ConfigurationUtilsTest {
expectedMap.put("k2", "v2");
String legacyMapPattern = "k1:v1,k2:v2";
- Configuration configuration = new Configuration(true);
+ Configuration configuration = new Configuration();
configuration.setString("listKey", legacyListPattern);
configuration.setString("mapKey", legacyMapPattern);
@@ -97,18 +83,18 @@ class ConfigurationUtilsTest {
.isEqualTo(expectedMap);
}
- @TestTemplate
+ @Test
void testConvertConfigToWritableLinesAndFlattenYaml() {
testConvertConfigToWritableLines(true);
}
- @TestTemplate
+ @Test
void testConvertConfigToWritableLinesAndNoFlattenYaml() {
testConvertConfigToWritableLines(false);
}
private void testConvertConfigToWritableLines(boolean flattenYaml) {
- final Configuration configuration = new Configuration(standardYaml);
+ final Configuration configuration = new Configuration();
ConfigOption<List<String>> nestedListOption =
ConfigOptions.key("nested.test-list-key").stringType().asList().noDefaultValue();
final String listValues = "value1;value2;value3";
@@ -143,47 +129,35 @@ class ConfigurationUtilsTest {
List<String> actualData =
ConfigurationUtils.convertConfigToWritableLines(configuration,
flattenYaml);
List<String> expected;
- if (standardYaml) {
- if (flattenYaml) {
- expected =
- Arrays.asList(
- nestedListOption.key() + ": " + yamlListValues,
- nestedMapOption.key() + ": " + yamlMapValues,
- nestedDurationOption.key()
- + ": "
- +
TimeUtils.formatWithHighestUnit(duration),
- nestedStringOption.key() + ": " +
yamlStrValues,
- intOption.key() + ": " + intValue);
- } else {
- expected =
- Arrays.asList(
- "nested:",
- " test-list-key:",
- " - value1",
- " - value2",
- " - value3",
- " test-map-key:",
- " key1: value1",
- " key2: value2",
- " test-duration-key: 3 s",
- " test-string-key: '*'",
- "test-int-key: 1");
- }
- } else {
+ if (flattenYaml) {
expected =
Arrays.asList(
- nestedListOption.key() + ": " + listValues,
- nestedMapOption.key() + ": " + mapValues,
+ nestedListOption.key() + ": " + yamlListValues,
+ nestedMapOption.key() + ": " + yamlMapValues,
nestedDurationOption.key()
+ ": "
+
TimeUtils.formatWithHighestUnit(duration),
- nestedStringOption.key() + ": " + strValues,
+ nestedStringOption.key() + ": " + yamlStrValues,
intOption.key() + ": " + intValue);
+ } else {
+ expected =
+ Arrays.asList(
+ "nested:",
+ " test-list-key:",
+ " - value1",
+ " - value2",
+ " - value3",
+ " test-map-key:",
+ " key1: value1",
+ " key2: value2",
+ " test-duration-key: 3 s",
+ " test-string-key: '*'",
+ "test-int-key: 1");
}
assertThat(expected).containsExactlyInAnyOrderElementsOf(actualData);
}
- @TestTemplate
+ @Test
void testHideSensitiveValues() {
final Map<String, String> keyValuePairs = new HashMap<>();
keyValuePairs.put("foobar", "barfoo");
@@ -204,7 +178,7 @@ class ConfigurationUtilsTest {
assertThat(hiddenSensitiveValues).isEqualTo(expectedKeyValuePairs);
}
- @TestTemplate
+ @Test
void testGetPrefixedKeyValuePairs() {
final String prefix = "test.prefix.";
final Map<String, String> expectedKeyValuePairs =
@@ -224,48 +198,34 @@ class ConfigurationUtilsTest {
assertThat(resultKeyValuePairs).isEqualTo(expectedKeyValuePairs);
}
- @TestTemplate
+ @Test
void testConvertToString() {
// String
- assertThat(ConfigurationUtils.convertToString("Simple String",
standardYaml))
- .isEqualTo("Simple String");
+ assertThat(ConfigurationUtils.convertToString("Simple
String")).isEqualTo("Simple String");
// Duration
- assertThat(ConfigurationUtils.convertToString(Duration.ZERO,
standardYaml))
- .isEqualTo("0 ms");
- assertThat(ConfigurationUtils.convertToString(Duration.ofMillis(123L),
standardYaml))
- .isEqualTo("123 ms");
-
assertThat(ConfigurationUtils.convertToString(Duration.ofMillis(1_234_000L),
standardYaml))
+
assertThat(ConfigurationUtils.convertToString(Duration.ZERO)).isEqualTo("0 ms");
+
assertThat(ConfigurationUtils.convertToString(Duration.ofMillis(123L))).isEqualTo("123
ms");
+
assertThat(ConfigurationUtils.convertToString(Duration.ofMillis(1_234_000L)))
.isEqualTo("1234 s");
- assertThat(ConfigurationUtils.convertToString(Duration.ofHours(25L),
standardYaml))
- .isEqualTo("25 h");
+
assertThat(ConfigurationUtils.convertToString(Duration.ofHours(25L))).isEqualTo("25
h");
// List
List<Object> listElements = new ArrayList<>();
listElements.add("Test;String");
listElements.add(Duration.ZERO);
listElements.add(42);
- if (standardYaml) {
- assertThat("[Test;String, 0 ms, 42]")
-
.isEqualTo(ConfigurationUtils.convertToString(listElements, true));
- } else {
- assertThat("'Test;String';0 ms;42")
-
.isEqualTo(ConfigurationUtils.convertToString(listElements, false));
- }
+ assertThat("[Test;String, 0 ms, 42]")
+ .isEqualTo(ConfigurationUtils.convertToString(listElements));
// Map
Map<Object, Object> mapElements = new HashMap<>();
mapElements.put("A:,B", "C:,D");
mapElements.put(10, 20);
- if (standardYaml) {
- assertThat("{'A:,B': 'C:,D', 10: 20}")
- .isEqualTo(ConfigurationUtils.convertToString(mapElements,
true));
- } else {
- assertThat("'''A:,B'':''C:,D''',10:20")
- .isEqualTo(ConfigurationUtils.convertToString(mapElements,
false));
- }
+ assertThat("{'A:,B': 'C:,D', 10: 20}")
+ .isEqualTo(ConfigurationUtils.convertToString(mapElements));
}
- @TestTemplate
+ @Test
void testRandomTempDirectorySelection() {
final Configuration configuration = new Configuration();
final StringBuilder tempDirectories = new StringBuilder();
diff --git
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index 603527ad8f7..5f57777d264 100644
---
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -41,49 +41,6 @@ class GlobalConfigurationTest {
@TempDir private File tmpDir;
- @Test
- void testConfigurationWithLegacyYAML() throws FileNotFoundException {
- File confFile = new File(tmpDir,
GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
- try (PrintWriter pw = new PrintWriter(confFile)) {
- pw.println("###########################"); // should be skipped
- pw.println("# Some : comments : to skip"); // should be skipped
- pw.println("###########################"); // should be skipped
- pw.println("mykey1: myvalue1"); // OK, simple correct case
- pw.println("mykey2 : myvalue2"); // OK, whitespace before
colon is correct
- pw.println("mykey3:myvalue3"); // SKIP, missing white space after
colon
- pw.println(" some nonsense without colon and whitespace
separator"); // SKIP
- pw.println(" : "); // SKIP
- pw.println(" "); // SKIP (silently)
- pw.println(" "); // SKIP (silently)
- pw.println("mykey4: myvalue4# some comments"); // OK, skip
comments only
- pw.println(" mykey5 : myvalue5 "); // OK, trim
unnecessary whitespace
- pw.println("mykey6: my: value6"); // OK, only use first ': ' as
separator
- pw.println("mykey7: "); // SKIP, no value provided
- pw.println(": myvalue8"); // SKIP, no key provided
-
- pw.println("mykey9: myvalue9"); // OK
- pw.println("mykey9: myvalue10"); // OK, overwrite last value
- }
- Configuration conf =
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-
- // all distinct keys from confFile1 + confFile2 key
- assertThat(conf.keySet()).hasSize(6);
-
- // keys 1, 2, 4, 5, 6, 7, 8 should be OK and match the expected values
- assertThat(conf.getString("mykey1", null)).isEqualTo("myvalue1");
- assertThat(conf.getString("mykey1", null)).isEqualTo("myvalue1");
- assertThat(conf.getString("mykey2", null)).isEqualTo("myvalue2");
- assertThat(conf.getString("mykey3", "null")).isEqualTo("null");
- assertThat(conf.getString("mykey4", null)).isEqualTo("myvalue4");
- assertThat(conf.getString("mykey5", null)).isEqualTo("myvalue5");
- assertThat(conf.getString("mykey6", null)).isEqualTo("my: value6");
- assertThat(conf.getString("mykey7", "null")).isEqualTo("null");
- assertThat(conf.getString("mykey8", "null")).isEqualTo("null");
- assertThat(conf.getString("mykey9", null)).isEqualTo("myvalue10");
- // Clear the standard yaml flag to avoid impact to other cases.
- GlobalConfiguration.setStandardYaml(true);
- }
-
@Test
void testConfigurationWithStandardYAML() throws FileNotFoundException {
File confFile = new File(tmpDir,
GlobalConfiguration.FLINK_CONF_FILENAME);
@@ -154,21 +111,6 @@ class GlobalConfigurationTest {
.isInstanceOf(IllegalConfigurationException.class);
}
- @Test
- // We allow malformed YAML files if loaded legacy flink conf
- void testInvalidLegacyYamlFile() throws IOException {
- final File confFile =
- new File(tmpDir.getPath(),
GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
-
- try (PrintWriter pw = new PrintWriter(confFile)) {
- pw.append("invalid");
- }
-
-
assertThat(GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath())).isNotNull();
- // Clear the standard yaml flag to avoid impact to other cases.
- GlobalConfiguration.setStandardYaml(true);
- }
-
@Test
// We do not allow malformed YAML files if loaded standard yaml
void testInvalidStandardYamlFile() throws IOException {
diff --git
a/flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java
b/flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java
index e5af433a566..c94814450cc 100644
---
a/flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java
+++
b/flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -57,56 +56,53 @@ import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(ParameterizedTestExtension.class)
class ReadableWritableConfigurationTest {
- @Parameters(name = "testSpec = {0}, standardYaml = {1}")
- private static Collection<Object[]> parameters() {
+ @Parameters(name = "testSpec = {0}")
+ private static Collection<TestSpec<?>> parameters() {
List<TestSpec<?>> testSpecs =
Arrays.asList(
new
TestSpec<>(ConfigOptions.key("int").intType().defaultValue(-1))
- .valueEquals(12345, "12345", "12345")
+ .valueEquals(12345, "12345")
.checkDefaultOverride(5),
new
TestSpec<>(ConfigOptions.key("long").longType().defaultValue(-1L))
- .valueEquals(12345L, "12345", "12345")
+ .valueEquals(12345L, "12345")
.checkDefaultOverride(5L),
new
TestSpec<>(ConfigOptions.key("float").floatType().defaultValue(0.01F))
- .valueEquals(0.003F, "0.003", "0.003")
+ .valueEquals(0.003F, "0.003")
.checkDefaultOverride(1.23F),
new
TestSpec<>(ConfigOptions.key("double").doubleType().defaultValue(0.01D))
- .valueEquals(0.003D, "0.003", "0.003")
+ .valueEquals(0.003D, "0.003")
.checkDefaultOverride(1.23D),
new TestSpec<>(
ConfigOptions.key("boolean")
.booleanType()
.defaultValue(false))
- .valueEquals(true, "true", "true")
+ .valueEquals(true, "true")
.checkDefaultOverride(true),
new TestSpec<>(
ConfigOptions.key("list<int>")
.intType()
.asList()
.defaultValues(-1, 2, 3))
- .valueEquals(
- Arrays.asList(1, 2, 3, 4, 5),
- "1;2;3;4;5",
- "[1, 2, 3, 4, 5]")
+ .valueEquals(Arrays.asList(1, 2, 3, 4, 5),
"[1, 2, 3, 4, 5]")
.checkDefaultOverride(Arrays.asList(1, 2)),
new TestSpec<>(
ConfigOptions.key("list<string>")
.stringType()
.asList()
.defaultValues("A", "B", "C"))
- .valueEquals(Arrays.asList("A;B", "C"),
"'A;B';C", "['A;B', C]")
+ .valueEquals(Arrays.asList("A;B", "C"),
"['A;B', C]")
.checkDefaultOverride(Collections.singletonList("C")),
new TestSpec<>(
ConfigOptions.key("interval")
.durationType()
.defaultValue(Duration.ofHours(3)))
- .valueEquals(Duration.ofMinutes(3), "3 min",
"3 min")
+ .valueEquals(Duration.ofMinutes(3), "3 min")
.checkDefaultOverride(Duration.ofSeconds(1)),
new TestSpec<>(
ConfigOptions.key("memory")
.memoryType()
.defaultValue(new
MemorySize(1024)))
- .valueEquals(new MemorySize(1024 * 1024 *
1024), "1g", "1g")
+ .valueEquals(new MemorySize(1024 * 1024 *
1024), "1g")
.checkDefaultOverride(new MemorySize(2048)),
new TestSpec<>(
ConfigOptions.key("properties")
@@ -122,7 +118,6 @@ class ReadableWritableConfigurationTest {
Arrays.asList(
Tuple2.of("key1",
"value1"),
Tuple2.of("key2",
"value2"))),
- "key1:value1,key2:value2",
"{key1: value1, key2: value2}")
.checkDefaultOverride(Collections.emptyMap()),
new TestSpec<>(
@@ -144,15 +139,9 @@ class ReadableWritableConfigurationTest {
asMap(
Collections.singletonList(
Tuple2.of("key3", "value3")))),
- "key1:value1,key2:value2;key3:value3",
"[{key1: value1, key2: value2}, {key3:
value3}]")
.checkDefaultOverride(Collections.emptyList()));
- List<Object[]> list = new ArrayList<>();
- for (TestSpec<?> testSpec : testSpecs) {
- list.add(new Object[] {testSpec, true});
- list.add(new Object[] {testSpec, false});
- }
- return list;
+ return testSpecs;
}
private static Map<String, String> asMap(List<Tuple2<String, String>>
entries) {
@@ -161,12 +150,9 @@ class ReadableWritableConfigurationTest {
@Parameter TestSpec<?> testSpec;
- @Parameter(value = 1)
- boolean standardYaml;
-
@TestTemplate
void testGetOptionalFromObject() {
- Configuration configuration = new Configuration(standardYaml);
+ Configuration configuration = new Configuration();
testSpec.setValue(configuration);
Optional<?> optional = configuration.getOptional(testSpec.getOption());
@@ -176,8 +162,8 @@ class ReadableWritableConfigurationTest {
@TestTemplate
void testGetOptionalFromString() {
ConfigOption<?> option = testSpec.getOption();
- Configuration configuration = new Configuration(standardYaml);
- configuration.setString(option.key(),
testSpec.getStringValue(standardYaml));
+ Configuration configuration = new Configuration();
+ configuration.setString(option.key(), testSpec.getStringValue());
Optional<?> optional = configuration.getOptional(option);
assertThat(optional).isPresent().get().isEqualTo(testSpec.getValue());
@@ -185,7 +171,7 @@ class ReadableWritableConfigurationTest {
@TestTemplate
void testGetDefaultValue() {
- Configuration configuration = new Configuration(standardYaml);
+ Configuration configuration = new Configuration();
ConfigOption<?> option = testSpec.getOption();
Object value = configuration.get(option);
@@ -195,7 +181,7 @@ class ReadableWritableConfigurationTest {
@TestTemplate
@SuppressWarnings("unchecked")
void testGetOptionalDefaultValueOverride() {
- ReadableConfig configuration = new Configuration(standardYaml);
+ ReadableConfig configuration = new Configuration();
ConfigOption<?> option = testSpec.getOption();
Object value =
@@ -207,7 +193,6 @@ class ReadableWritableConfigurationTest {
private static class TestSpec<T> {
private final ConfigOption<T> option;
private T value;
- private String stringValue;
private String yamlStringValue;
private T defaultValueOverride;
@@ -215,9 +200,8 @@ class ReadableWritableConfigurationTest {
this.option = option;
}
- TestSpec<T> valueEquals(T objectValue, String stringValue, String
yamlStringValue) {
+ TestSpec<T> valueEquals(T objectValue, String yamlStringValue) {
this.value = objectValue;
- this.stringValue = stringValue;
this.yamlStringValue = yamlStringValue;
return this;
}
@@ -238,12 +222,8 @@ class ReadableWritableConfigurationTest {
return value;
}
- String getStringValue(boolean standardYaml) {
- if (standardYaml) {
- return yamlStringValue;
- } else {
- return stringValue;
- }
+ String getStringValue() {
+ return yamlStringValue;
}
T getDefaultValueOverride() {
@@ -265,8 +245,6 @@ class ReadableWritableConfigurationTest {
+ option
+ ", value="
+ value
- + ", stringValue='"
- + stringValue
+ ", yamlStringValue='"
+ yamlStringValue
+ '\''
diff --git a/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
b/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
index 4be918ff139..594ff7e361c 100755
--- a/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
+++ b/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
@@ -35,10 +35,7 @@ setJavaHome() {
# NOTE: we need to obtain JAVA_HOME before using BashJavaUtils, so the
value for env.java.home must
# be in a flattened format, rather than nested, allowing us to retrieve
the corresponding value via
# shell script.
- CONF_FILE="$1/flink-conf.yaml"
- if [ ! -e "$1/flink-conf.yaml" ]; then
- CONF_FILE="$1/config.yaml"
- fi;
+ CONF_FILE="$1"
KEY_ENV_JAVA_HOME="env.java.home"
MY_JAVA_HOME=$(readFromConfigFile ${KEY_ENV_JAVA_HOME} "" "${CONF_FILE}")
diff --git a/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
b/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
index e714428583a..22095aa1fcb 100755
--- a/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
+++ b/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
@@ -25,7 +25,9 @@ if [ "$#" -lt 3 ]; then
fi
source "$2"/bash-java-utils.sh
-setJavaRun "$1"
+
+CONF_FILE="$1/config.yaml"
+setJavaRun "$CONF_FILE"
ARGS=("${@:1}")
result=$(updateAndGetFlinkConfiguration "${ARGS[@]}")
@@ -36,10 +38,5 @@ if [[ $? -ne 0 ]]; then
exit 1
fi
-CONF_FILE="$1/flink-conf.yaml"
-if [ ! -e "$1/flink-conf.yaml" ]; then
- CONF_FILE="$1/config.yaml"
-fi;
-
# Output the result
echo "${result}" > "$CONF_FILE";
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh
b/flink-dist/src/main/flink-bin/bin/config.sh
index e45e1fb4f36..c5859ad8382 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -213,7 +213,7 @@ export FLINK_LIB_DIR
export FLINK_OPT_DIR
source "${FLINK_BIN_DIR}/bash-java-utils.sh"
-setJavaRun "$FLINK_CONF_DIR"
+setJavaRun "$FLINK_CONF_DIR/config.yaml"
YAML_CONF=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}"
"${FLINK_BIN_DIR}" ${FLINK_LIB_DIR} -flatten)
########################################################################################################################
diff --git a/flink-dist/src/main/flink-bin/bin/migrate-config-file.sh
b/flink-dist/src/main/flink-bin/bin/migrate-config-file.sh
index a82d1725e55..a797a814afc 100755
--- a/flink-dist/src/main/flink-bin/bin/migrate-config-file.sh
+++ b/flink-dist/src/main/flink-bin/bin/migrate-config-file.sh
@@ -26,7 +26,7 @@ FLINK_BIN_DIR=`cd "$FLINK_BIN_DIR"; pwd`
FLINK_CONF_DIR="$FLINK_BIN_DIR"/../conf
echo "Using Flink configuration directory: $FLINK_CONF_DIR"
-setJavaRun "$FLINK_CONF_DIR"
+setJavaRun "$FLINK_CONF_DIR/flink-conf.yaml"
FLINK_LIB_DIR="$FLINK_BIN_DIR"/../lib
echo "Using Flink library directory: $FLINK_LIB_DIR"
diff --git a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
index 8b1f43225ca..77c14f9f5c1 100755
--- a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
+++ b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
@@ -34,15 +34,17 @@ bin=`cd "$bin"; pwd`
if [ "$COMMAND" = "MIGRATE_LEGACY_FLINK_CONFIGURATION_TO_STANDARD_YAML" ]; then
FLINK_CONF_DIR="${bin}/../resources"
+ FLINK_CONF="$FLINK_CONF_DIR/flink-conf.yaml"
else
FLINK_CONF_DIR="${bin}/../../main/resources"
+ FLINK_CONF="$FLINK_CONF_DIR/config.yaml"
fi
FLINK_TARGET_DIR=${bin}/../../../target
FLINK_DIST_JARS=(`find ${FLINK_TARGET_DIR} -maxdepth 1 -name
'flink-dist*.jar'`)
FLINK_DIST_CLASSPATH=`echo ${FLINK_DIST_JARS[@]} | tr ' ' ':'`
. ${bin}/../../main/flink-bin/bin/bash-java-utils.sh > /dev/null
-setJavaRun "$FLINK_CONF_DIR"
+setJavaRun "$FLINK_CONF"
output=$(runBashJavaUtilsCmd ${COMMAND} ${FLINK_CONF_DIR}
"$FLINK_TARGET_DIR/bash-java-utils.jar:${FLINK_DIST_CLASSPATH}" $DYNAMIC_OPTS)
extractExecutionResults "${output}" ${EXPECTED_LINES}
diff --git a/flink-end-to-end-tests/test-scripts/common.sh
b/flink-end-to-end-tests/test-scripts/common.sh
index a36cbbbbab1..65ca25ebf4f 100755
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -55,8 +55,9 @@ source "${FLINK_DIR}/bin/bash-java-utils.sh"
if [[ -z "${FLINK_CONF_DIR:-}" ]]; then
FLINK_CONF_DIR="$FLINK_DIR/conf"
fi
-setJavaRun "$FLINK_CONF_DIR"
FLINK_CONF=${FLINK_CONF_DIR}/config.yaml
+setJavaRun "$FLINK_CONF"
+
# Flatten the configuration file config.yaml to enable end-to-end test cases
which will modify
# it directly through shell scripts.
output=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}" "${FLINK_DIR}/bin"
"${FLINK_DIR}/lib" -flatten)
diff --git a/flink-python/docs/reference/pyflink.table/table_environment.rst
b/flink-python/docs/reference/pyflink.table/table_environment.rst
index 302dc3d5743..1039f5b2d33 100644
--- a/flink-python/docs/reference/pyflink.table/table_environment.rst
+++ b/flink-python/docs/reference/pyflink.table/table_environment.rst
@@ -72,7 +72,7 @@ programs.
This class is a pure API class that abstracts configuration from various
sources. Currently,
configuration can be set in any of the following layers (in the given order):
-- flink-conf.yaml
+- config.yaml
- CLI parameters
- :class:`~pyflink.datastream.StreamExecutionEnvironment` when bridging to
DataStream API
- :func:`~EnvironmentSettings.Builder.with_configuration`
diff --git a/flink-python/pyflink/common/configuration.py
b/flink-python/pyflink/common/configuration.py
index 779568da151..d833f83ba02 100644
--- a/flink-python/pyflink/common/configuration.py
+++ b/flink-python/pyflink/common/configuration.py
@@ -76,13 +76,11 @@ class Configuration:
@staticmethod
def parse_jars_value(value: str, jvm):
- is_standard_yaml =
jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
- if is_standard_yaml:
- from ruamel.yaml import YAML
- yaml = YAML(typ='safe')
- jar_urls_list = yaml.load(value)
- if isinstance(jar_urls_list, list):
- return jar_urls_list
+ from ruamel.yaml import YAML
+ yaml = YAML(typ='safe')
+ jar_urls_list = yaml.load(value)
+ if isinstance(jar_urls_list, list):
+ return jar_urls_list
return value.split(";")
def get_integer(self, key: str, default_value: int) -> int:
diff --git a/flink-python/pyflink/pyflink_gateway_server.py
b/flink-python/pyflink/pyflink_gateway_server.py
index 0eae1e961ee..a2c90797151 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -46,34 +46,14 @@ def on_windows():
def read_from_config(key, default_value, flink_conf_directory):
from ruamel.yaml import YAML
yaml = YAML(typ='safe')
- # try to find flink-conf.yaml file in flink_conf_directory
- flink_conf_file = os.path.join(flink_conf_directory, "flink-conf.yaml")
- if os.path.isfile(flink_conf_file):
- # If flink-conf.yaml exists, use the old parsing logic to read the
value
- # get the realpath of tainted path value to avoid CWE22 problem that
constructs a path
- # or URI using the tainted value and might allow an attacker to
access, modify, or test
- # the existence of critical or sensitive files.
- with open(os.path.realpath(flink_conf_file), "r") as f:
- while True:
- line = f.readline()
- if not line:
- break
- if line.startswith("#") or len(line.strip()) == 0:
- continue
- k, v = line.split(":", 1)
- if k.strip() == key:
- return v.strip()
- else:
- # If flink-conf.yaml does not exist, try to find config.yaml instead
- config_file = os.path.join(flink_conf_directory, "config.yaml")
- if os.path.isfile(config_file):
- # If config.yaml exists, use YAML parser to read the value
- with open(os.path.realpath(config_file), "r") as f:
- config = yaml.load(f)
- flat_config = flatten_config(config)
- return flat_config.get(key, default_value)
-
- # If neither file exists, return the default value
+ config_file = os.path.join(flink_conf_directory, "config.yaml")
+ if os.path.isfile(config_file):
+ # If config.yaml exists, use YAML parser to read the value
+ with open(os.path.realpath(config_file), "r") as f:
+ config = yaml.load(f)
+ flat_config = flatten_config(config)
+ return flat_config.get(key, default_value)
+
return default_value
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
index 0e92e238a84..7d645dcf605 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationFileMigrationUtils;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
@@ -37,13 +38,11 @@ import org.apache.flink.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import static org.apache.flink.util.MathUtils.checkedDownCast;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* Utility class to extract related parameters from {@link Configuration} and
to sanity check them.
@@ -230,17 +229,9 @@ public class ConfigurationParserUtils {
throw e;
}
- checkState(
- new File(
- clusterConfiguration.getConfigDir(),
- GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME)
- .exists());
- Configuration configuration =
-
GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(),
null);
-
- Configuration standardYamlConfig = new Configuration(true);
- standardYamlConfig.addAll(configuration);
-
- return
ConfigurationUtils.convertConfigToWritableLines(standardYamlConfig, false);
+ return ConfigurationUtils.convertConfigToWritableLines(
+
ConfigurationFileMigrationUtils.migrateLegacyToStandardYamlConfig(
+ clusterConfiguration.getConfigDir()),
+ false);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 26c832eefc5..ceb2177cff9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
-import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -225,16 +224,6 @@ class BootstrapToolsTest {
@Test
void testWriteConfigurationAndReloadWithStandardYaml() throws Exception {
- testWriteConfigurationAndReloadInternal(true);
- }
-
- @Test
- void testWriteConfigurationAndReloadWithLegacyYaml() throws Exception {
- testWriteConfigurationAndReloadInternal(false);
- }
-
- private void testWriteConfigurationAndReloadInternal(boolean standardYaml)
throws IOException {
- GlobalConfiguration.setStandardYaml(standardYaml);
final File flinkConfDir =
TempDirUtils.newFolder(temporaryFolder).getAbsoluteFile();
final Configuration flinkConfig = new Configuration();
@@ -298,16 +287,13 @@ class BootstrapToolsTest {
BootstrapTools.writeConfiguration(
flinkConfig, new File(flinkConfDir,
GlobalConfiguration.getFlinkConfFilename()));
- final Configuration loadedFlinkConfig =
-
GlobalConfiguration.loadConfiguration(flinkConfDir.getAbsolutePath());
+ final Configuration loadedFlinkConfig;
+ loadedFlinkConfig =
GlobalConfiguration.loadConfiguration(flinkConfDir.getAbsolutePath());
assertThat(loadedFlinkConfig.get(listStringConfigOption))
.containsExactlyInAnyOrderElementsOf(list);
assertThat(loadedFlinkConfig.get(listDurationConfigOption))
.containsExactlyInAnyOrderElementsOf(durationList);
assertThat(loadedFlinkConfig.get(mapConfigOption)).containsAllEntriesOf(map);
assertThat(loadedFlinkConfig.get(durationConfigOption)).isEqualTo(duration);
-
- // clean the standard yaml flag to avoid impact to other cases.
- GlobalConfiguration.setStandardYaml(true);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
index 9c72922895f..5312db4ba9b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
@@ -21,43 +21,29 @@ package org.apache.flink.runtime.util.bash;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationFileMigrationUtils;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.configuration.ConfigOptions.key;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link FlinkConfigLoader}. */
-@ExtendWith(ParameterizedTestExtension.class)
public class FlinkConfigLoaderTest {
- @Parameter public boolean standardYaml;
-
- @Parameters(name = "standardYaml: {0}")
- public static Collection<Boolean> parameters() {
- return Arrays.asList(true, false);
- }
-
private static final String TEST_CONFIG_KEY = "test.key";
private static final String TEST_CONFIG_VALUE = "test_value";
@@ -65,105 +51,71 @@ public class FlinkConfigLoaderTest {
@BeforeEach
void setUp() throws IOException {
- File flinkConfFile;
- if (standardYaml) {
- flinkConfFile =
- TempDirUtils.newFile(
- confDir.toAbsolutePath(),
GlobalConfiguration.FLINK_CONF_FILENAME);
- } else {
- flinkConfFile =
- TempDirUtils.newFile(
- confDir.toAbsolutePath(),
- GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
- }
+ File flinkConfFile =
+ TempDirUtils.newFile(
+ confDir.toAbsolutePath(),
GlobalConfiguration.FLINK_CONF_FILENAME);
FileWriter fw = new FileWriter(flinkConfFile);
fw.write(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE + "\n");
fw.close();
}
- @AfterAll
- static void after() {
- // clean the standard yaml flag to avoid impact to other cases.
- GlobalConfiguration.setStandardYaml(true);
- }
-
- @TestTemplate
+ @Test
void testLoadConfigurationConfigDirLongOpt() throws Exception {
String[] args = {"--configDir", confDir.toFile().getAbsolutePath()};
Configuration configuration =
FlinkConfigLoader.loadConfiguration(args);
verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationConfigDirLongOpt() throws Exception {
String[] args = {"--configDir", confDir.toFile().getAbsolutePath()};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
- } else {
- assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " +
TEST_CONFIG_VALUE);
- }
+ assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
}
- @TestTemplate
+ @Test
void testLoadConfigurationConfigDirShortOpt() throws Exception {
String[] args = {"-c", confDir.toFile().getAbsolutePath()};
Configuration configuration =
FlinkConfigLoader.loadConfiguration(args);
verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationConfigDirShortOpt() throws Exception {
String[] args = {"-c", confDir.toFile().getAbsolutePath()};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
- } else {
- assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " +
TEST_CONFIG_VALUE);
- }
+ assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
}
- @TestTemplate
+ @Test
void testLoadConfigurationDynamicPropertyWithSpace() throws Exception {
String[] args = {"--configDir", confDir.toFile().getAbsolutePath(),
"-D", "key=value"};
Configuration configuration =
FlinkConfigLoader.loadConfiguration(args);
verifyConfiguration(configuration, "key", "value");
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationDynamicPropertyWithSpace() throws
Exception {
String[] args = {"--configDir", confDir.toFile().getAbsolutePath(),
"-D", "key=value"};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE, "key: value");
- } else {
- assertThat(list)
- .containsExactlyInAnyOrder(
- TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key:
value");
- }
+ assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE, "key: value");
}
- @TestTemplate
+ @Test
void testLoadConfigurationDynamicPropertyWithoutSpace() throws Exception {
String[] args = {"--configDir", confDir.toFile().getAbsolutePath(),
"-Dkey=value"};
Configuration configuration =
FlinkConfigLoader.loadConfiguration(args);
verifyConfiguration(configuration, "key", "value");
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationDynamicPropertyWithoutSpace() throws
Exception {
String[] args = {"--configDir", confDir.toFile().getAbsolutePath(),
"-Dkey=value"};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE, "key: value");
- } else {
- assertThat(list)
- .containsExactlyInAnyOrder(
- TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key:
value");
- }
+ assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE, "key: value");
}
- @TestTemplate
+ @Test
void testLoadConfigurationIgnoreUnknownToken() throws Exception {
String[] args = {
"unknown",
@@ -178,7 +130,7 @@ public class FlinkConfigLoaderTest {
verifyConfiguration(configuration, "key", "value");
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationIgnoreUnknownToken() throws Exception {
String[] args = {
"unknown",
@@ -189,16 +141,10 @@ public class FlinkConfigLoaderTest {
"-Dkey=value"
};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE, "key: value");
- } else {
- assertThat(list)
- .containsExactlyInAnyOrder(
- TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key:
value");
- }
+ assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE, "key: value");
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationRemoveKeysMatched() throws Exception {
String key = "key";
@@ -210,14 +156,10 @@ public class FlinkConfigLoaderTest {
key
};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
- } else {
- assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": "
+ TEST_CONFIG_VALUE);
- }
+ assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationRemoveKeysNotMatched() throws Exception
{
String key = "key";
String value = "value";
@@ -231,17 +173,11 @@ public class FlinkConfigLoaderTest {
removeKey
};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list)
- .containsExactly("test:", " key: " + TEST_CONFIG_VALUE,
key + ": " + value);
- } else {
- assertThat(list)
- .containsExactlyInAnyOrder(
- TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, key +
": " + value);
- }
+ assertThat(list)
+ .containsExactly("test:", " key: " + TEST_CONFIG_VALUE, key +
": " + value);
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationRemoveKeyValuesMatched() throws
Exception {
String removeKey = "removeKey";
String removeValue = "removeValue";
@@ -254,14 +190,10 @@ public class FlinkConfigLoaderTest {
String.format("%s=%s", removeKey, removeValue)
};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
- } else {
- assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": "
+ TEST_CONFIG_VALUE);
- }
+ assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationRemoveKeyValuesNotMatched() throws
Exception {
String removeKey = "removeKey";
String removeValue = "removeValue";
@@ -275,19 +207,12 @@ public class FlinkConfigLoaderTest {
String.format("%s=%s", removeKey, nonExistentValue)
};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list)
- .containsExactlyInAnyOrder(
- "test:", " key: " + TEST_CONFIG_VALUE, removeKey
+ ": " + removeValue);
- } else {
- assertThat(list)
- .containsExactlyInAnyOrder(
- TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE,
- removeKey + ": " + removeValue);
- }
+ assertThat(list)
+ .containsExactlyInAnyOrder(
+ "test:", " key: " + TEST_CONFIG_VALUE, removeKey + ":
" + removeValue);
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationReplaceKeyValuesMatched() throws
Exception {
String newValue = "newValue";
@@ -298,14 +223,10 @@ public class FlinkConfigLoaderTest {
String.format("%s,%s,%s", TEST_CONFIG_KEY, TEST_CONFIG_VALUE,
newValue)
};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list).containsExactly("test:", " key: " + newValue);
- } else {
- assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": "
+ newValue);
- }
+ assertThat(list).containsExactly("test:", " key: " + newValue);
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationReplaceKeyValuesNotMatched() throws
Exception {
String nonExistentValue = "nonExistentValue";
String newValue = "newValue";
@@ -317,14 +238,10 @@ public class FlinkConfigLoaderTest {
String.format("%s,%s,%s", TEST_CONFIG_KEY, nonExistentValue,
newValue)
};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
- if (standardYaml) {
- assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
- } else {
- assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": "
+ TEST_CONFIG_VALUE);
- }
+ assertThat(list).containsExactly("test:", " key: " +
TEST_CONFIG_VALUE);
}
- @TestTemplate
+ @Test
void testloadAndModifyConfigurationWithFlatten() throws Exception {
String[] args = {"-c", confDir.toFile().getAbsolutePath(), "-flatten"};
List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
@@ -333,11 +250,11 @@ public class FlinkConfigLoaderTest {
@Test
void testMigrateLegacyConfigToStandardYaml() throws Exception {
- try (FileWriter fw =
- new FileWriter(
- new File(
- confDir.toFile(),
-
GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME))) {
+ File file =
+ new File(
+ confDir.toFile(),
+
ConfigurationFileMigrationUtils.LEGACY_FLINK_CONF_FILENAME);
+ try (FileWriter fw = new FileWriter(file)) {
fw.write(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE + "\n");
fw.write(
"pipeline.cached-files"
@@ -351,7 +268,8 @@ public class FlinkConfigLoaderTest {
+ "
class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"
+ "\n");
}
- Configuration configuration =
GlobalConfiguration.loadConfiguration(confDir.toString());
+ Map<String, String> configuration =
+ ConfigurationFileMigrationUtils.loadLegacyYAMLResource(file);
File newConfigFolder = TempDirUtils.newFolder(confDir);
try (FileWriter fw =
@@ -367,11 +285,15 @@ public class FlinkConfigLoaderTest {
Configuration standardYamlConfig =
GlobalConfiguration.loadConfiguration(newConfigFolder.toString());
- assertThat(configuration.getString(TEST_CONFIG_KEY, null))
+ assertThat(configuration.getOrDefault(TEST_CONFIG_KEY, null))
.isEqualTo(standardYamlConfig.getString(TEST_CONFIG_KEY,
null));
+ List<String> serializers =
+ ConfigurationUtils.convertToList(
+
configuration.get(PipelineOptions.KRYO_DEFAULT_SERIALIZERS.key()),
+ String.class);
assertThat(
-
configuration.get(PipelineOptions.KRYO_DEFAULT_SERIALIZERS).stream()
+ serializers.stream()
.map(ConfigurationUtils::parseStringToMap)
.collect(Collectors.toList()))
.isEqualTo(
@@ -379,9 +301,10 @@ public class FlinkConfigLoaderTest {
.map(ConfigurationUtils::parseStringToMap)
.collect(Collectors.toList()));
- assertThat(
- DistributedCache.parseCachedFilesFromString(
-
configuration.get(PipelineOptions.CACHED_FILES)))
+ List<String> cachedFiles =
+ ConfigurationUtils.convertToList(
+ configuration.get(PipelineOptions.CACHED_FILES.key()),
String.class);
+ assertThat(DistributedCache.parseCachedFilesFromString(cachedFiles))
.isEqualTo(
DistributedCache.parseCachedFilesFromString(
standardYamlConfig.get(PipelineOptions.CACHED_FILES)));
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
index 1ed5fa133c7..9fa6db466a1 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
@@ -163,7 +162,6 @@ public class ForStStateBackendConfigTest {
@Test
public void testConfigureForStCompressionPerLevel() throws Exception {
- GlobalConfiguration.setStandardYaml(false);
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
ForStStateBackend forStStateBackend = new ForStStateBackend();
CompressionType[] compressionTypes = {
@@ -184,7 +182,6 @@ public class ForStStateBackendConfigTest {
resourceContainer.close();
env.close();
- GlobalConfiguration.setStandardYaml(true);
}
@Test
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index bcd9023845c..b19c80eb6f5 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
@@ -291,7 +290,6 @@ public class RocksDBStateBackendConfigTest {
@Test
public void testConfigureRocksDBCompressionPerLevel() throws Exception {
- GlobalConfiguration.setStandardYaml(false);
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
EmbeddedRocksDBStateBackend rocksDbBackend = new
EmbeddedRocksDBStateBackend();
CompressionType[] compressionTypes = {
@@ -312,7 +310,6 @@ public class RocksDBStateBackendConfigTest {
resourceContainer.close();
env.close();
- GlobalConfiguration.setStandardYaml(true);
}
@Test