This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 081051a2cacaddf6dfe613da061f15f28a015a41 Author: JunRuiLee <[email protected]> AuthorDate: Mon Jan 29 17:35:19 2024 +0800 [FLINK-34257][core] Update Flink YAML Parser to Support YAML 1.2 Specification. This closes #24213. --- flink-core/pom.xml | 7 + .../flink/configuration/YamlParserUtils.java | 167 +++++++++++---------- .../flink/configuration/YamlParserUtilsTest.java | 63 +++++++- flink-dist/src/main/resources/META-INF/NOTICE | 1 + flink-python/dev/dev-requirements.txt | 2 +- flink-python/pyflink/common/configuration.py | 5 +- flink-python/pyflink/pyflink_gateway_server.py | 5 +- flink-python/setup.py | 2 +- 8 files changed, 159 insertions(+), 93 deletions(-) diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 917ef53cfaf..618ac8c15dc 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -96,6 +96,13 @@ under the License. <!-- managed version --> </dependency> + <!-- YAML parser utilities --> + <dependency> + <groupId>org.snakeyaml</groupId> + <artifactId>snakeyaml-engine</artifactId> + <version>2.6</version> + </dependency> + <!-- standard utilities --> <dependency> <groupId>org.apache.commons</groupId> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java index ae9280b3a86..a8b845043f6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java @@ -20,20 +20,21 @@ package org.apache.flink.configuration; import org.apache.flink.util.TimeUtils; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.DumperOptions; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.LoaderOptions; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.constructor.Constructor; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.Mark; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.MarkedYAMLException; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.YAMLException; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.nodes.Node; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.nodes.Tag; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.representer.Represent; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.representer.Representer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.snakeyaml.engine.v2.api.Dump; +import org.snakeyaml.engine.v2.api.DumpSettings; +import org.snakeyaml.engine.v2.api.Load; +import org.snakeyaml.engine.v2.api.LoadSettings; +import org.snakeyaml.engine.v2.common.FlowStyle; +import org.snakeyaml.engine.v2.exceptions.Mark; +import org.snakeyaml.engine.v2.exceptions.MarkedYamlEngineException; +import org.snakeyaml.engine.v2.exceptions.YamlEngineException; +import org.snakeyaml.engine.v2.nodes.Node; +import org.snakeyaml.engine.v2.nodes.ScalarNode; +import org.snakeyaml.engine.v2.nodes.Tag; +import org.snakeyaml.engine.v2.representer.StandardRepresenter; +import org.snakeyaml.engine.v2.schema.CoreSchema; import javax.annotation.Nonnull; @@ -47,6 +48,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** * This class contains utility methods to load standard yaml file and convert object to standard @@ -56,26 +58,30 @@ public class YamlParserUtils { private static final Logger LOG = LoggerFactory.getLogger(YamlParserUtils.class); - private static final Yaml yaml; + private static final DumpSettings blockerDumperSettings = + DumpSettings.builder() + .setDefaultFlowStyle(FlowStyle.BLOCK) + // Disable split long lines to avoid add unexpected line breaks + .setSplitLines(false) + .setSchema(new CoreSchema()) + .build(); - private static final DumperOptions dumperOptions = new DumperOptions(); + private static final DumpSettings flowDumperSettings = + DumpSettings.builder() + .setDefaultFlowStyle(FlowStyle.FLOW) + // Disable split long lines to avoid add unexpected line breaks + .setSplitLines(false) + .setSchema(new CoreSchema()) + .build(); - private static final LoaderOptions loaderOptions = new LoaderOptions(); + private static final Dump blockerDumper = + new Dump(blockerDumperSettings, new FlinkConfigRepresenter(blockerDumperSettings)); - static { - // Make the dump output is in single line - dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW); - dumperOptions.setWidth(Integer.MAX_VALUE); - // The standard YAML do not allow duplicate keys. - loaderOptions.setAllowDuplicateKeys(false); + private static final Dump flowDumper = + new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings)); - yaml = - new Yaml( - new Constructor(loaderOptions), - new FlinkConfigRepresenter(dumperOptions), - dumperOptions, - loaderOptions); - } + private static final Load loader = + new Load(LoadSettings.builder().setSchema(new CoreSchema()).build()); /** * Loads the contents of the given YAML file into a map. @@ -84,21 +90,22 @@ public class YamlParserUtils { * @return a non-null map representing the YAML content. If the file is empty or only contains * comments, an empty map is returned. * @throws FileNotFoundException if the YAML file is not found. - * @throws YAMLException if the file cannot be parsed. + * @throws YamlEngineException if the file cannot be parsed. * @throws IOException if an I/O error occurs while reading from the file stream. */ public static synchronized @Nonnull Map<String, Object> loadYamlFile(File file) throws Exception { try (FileInputStream inputStream = new FileInputStream((file))) { - Map<String, Object> yamlResult = yaml.load(inputStream); + Map<String, Object> yamlResult = + (Map<String, Object>) loader.loadFromInputStream(inputStream); return yamlResult == null ? new HashMap<>() : yamlResult; } catch (FileNotFoundException e) { LOG.error("Failed to find YAML file", e); throw e; - } catch (IOException | YAMLException e) { - if (e instanceof MarkedYAMLException) { - YAMLException exception = - wrapExceptionToHiddenSensitiveData((MarkedYAMLException) e); + } catch (IOException | YamlEngineException e) { + if (e instanceof MarkedYamlEngineException) { + YamlEngineException exception = + wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e); LOG.error("Failed to parse YAML configuration", exception); throw exception; } else { @@ -122,14 +129,14 @@ public class YamlParserUtils { */ public static synchronized String toYAMLString(Object value) { try { - String output = yaml.dump(value); + String output = flowDumper.dumpToString(value); // remove the line break - String linebreak = dumperOptions.getLineBreak().getString(); + String linebreak = flowDumperSettings.getBestLineBreak(); if (output.endsWith(linebreak)) { output = output.substring(0, output.length() - linebreak.length()); } return output; - } catch (MarkedYAMLException exception) { + } catch (MarkedYamlEngineException exception) { throw wrapExceptionToHiddenSensitiveData(exception); } } @@ -159,18 +166,18 @@ public class YamlParserUtils { } currentMap.put(keys[keys.length - 1], entry.getValue()); } - String data = yaml.dumpAsMap(nestedMap); - String linebreak = dumperOptions.getLineBreak().getString(); + String data = blockerDumper.dumpToString(nestedMap); + String linebreak = blockerDumperSettings.getBestLineBreak(); return Arrays.asList(data.split(linebreak)); - } catch (MarkedYAMLException exception) { + } catch (MarkedYamlEngineException exception) { throw wrapExceptionToHiddenSensitiveData(exception); } } public static synchronized <T> T convertToObject(String value, Class<T> type) { try { - return yaml.loadAs(value, type); - } catch (MarkedYAMLException exception) { + return type.cast(loader.loadFromString(value)); + } catch (MarkedYamlEngineException exception) { throw wrapExceptionToHiddenSensitiveData(exception); } } @@ -199,44 +206,48 @@ public class YamlParserUtils { * in 'reader', line 2, column 1 * }</pre> * - * @param exception The MarkedYAMLException containing potentially sensitive data. - * @return A YAMLException with a message that has sensitive data hidden. + * @param exception The MarkedYamlEngineException containing potentially sensitive data. + * @return A YamlEngineException with a message that has sensitive data hidden. */ - private static YAMLException wrapExceptionToHiddenSensitiveData(MarkedYAMLException exception) { + private static YamlEngineException wrapExceptionToHiddenSensitiveData( + MarkedYamlEngineException exception) { StringBuilder lines = new StringBuilder(); String context = exception.getContext(); - Mark contextMark = exception.getContextMark(); + Optional<Mark> contextMark = exception.getContextMark(); + Optional<Mark> problemMark = exception.getProblemMark(); String problem = exception.getProblem(); - Mark problemMark = exception.getProblemMark(); if (context != null) { lines.append(context); lines.append("\n"); } - if (contextMark != null + + if (contextMark.isPresent() && (problem == null - || problemMark == null - || contextMark.getName().equals(problemMark.getName()) - || (contextMark.getLine() != problemMark.getLine()) - || (contextMark.getColumn() != problemMark.getColumn()))) { - lines.append(hiddenSensitiveDataInMark(contextMark)); + || !problemMark.isPresent() + || contextMark.get().getName().equals(problemMark.get().getName()) + || contextMark.get().getLine() != problemMark.get().getLine() + || contextMark.get().getColumn() != problemMark.get().getColumn())) { + lines.append(hiddenSensitiveDataInMark(contextMark.get())); lines.append("\n"); } + if (problem != null) { lines.append(problem); lines.append("\n"); } - if (problemMark != null) { - lines.append(hiddenSensitiveDataInMark(problemMark)); + + if (problemMark.isPresent()) { + lines.append(hiddenSensitiveDataInMark(problemMark.get())); lines.append("\n"); } Throwable cause = exception.getCause(); - if (cause instanceof MarkedYAMLException) { - cause = wrapExceptionToHiddenSensitiveData((MarkedYAMLException) cause); + if (cause instanceof MarkedYamlEngineException) { + cause = wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) cause); } - YAMLException yamlException = new YAMLException(lines.toString(), cause); + YamlEngineException yamlException = new YamlEngineException(lines.toString(), cause); yamlException.setStackTrace(exception.getStackTrace()); return yamlException; } @@ -254,37 +265,27 @@ public class YamlParserUtils { + (mark.getColumn() + 1); } - private static class FlinkConfigRepresenter extends Representer { - public FlinkConfigRepresenter(DumperOptions options) { - super(options); - representers.put(Duration.class, new RepresentDuration()); - representers.put(MemorySize.class, new RepresentMemorySize()); - multiRepresenters.put(Enum.class, new RepresentEnum()); + private static class FlinkConfigRepresenter extends StandardRepresenter { + public FlinkConfigRepresenter(DumpSettings dumpSettings) { + super(dumpSettings); + representers.put(Duration.class, this::representDuration); + representers.put(MemorySize.class, this::representMemorySize); + parentClassRepresenters.put(Enum.class, this::representEnum); } - private class RepresentDuration implements Represent { - @Override - public Node representData(Object data) { - Duration duration = (Duration) data; - String durationString = TimeUtils.formatWithHighestUnit(duration); - return representScalar(getTag(duration.getClass(), Tag.STR), durationString, null); - } + private Node representDuration(Object data) { + Duration duration = (Duration) data; + String durationString = TimeUtils.formatWithHighestUnit(duration); + return new ScalarNode(Tag.STR, durationString, settings.getDefaultScalarStyle()); } - private class RepresentMemorySize implements Represent { - @Override - public Node representData(Object data) { - MemorySize memorySize = (MemorySize) data; - return representScalar( - getTag(memorySize.getClass(), Tag.STR), memorySize.toString(), null); - } + private Node representMemorySize(Object data) { + MemorySize memorySize = (MemorySize) data; + return new ScalarNode(Tag.STR, memorySize.toString(), settings.getDefaultScalarStyle()); } - private class RepresentEnum implements Represent { - @Override - public Node representData(Object data) { - return representScalar(getTag(data.getClass(), Tag.STR), data.toString(), null); - } + private Node representEnum(Object data) { + return new ScalarNode(Tag.STR, data.toString(), settings.getDefaultScalarStyle()); } } } diff --git a/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java index f857914238f..df22563c474 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java @@ -20,11 +20,10 @@ package org.apache.flink.configuration; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.YAMLException; - import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.snakeyaml.engine.v2.exceptions.YamlEngineException; import java.io.File; import java.io.FileNotFoundException; @@ -75,6 +74,62 @@ class YamlParserUtilsTest { assertThat(yamlData.get("key7")).isEqualTo("true"); } + /** + * Tests to avoid potential unexpected behavior changes for FLINK configuration due to + * differences between YAML 1.2 and its predecessor YAML 1.1. This test case is based on the + * YAML Changes page <a href="https://yaml.org/spec/1.2.2/ext/changes">YAML Changes</a>. + */ + @SuppressWarnings("unchecked") + @Test + void testYaml12Features() { + // In YAML 1.2, only true and false strings are parsed as booleans (including True and + // TRUE); y, yes, on, and their negative counterparts are parsed as strings. + String booleanRepresentation = "key1: Yes\n" + "key2: y\n" + "key3: on"; + Map<String, String> expectedBooleanRepresentation = new HashMap<>(); + expectedBooleanRepresentation.put( + "key1", "Yes"); // the value is expected to Boolean#True in YAML 1.1 + expectedBooleanRepresentation.put( + "key2", "y"); // the value is expected to Boolean#True in YAML 1.1 + expectedBooleanRepresentation.put( + "key3", "on"); // the value is expected to Boolean#True in YAML 1.1 + assertThat(YamlParserUtils.convertToObject(booleanRepresentation, Map.class)) + .containsAllEntriesOf(expectedBooleanRepresentation); + + // In YAML 1.2, underlines '_' cannot be used within numerical values. + String underlineInNumber = "key1: 1_000"; + assertThat(YamlParserUtils.convertToObject(underlineInNumber, Map.class)) + .containsEntry( + "key1", + "1_000"); // In YAML 1.1, the expected value is number 1000 not a string. + + // In YAML 1.2, Octal values need a 0o prefix; e.g. 010 is now parsed with the value 10 + // rather than 8. + String octalNumber1 = "octal: 010"; + assertThat(YamlParserUtils.convertToObject(octalNumber1, Map.class)) + .containsEntry("octal", 10); // In YAML 1.1, the expected value is number 8. + String octalNumber2 = "octal: 0o10"; + assertThat(YamlParserUtils.convertToObject(octalNumber2, Map.class)) + .containsEntry("octal", 8); + + // In YAML 1.2, the binary and sexagesimal integer formats have been dropped. + String binaryNumber = "binary: 0b101"; + assertThat(YamlParserUtils.convertToObject(binaryNumber, Map.class)) + .containsEntry( + "binary", + "0b101"); // In YAML 1.1, the expected value is number 5 not a string. + String sexagesimalNumber = "sexagesimal: 1:00"; + assertThat(YamlParserUtils.convertToObject(sexagesimalNumber, Map.class)) + .containsEntry( + "sexagesimal", + "1:00"); // In YAML 1.1, the expected value is number 60 not a string. + + // In YAML 1.2, the !!pairs, !!omap, !!set, !!timestamp and !!binary types have been + // dropped. + String timestamp = "!!timestamp 2001-12-15T02:59:43.1Z"; + assertThatThrownBy(() -> YamlParserUtils.convertToObject(timestamp, Object.class)) + .isInstanceOf(YamlEngineException.class); + } + @Test void testLoadEmptyYamlFile() throws Exception { File confFile = new File(tmpDir, "test.yaml"); @@ -92,7 +147,7 @@ class YamlParserUtilsTest { throw new RuntimeException(e); } assertThatThrownBy(() -> YamlParserUtils.loadYamlFile(confFile)) - .isInstanceOf(YAMLException.class) + .isInstanceOf(YamlEngineException.class) .satisfies( e -> Assertions.assertThat(ExceptionUtils.stringifyException(e)) @@ -109,7 +164,7 @@ class YamlParserUtilsTest { throw new RuntimeException(e); } assertThatThrownBy(() -> YamlParserUtils.loadYamlFile(confFile)) - .isInstanceOf(YAMLException.class) + .isInstanceOf(YamlEngineException.class) .satisfies( e -> Assertions.assertThat(ExceptionUtils.stringifyException(e)) diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE index f5a1fc893c0..2eb8a611431 100644 --- a/flink-dist/src/main/resources/META-INF/NOTICE +++ b/flink-dist/src/main/resources/META-INF/NOTICE @@ -21,6 +21,7 @@ This project bundles the following dependencies under the Apache Software Licens - org.objenesis:objenesis:2.1 - org.xerial.snappy:snappy-java:1.1.10.4 - tools.profiler:async-profiler:2.9 +- org.snakeyaml:snakeyaml-engine:2.6 This project bundles the following dependencies under the BSD license. See bundled license files for details. diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt index f692a63f0f8..0e7d3fadb0c 100755 --- a/flink-python/dev/dev-requirements.txt +++ b/flink-python/dev/dev-requirements.txt @@ -32,4 +32,4 @@ pemja==0.4.1; platform_system != 'Windows' httplib2>=0.19.0 protobuf>=3.19.0 pytest~=7.0 -pyyaml>=6.0.1 +ruamel.yaml>=0.18.4 diff --git a/flink-python/pyflink/common/configuration.py b/flink-python/pyflink/common/configuration.py index 35416b8fa47..779568da151 100644 --- a/flink-python/pyflink/common/configuration.py +++ b/flink-python/pyflink/common/configuration.py @@ -78,8 +78,9 @@ class Configuration: def parse_jars_value(value: str, jvm): is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml() if is_standard_yaml: - import yaml - jar_urls_list = yaml.safe_load(value) + from ruamel.yaml import YAML + yaml = YAML(typ='safe') + jar_urls_list = yaml.load(value) if isinstance(jar_urls_list, list): return jar_urls_list return value.split(";") diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py index 19fd24bf7e5..0eae1e961ee 100644 --- a/flink-python/pyflink/pyflink_gateway_server.py +++ b/flink-python/pyflink/pyflink_gateway_server.py @@ -44,7 +44,8 @@ def on_windows(): def read_from_config(key, default_value, flink_conf_directory): - import yaml + from ruamel.yaml import YAML + yaml = YAML(typ='safe') # try to find flink-conf.yaml file in flink_conf_directory flink_conf_file = os.path.join(flink_conf_directory, "flink-conf.yaml") if os.path.isfile(flink_conf_file): @@ -68,7 +69,7 @@ def read_from_config(key, default_value, flink_conf_directory): if os.path.isfile(config_file): # If config.yaml exists, use YAML parser to read the value with open(os.path.realpath(config_file), "r") as f: - config = yaml.safe_load(f) + config = yaml.load(f) flat_config = flatten_config(config) return flat_config.get(key, default_value) diff --git a/flink-python/setup.py b/flink-python/setup.py index 24637fe4c7d..e5e1e231b22 100644 --- a/flink-python/setup.py +++ b/flink-python/setup.py @@ -326,7 +326,7 @@ try: 'pyarrow>=5.0.0', 'pemja==0.4.1;platform_system != "Windows"', 'httplib2>=0.19.0', - 'pyyaml>=6.0.1', + 'ruamel.yaml>=0.18.4', apache_flink_libraries_dependency] setup(
