This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch flink-config in repository https://gitbox.apache.org/repos/asf/streampark.git
commit 2be23c794619e6fd1a952393172e0e3b59e16996 Author: benjobs <benj...@apache.org> AuthorDate: Mon Sep 15 00:49:25 2025 +0800 [BUG] get flink-config bug fixed. --- pom.xml | 6 +- streampark-common/pom.xml | 4 +- .../apache/streampark/common/util/MemorySize.java | 421 +++++++++++++++++++++ .../apache/streampark/common/util/TimeUtils.java | 248 ++++++++++++ .../streampark/common/util/YamlParserUtils.java | 323 ++++++++++++++++ .../streampark/common/util/PropertiesUtils.scala | 20 +- .../flink/client/bean/SubmitRequest.scala | 29 +- .../streampark-flink-client-core/pom.xml | 5 + .../configuration/FlinkGlobalConfiguration.java | 313 +++++++++++++++ .../flink/client/trait/FlinkClientTrait.scala | 41 +- .../flink/client/test/YarnPerJobTestCase.scala | 2 +- 11 files changed, 1359 insertions(+), 53 deletions(-) diff --git a/pom.xml b/pom.xml index ca3fbaa2b..29e5496bf 100644 --- a/pom.xml +++ b/pom.xml @@ -113,7 +113,7 @@ <caffeine.version>2.8.6</caffeine.version> <mysql.version>8.0.27</mysql.version> <hikariCP.version>3.4.5</hikariCP.version> - <snakeyaml.version>2.0</snakeyaml.version> + <snakeyaml.version>2.6</snakeyaml.version> <typesafe-conf.version>1.4.2</typesafe-conf.version> <json4s-jackson.version>4.0.6</json4s-jackson.version> <commons-cli.version>1.5.0</commons-cli.version> @@ -245,8 +245,8 @@ </dependency> <dependency> - <groupId>org.yaml</groupId> - <artifactId>snakeyaml</artifactId> + <groupId>org.snakeyaml</groupId> + <artifactId>snakeyaml-engine</artifactId> <version>${snakeyaml.version}</version> </dependency> diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml index 3fc00a388..0dee93ec1 100644 --- a/streampark-common/pom.xml +++ b/streampark-common/pom.xml @@ -68,8 +68,8 @@ </dependency> <dependency> - <groupId>org.yaml</groupId> - <artifactId>snakeyaml</artifactId> + <groupId>org.snakeyaml</groupId> + <artifactId>snakeyaml-engine</artifactId> </dependency> <dependency> diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java b/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java new file mode 100644 index 000000000..3cb371d3d --- /dev/null +++ b/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.common.util; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.IntStream; + +import static org.apache.streampark.common.util.MemorySize.MemoryUnit.BYTES; +import static org.apache.streampark.common.util.MemorySize.MemoryUnit.GIGA_BYTES; +import static org.apache.streampark.common.util.MemorySize.MemoryUnit.KILO_BYTES; +import static org.apache.streampark.common.util.MemorySize.MemoryUnit.MEGA_BYTES; +import static org.apache.streampark.common.util.MemorySize.MemoryUnit.TERA_BYTES; +import static org.apache.streampark.common.util.MemorySize.MemoryUnit.hasUnit; + +/** + * MemorySize is a representation of a number of bytes, viewable in different units. + * + * <h2>Parsing</h2> + * + * <p>The size can be parsed from a text expression. If the expression is a pure number, the value + * will be interpreted as bytes. + */ +public class MemorySize implements java.io.Serializable, Comparable<MemorySize> { + + private static final long serialVersionUID = 1L; + + public static final MemorySize ZERO = new MemorySize(0L); + + public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE); + + private static final List<MemoryUnit> ORDERED_UNITS = + Arrays.asList(BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES, TERA_BYTES); + + // ------------------------------------------------------------------------ + + /** The memory size, in bytes. */ + private final long bytes; + + /** The memoized value returned by toString(). */ + private transient String stringified; + + /** The memoized value returned by toHumanReadableString(). */ + private transient String humanReadableStr; + + /** + * Constructs a new MemorySize. + * + * @param bytes The size, in bytes. Must be zero or larger. + */ + public MemorySize(long bytes) { + if (bytes < 0) { + throw new IllegalArgumentException("bytes must be >= 0"); + } + this.bytes = bytes; + } + + public static MemorySize ofMebiBytes(long mebiBytes) { + return new MemorySize(mebiBytes << 20); + } + + // ------------------------------------------------------------------------ + + /** Gets the memory size in bytes. */ + public long getBytes() { + return bytes; + } + + /** Gets the memory size in Kibibytes (= 1024 bytes). */ + public long getKibiBytes() { + return bytes >> 10; + } + + /** Gets the memory size in Mebibytes (= 1024 Kibibytes). */ + public int getMebiBytes() { + return (int) (bytes >> 20); + } + + /** Gets the memory size in Gibibytes (= 1024 Mebibytes). */ + public long getGibiBytes() { + return bytes >> 30; + } + + /** Gets the memory size in Tebibytes (= 1024 Gibibytes).
*/ + public long getTebiBytes() { + return bytes >> 40; + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return (int) (bytes ^ (bytes >>> 32)); + } + + @Override + public boolean equals(Object obj) { + return obj == this + || (obj != null + && obj.getClass() == this.getClass() + && ((MemorySize) obj).bytes == this.bytes); + } + + @Override + public String toString() { + if (stringified == null) { + stringified = formatToString(); + } + + return stringified; + } + + private String formatToString() { + MemoryUnit highestIntegerUnit = + IntStream.range(0, ORDERED_UNITS.size()) + .sequential() + .filter(idx -> bytes % ORDERED_UNITS.get(idx).getMultiplier() != 0) + .boxed() + .findFirst() + .map( + idx -> { + if (idx == 0) { + return ORDERED_UNITS.get(0); + } else { + return ORDERED_UNITS.get(idx - 1); + } + }) + .orElse(BYTES); + + return String.format( + "%d %s", bytes / highestIntegerUnit.getMultiplier(), highestIntegerUnit.getUnits()[1]); + } + + public String toHumanReadableString() { + if (humanReadableStr == null) { + humanReadableStr = formatToHumanReadableString(); + } + + return humanReadableStr; + } + + private String formatToHumanReadableString() { + MemoryUnit highestUnit = + IntStream.range(0, ORDERED_UNITS.size()) + .sequential() + .filter(idx -> bytes > ORDERED_UNITS.get(idx).getMultiplier()) + .boxed() + .max(Comparator.naturalOrder()) + .map(ORDERED_UNITS::get) + .orElse(BYTES); + + if (highestUnit == BYTES) { + return String.format("%d %s", bytes, BYTES.getUnits()[1]); + } else { + double approximate = 1.0 * bytes / highestUnit.getMultiplier(); + return String.format( + Locale.ROOT, "%.3f%s (%d bytes)", approximate, highestUnit.getUnits()[1], bytes); + } + } + + @Override + public int compareTo(MemorySize that) { + return Long.compare(this.bytes, that.bytes); + } + + // ------------------------------------------------------------------------ + // Calculations + // ------------------------------------------------------------------------ + + public MemorySize add(MemorySize that) { + return new MemorySize(Math.addExact(this.bytes, that.bytes)); + } + + public MemorySize subtract(MemorySize that) { + return new MemorySize(Math.subtractExact(this.bytes, that.bytes)); + } + + public MemorySize multiply(double multiplier) { + if (multiplier < 0) { + throw new IllegalArgumentException("multiplier must be >= 0"); + } + + BigDecimal product = BigDecimal.valueOf(this.bytes).multiply(BigDecimal.valueOf(multiplier)); + if (product.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) { + throw new ArithmeticException("long overflow"); + } + return new MemorySize(product.longValue()); + } + + public MemorySize divide(long by) { + if (by < 0) { + throw new IllegalArgumentException("divisor must be >= 0"); + } + return new MemorySize(bytes / by); + } + + // ------------------------------------------------------------------------ + // Parsing + // ------------------------------------------------------------------------ + + /** + * Parses the given string as a MemorySize. + * + * @param text The string to parse + * @return The parsed MemorySize + * @throws IllegalArgumentException Thrown, if the expression cannot be parsed. + */ + public static MemorySize parse(String text) throws IllegalArgumentException { + return new MemorySize(parseBytes(text)); + } + + /** + * Parses the given string with a default unit. + * + * @param text The string to parse. + * @param defaultUnit specify the default unit.
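+ *     (Illustrative example, derived from the code below: with {@code MemoryUnit.MEGA_BYTES} as the default, the text "1024" parses as 1024 mebibytes, while "1024kb" keeps its explicit unit and parses as 1024 kibibytes.)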
+ * @return The parsed MemorySize. + * @throws IllegalArgumentException Thrown, if the expression cannot be parsed. + */ + public static MemorySize parse(String text, MemoryUnit defaultUnit) + throws IllegalArgumentException { + if (!hasUnit(text)) { + return parse(text + defaultUnit.getUnits()[0]); + } + + return parse(text); + } + + /** + * Parses the given string as bytes. The supported expressions are listed under {@link + * MemorySize}. + * + * @param text The string to parse + * @return The parsed size, in bytes. + * @throws IllegalArgumentException Thrown, if the expression cannot be parsed. + */ + public static long parseBytes(String text) throws IllegalArgumentException { + Objects.requireNonNull(text, "text cannot be null"); + + final String trimmed = text.trim(); + if (trimmed.isEmpty()) { + throw new IllegalArgumentException("argument is an empty- or whitespace-only string"); + } + + final int len = trimmed.length(); + int pos = 0; + + char current; + while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { + pos++; + } + + final String number = trimmed.substring(0, pos); + final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US); + + if (number.isEmpty()) { + throw new NumberFormatException("text does not start with a number"); + } + + final long value; + try { + value = Long.parseLong(number); // this throws a NumberFormatException on overflow + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "The value '" + + number + + "' cannot be represented as a 64-bit number (numeric overflow)."); + } + + final long multiplier = parseUnit(unit).map(MemoryUnit::getMultiplier).orElse(1L); + final long result = value * multiplier; + + // check for overflow + if (result / multiplier != value) { + throw new IllegalArgumentException( + "The value '" + + text + + "' cannot be represented as a 64-bit number of bytes (numeric overflow)."); + } + + return result; + } + + private static Optional<MemoryUnit> parseUnit(String unit) { + if (matchesAny(unit, BYTES)) { + return Optional.of(BYTES); + } else if (matchesAny(unit, KILO_BYTES)) { + return Optional.of(KILO_BYTES); + } else if (matchesAny(unit, MEGA_BYTES)) { + return Optional.of(MEGA_BYTES); + } else if (matchesAny(unit, GIGA_BYTES)) { + return Optional.of(GIGA_BYTES); + } else if (matchesAny(unit, TERA_BYTES)) { + return Optional.of(TERA_BYTES); + } else if (!unit.isEmpty()) { + throw new IllegalArgumentException( + "Memory size unit '" + + unit + + "' does not match any of the recognized units: " + + MemoryUnit.getAllUnits()); + } + + return Optional.empty(); + } + + private static boolean matchesAny(String str, MemoryUnit unit) { + for (String s : unit.getUnits()) { + if (s.equals(str)) { + return true; + } + } + return false; + } + + /** + * Enum which defines memory unit, mostly used to parse value from configuration file.
+ * + * <p>To make larger values more compact, the common size suffixes are supported: + * + * <ul> + * <li>1b or 1bytes (bytes) + * <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes) + * <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes) + * <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes) + * <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes) + * </ul> + */ + public enum MemoryUnit { + BYTES(new String[] {"b", "bytes"}, 1L), + KILO_BYTES(new String[] {"k", "kb", "kibibytes"}, 1024L), + MEGA_BYTES(new String[] {"m", "mb", "mebibytes"}, 1024L * 1024L), + GIGA_BYTES(new String[] {"g", "gb", "gibibytes"}, 1024L * 1024L * 1024L), + TERA_BYTES(new String[] {"t", "tb", "tebibytes"}, 1024L * 1024L * 1024L * 1024L); + + private final String[] units; + + private final long multiplier; + + MemoryUnit(String[] units, long multiplier) { + this.units = units; + this.multiplier = multiplier; + } + + public String[] getUnits() { + return units; + } + + public long getMultiplier() { + return multiplier; + } + + public static String getAllUnits() { + return concatenateUnits( + BYTES.getUnits(), + KILO_BYTES.getUnits(), + MEGA_BYTES.getUnits(), + GIGA_BYTES.getUnits(), + TERA_BYTES.getUnits()); + } + + public static boolean hasUnit(String text) { + Objects.requireNonNull(text, "text cannot be null"); + + final String trimmed = text.trim(); + if (trimmed.isEmpty()) { + throw new IllegalArgumentException("argument is an empty- or whitespace-only string"); + } + + final int len = trimmed.length(); + int pos = 0; + + char current; + while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { + pos++; + } + + final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US); + + return unit.length() > 0; + } + + private static String concatenateUnits(final String[]... allUnits) { + final StringBuilder builder = new StringBuilder(128); + + for (String[] units : allUnits) { + builder.append('('); + + for (String unit : units) { + builder.append(unit); + builder.append(" | "); + } + + builder.setLength(builder.length() - 3); + builder.append(") / "); + } + + builder.setLength(builder.length() - 3); + return builder.toString(); + } + } +} diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java new file mode 100644 index 000000000..c1efee3a1 --- /dev/null +++ b/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.common.util; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; + +/** Collection of utilities about time intervals. */ +public class TimeUtils { + + private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP = + Collections.unmodifiableMap(initMap()); + + /** + * Parse the given string to a Java {@link Duration}. The string is in the format "{length + * value}{time unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will be + * interpreted as milliseconds. + * + * <p>Supported time unit labels are: + * + * <ul> + * <li>DAYS: "d", "day" + * <li>HOURS: "h", "hour" + * <li>MINUTES: "m", "min", "minute" + * <li>SECONDS: "s", "sec", "second" + * <li>MILLISECONDS: "ms", "milli", "millisecond" + * <li>MICROSECONDS: "µs", "micro", "microsecond" + * <li>NANOSECONDS: "ns", "nano", "nanosecond" + * </ul> + * + * @param text string to parse. + */ + public static Duration parseDuration(String text) { + AssertUtils.notNull(text); + + final String trimmed = text.trim(); + + if (trimmed.isEmpty()) { + throw new IllegalArgumentException("argument is an empty- or whitespace-only string."); + } + + final int len = trimmed.length(); + int pos = 0; + + char current; + while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { + pos++; + } + + final String number = trimmed.substring(0, pos); + final String unitLabel = trimmed.substring(pos).trim().toLowerCase(Locale.US); + + if (number.isEmpty()) { + throw new NumberFormatException("text does not start with a number"); + } + + final long value; + try { + value = Long.parseLong(number); // this throws a NumberFormatException on overflow + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "The value '" + + number + + "' cannot be represented as a 64-bit number (numeric overflow)."); + } + + if (unitLabel.isEmpty()) { + return Duration.of(value, ChronoUnit.MILLIS); + } + + ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel); + if (unit != null) { + return Duration.of(value, unit); + } else { + throw new IllegalArgumentException( + "Time interval unit label '" + + unitLabel + + "' does not match any of the recognized units: " + + TimeUnit.getAllUnits()); + } + } + + private static Map<String, ChronoUnit> initMap() { + Map<String, ChronoUnit> labelToUnit = new HashMap<>(); + for (TimeUnit timeUnit : TimeUnit.values()) { + for (String label : timeUnit.getLabels()) { + labelToUnit.put(label, timeUnit.getUnit()); + } + } + return labelToUnit; + } + + /** + * @param duration to convert to string + * @return duration string in millis + */ + public static String getStringInMillis(final Duration duration) { + return duration.toMillis() + TimeUnit.MILLISECONDS.labels.get(0); + } + + /** + * Pretty prints the duration as the lowest granularity unit that does not lose precision. + * + * <p>Examples: + * + * <pre>{@code + * Duration.ofMillis(60000) will be printed as 1 min + * Duration.ofHours(1).plusSeconds(1) will be printed as 3601 s + * }</pre> + * + * <b>NOTE:</b> It supports only durations that fit into long.
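+ * + * <p>For instance, {@code formatWithHighestUnit(Duration.ofSeconds(90))} returns "90 s", because 90 seconds is not an integral number of minutes.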
+ */ + public static String formatWithHighestUnit(Duration duration) { + long nanos = duration.toNanos(); + + TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos); + return String.format( + "%d %s", + nanos / highestIntegerUnit.unit.getDuration().toNanos(), + highestIntegerUnit.getLabels().get(0)); + } + + private static TimeUnit getHighestIntegerUnit(long nanos) { + if (nanos == 0) { + return TimeUnit.MILLISECONDS; + } + + final List<TimeUnit> orderedUnits = + Arrays.asList( + TimeUnit.NANOSECONDS, + TimeUnit.MICROSECONDS, + TimeUnit.MILLISECONDS, + TimeUnit.SECONDS, + TimeUnit.MINUTES, + TimeUnit.HOURS, + TimeUnit.DAYS); + + TimeUnit highestIntegerUnit = null; + for (TimeUnit timeUnit : orderedUnits) { + if (nanos % timeUnit.unit.getDuration().toNanos() != 0) { + break; + } + highestIntegerUnit = timeUnit; + } + + return AssertUtils.notNull(highestIntegerUnit, "Should find a highestIntegerUnit."); + } + + /** Enum which defines time unit, mostly used to parse value from configuration file. */ + private enum TimeUnit { + DAYS(ChronoUnit.DAYS, singular("d"), plural("day")), + HOURS(ChronoUnit.HOURS, singular("h"), plural("hour")), + MINUTES(ChronoUnit.MINUTES, singular("min"), singular("m"), plural("minute")), + SECONDS(ChronoUnit.SECONDS, singular("s"), plural("sec"), plural("second")), + MILLISECONDS(ChronoUnit.MILLIS, singular("ms"), plural("milli"), plural("millisecond")), + MICROSECONDS(ChronoUnit.MICROS, singular("µs"), plural("micro"), plural("microsecond")), + NANOSECONDS(ChronoUnit.NANOS, singular("ns"), plural("nano"), plural("nanosecond")); + + private static final String PLURAL_SUFFIX = "s"; + + private final List<String> labels; + + private final ChronoUnit unit; + + TimeUnit(ChronoUnit unit, String[]... labels) { + this.unit = unit; + this.labels = + Arrays.stream(labels).flatMap(ls -> Arrays.stream(ls)).collect(Collectors.toList()); + } + + /** + * @param label the original label + * @return the singular format of the original label + */ + private static String[] singular(String label) { + return new String[] {label}; + } + + /** + * @param label the original label + * @return both the singular format and plural format of the original label + */ + private static String[] plural(String label) { + return new String[] {label, label + PLURAL_SUFFIX}; + } + + public List<String> getLabels() { + return labels; + } + + public ChronoUnit getUnit() { + return unit; + } + + public static String getAllUnits() { + return Arrays.stream(TimeUnit.values()) + .map(TimeUnit::createTimeUnitString) + .collect(Collectors.joining(", ")); + } + + private static String createTimeUnitString(TimeUnit timeUnit) { + return timeUnit.name() + ": (" + String.join(" | ", timeUnit.getLabels()) + ")"; + } + } + + private static ChronoUnit toChronoUnit(java.util.concurrent.TimeUnit timeUnit) { + switch (timeUnit) { + case NANOSECONDS: + return ChronoUnit.NANOS; + case MICROSECONDS: + return ChronoUnit.MICROS; + case MILLISECONDS: + return ChronoUnit.MILLIS; + case SECONDS: + return ChronoUnit.SECONDS; + case MINUTES: + return ChronoUnit.MINUTES; + case HOURS: + return ChronoUnit.HOURS; + case DAYS: + return ChronoUnit.DAYS; + default: + throw new IllegalArgumentException(String.format("Unsupported time unit %s.", timeUnit)); + } + } +} diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java new file mode 100644 index 000000000..850985f7f --- /dev/null +++ 
b/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.snakeyaml.engine.v2.api.Dump; +import org.snakeyaml.engine.v2.api.DumpSettings; +import org.snakeyaml.engine.v2.api.Load; +import org.snakeyaml.engine.v2.api.LoadSettings; +import org.snakeyaml.engine.v2.common.FlowStyle; +import org.snakeyaml.engine.v2.exceptions.Mark; +import org.snakeyaml.engine.v2.exceptions.MarkedYamlEngineException; +import org.snakeyaml.engine.v2.exceptions.YamlEngineException; +import org.snakeyaml.engine.v2.nodes.Node; +import org.snakeyaml.engine.v2.nodes.ScalarNode; +import org.snakeyaml.engine.v2.nodes.Tag; +import org.snakeyaml.engine.v2.representer.StandardRepresenter; +import org.snakeyaml.engine.v2.schema.CoreSchema; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * This class contains utility methods to load standard YAML files and convert objects to + * standard YAML syntax. + */ +public class YamlParserUtils { + + private static final Logger LOG = LoggerFactory.getLogger(YamlParserUtils.class); + + private static final DumpSettings blockerDumperSettings = + DumpSettings.builder() + .setDefaultFlowStyle(FlowStyle.BLOCK) + // Disable split long lines to avoid adding unexpected line breaks + .setSplitLines(false) + .setSchema(new CoreSchema()) + .build(); + + private static final DumpSettings flowDumperSettings = + DumpSettings.builder() + .setDefaultFlowStyle(FlowStyle.FLOW) + // Disable split long lines to avoid adding unexpected line breaks + .setSplitLines(false) + .setSchema(new CoreSchema()) + .build(); + + private static final Dump blockerDumper = + new Dump(blockerDumperSettings, new FlinkConfigRepresenter(blockerDumperSettings)); + + private static final Dump flowDumper = + new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings)); + + private static final Load loader = + new Load(LoadSettings.builder().setSchema(new CoreSchema()).build()); + + /** + * Loads the contents of the given YAML file into a map. + * + * @param file the YAML file to load. + * @return a non-null map representing the YAML content. If the file is empty or only contains + * comments, an empty map is returned. + * @throws FileNotFoundException if the YAML file is not found.
+ * @throws YamlEngineException if the file cannot be parsed. + * @throws IOException if an I/O error occurs while reading from the file stream. + */ + public static synchronized @Nonnull Map<String, Object> loadYamlFile(File file) throws Exception { + try (FileInputStream inputStream = new FileInputStream((file))) { + Map<String, Object> yamlResult = + (Map<String, Object>) loader.loadFromInputStream(inputStream); + + return yamlResult == null ? new HashMap<>() : yamlResult; + } catch (FileNotFoundException e) { + LOG.error("Failed to find YAML file", e); + throw e; + } catch (IOException | YamlEngineException e) { + if (e instanceof MarkedYamlEngineException) { + YamlEngineException exception = + wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e); + LOG.error("Failed to parse YAML configuration", exception); + throw exception; + } else { + throw e; + } + } + } + + public static synchronized @Nonnull Map<String, Object> loadYamlInput(InputStream inputStream) + throws Exception { + try { + Map<String, Object> yamlResult = + (Map<String, Object>) loader.loadFromInputStream(inputStream); + return yamlResult == null ? new HashMap<>() : yamlResult; + } catch (YamlEngineException e) { + if (e instanceof MarkedYamlEngineException) { + YamlEngineException exception = + wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e); + LOG.error("Failed to parse YAML configuration", exception); + throw exception; + } else { + throw e; + } + } + } + + public static synchronized @Nonnull Map<String, Object> loadYamlString(String text) { + try { + Map<String, Object> yamlResult = (Map<String, Object>) loader.loadFromString(text); + + return yamlResult == null ? new HashMap<>() : yamlResult; + } catch (YamlEngineException e) { + if (e instanceof MarkedYamlEngineException) { + YamlEngineException exception = + wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e); + LOG.error("Failed to parse YAML configuration", exception); + throw exception; + } else { + throw e; + } + } + } + + /** + * Converts the given value to a string representation in the YAML syntax. This method uses a YAML + * parser to convert the object to YAML format. + * + * <p>The resulting YAML string may have line breaks at the end of each line. This method removes + * the line break at the end of the string if it exists. + * + * <p>Note: This method may perform escaping on certain characters in the value to ensure proper + * YAML syntax. + * + * @param value The value to be converted. + * @return The string representation of the value in YAML syntax. + */ + public static synchronized String toYAMLString(Object value) { + try { + String output = flowDumper.dumpToString(value); + // remove the line break + String linebreak = flowDumperSettings.getBestLineBreak(); + if (output.endsWith(linebreak)) { + output = output.substring(0, output.length() - linebreak.length()); + } + return output; + } catch (MarkedYamlEngineException exception) { + throw wrapExceptionToHiddenSensitiveData(exception); + } + } + + /** + * Converts a flat map into a nested map structure and outputs the result as a list of + * YAML-formatted strings. Each item in the list represents a single line of the YAML data. The + * method is synchronized and thus thread-safe. + * + * @param flattenMap A map containing flattened keys (e.g., "parent.child.key") associated with + * their values. + * @return A list of strings that represents the YAML data, where each item corresponds to a line + * of the data. 
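+ *     For example, the flat map {@code {"a.b": 1, "a.c": "x"}} yields the lines {@code "a:"}, {@code "  b: 1"} and {@code "  c: x"}.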
+ */ + @SuppressWarnings("unchecked") + public static synchronized List<String> convertAndDumpYamlFromFlatMap( + Map<String, Object> flattenMap) { + try { + Map<String, Object> nestedMap = new LinkedHashMap<>(); + for (Map.Entry<String, Object> entry : flattenMap.entrySet()) { + String[] keys = entry.getKey().split("\\."); + Map<String, Object> currentMap = nestedMap; + for (int i = 0; i < keys.length - 1; i++) { + currentMap = + (Map<String, Object>) currentMap.computeIfAbsent(keys[i], k -> new LinkedHashMap<>()); + } + currentMap.put(keys[keys.length - 1], entry.getValue()); + } + String data = blockerDumper.dumpToString(nestedMap); + String linebreak = blockerDumperSettings.getBestLineBreak(); + return Arrays.asList(data.split(linebreak)); + } catch (MarkedYamlEngineException exception) { + throw wrapExceptionToHiddenSensitiveData(exception); + } + } + + public static synchronized <T> T convertToObject(String value, Class<T> type) { + try { + return type.cast(loader.loadFromString(value)); + } catch (MarkedYamlEngineException exception) { + throw wrapExceptionToHiddenSensitiveData(exception); + } + } + + /** + * This method wraps a MarkedYAMLException to hide sensitive data in its message. Before using + * this method, an exception message might include sensitive information like: + * + * <pre>{@code + * while constructing a mapping + * in 'reader', line 1, column 1: + * key1: secret1 + * ^ + * found duplicate key key1 + * in 'reader', line 2, column 1: + * key1: secret2 + * ^ + * }</pre> + * + * <p>After using this method, the message will be sanitized to hide the sensitive details: + * + * <pre>{@code + * while constructing a mapping + * in 'reader', line 1, column 1 + * found duplicate key key1 + * in 'reader', line 2, column 1 + * }</pre> + * + * @param exception The MarkedYamlEngineException containing potentially sensitive data. + * @return A YamlEngineException with a message that has sensitive data hidden. + */ + private static YamlEngineException wrapExceptionToHiddenSensitiveData( + MarkedYamlEngineException exception) { + StringBuilder lines = new StringBuilder(); + String context = exception.getContext(); + Optional<Mark> contextMark = exception.getContextMark(); + Optional<Mark> problemMark = exception.getProblemMark(); + String problem = exception.getProblem(); + + if (context != null) { + lines.append(context); + lines.append("\n"); + } + + if (contextMark.isPresent() + && (problem == null + || !problemMark.isPresent() + || contextMark.get().getName().equals(problemMark.get().getName()) + || contextMark.get().getLine() != problemMark.get().getLine() + || contextMark.get().getColumn() != problemMark.get().getColumn())) { + lines.append(hiddenSensitiveDataInMark(contextMark.get())); + lines.append("\n"); + } + + if (problem != null) { + lines.append(problem); + lines.append("\n"); + } + + if (problemMark.isPresent()) { + lines.append(hiddenSensitiveDataInMark(problemMark.get())); + lines.append("\n"); + } + + Throwable cause = exception.getCause(); + if (cause instanceof MarkedYamlEngineException) { + cause = wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) cause); + } + + YamlEngineException yamlException = new YamlEngineException(lines.toString(), cause); + yamlException.setStackTrace(exception.getStackTrace()); + return yamlException; + } + + /** + * This method is a mock implementation of the Mark#toString() method, specifically designed to + * exclude the Mark#get_snippet(), to prevent leaking any sensitive data. 
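+ * The result keeps only the location, e.g. {@code " in 'reader', line 1, column 1"}, and never the snippet of the offending line.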
+ */ + private static String hiddenSensitiveDataInMark(Mark mark) { + return " in " + + mark.getName() + + ", line " + + (mark.getLine() + 1) + + ", column " + + (mark.getColumn() + 1); + } + + private static class FlinkConfigRepresenter extends StandardRepresenter { + public FlinkConfigRepresenter(DumpSettings dumpSettings) { + super(dumpSettings); + representers.put(Duration.class, this::representDuration); + representers.put(MemorySize.class, this::representMemorySize); + parentClassRepresenters.put(Enum.class, this::representEnum); + } + + private Node representDuration(Object data) { + Duration duration = (Duration) data; + String durationString = TimeUtils.formatWithHighestUnit(duration); + return new ScalarNode(Tag.STR, durationString, settings.getDefaultScalarStyle()); + } + + private Node representMemorySize(Object data) { + MemorySize memorySize = (MemorySize) data; + return new ScalarNode(Tag.STR, memorySize.toString(), settings.getDefaultScalarStyle()); + } + + private Node representEnum(Object data) { + return new ScalarNode(Tag.STR, data.toString(), settings.getDefaultScalarStyle()); + } + } +} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index f839fa207..f66b113f3 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -18,7 +18,6 @@ package org.apache.streampark.common.util import com.typesafe.config.ConfigFactory import org.apache.commons.lang3.StringUtils -import org.yaml.snakeyaml.Yaml import javax.annotation.Nonnull @@ -82,16 +81,8 @@ object PropertiesUtils extends Logger { } def fromYamlText(text: String): Map[String, String] = { - try { - new Yaml() - .load(text) - .asInstanceOf[java.util.Map[String, Map[String, Any]]] - .flatMap(x => eachYamlItem(x._1, x._2)) - .toMap - } catch { - case e: IOException => - throw new IllegalArgumentException(s"Failed when loading conf error:", e) - } + val map = YamlParserUtils.loadYamlString(text) + map.flatMap(x => eachYamlItem(x._1, x._2)).toMap } def fromHoconText(conf: String): Map[String, String] = { @@ -146,11 +137,8 @@ object PropertiesUtils extends Logger { inputStream != null, s"[StreamPark] fromYamlFile: Properties inputStream must not be null") try { - new Yaml() - .load(inputStream) - .asInstanceOf[java.util.Map[String, Map[String, Any]]] - .flatMap(x => eachYamlItem(x._1, x._2)) - .toMap + val map = YamlParserUtils.loadYamlInput(inputStream) + map.flatMap(x => eachYamlItem(x._1, x._2)).toMap } catch { case e: IOException => throw new IllegalArgumentException(s"Failed when loading yaml from inputStream", e) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index 4cdcf07c1..86bc2bf8d 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -26,7 +26,6 @@ import org.apache.streampark.flink.util.FlinkUtils import 
org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper import org.apache.commons.io.FileUtils -import org.apache.flink.configuration.{Configuration, GlobalConfiguration} import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings} import javax.annotation.Nullable @@ -35,7 +34,7 @@ import java.io.File import java.util.{Map => JavaMap} import scala.collection.JavaConversions._ -import scala.util.{Success, Try} +import scala.util.Try case class SubmitRequest( flinkVersion: FlinkVersion, @@ -99,32 +98,6 @@ case class SubmitRequest( } } - lazy val flinkDefaultConfiguration: Configuration = { - Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) match { - case Success(value) => - executionMode match { - case ExecutionMode.YARN_SESSION | ExecutionMode.KUBERNETES_NATIVE_SESSION | - ExecutionMode.REMOTE => - value - case _ => - value - .keySet() - .foreach( - k => { - val v = value.getString(k, null) - if (v != null) { - val result = v - .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", effectiveAppName) - .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString) - value.setString(k, result) - } - }) - value - } - case _ => new Configuration() - } - } - def hasProp(key: String): Boolean = properties.containsKey(key) def getProp(key: String): Any = properties.get(key) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml index 2e642312d..0a40d9a7e 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml @@ -87,6 +87,11 @@ <artifactId>json4s-jackson_${scala.binary.version}</artifactId> </dependency> + <dependency> + <groupId>org.snakeyaml</groupId> + <artifactId>snakeyaml-engine</artifactId> + </dependency> + </dependencies> <build> diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java new file mode 100644 index 000000000..715972d50 --- /dev/null +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.configuration; + +import org.apache.streampark.common.util.AssertUtils; +import org.apache.streampark.common.util.YamlParserUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Global configuration object for Flink. Similar to Java properties configuration objects, it + * includes key-value pairs which represent the framework's configuration. + */ +public final class FlinkGlobalConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkGlobalConfiguration.class); + + public static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml"; + + public static final String FLINK_CONF_FILENAME = "config.yaml"; + + // key separator character + private static final String KEY_SEPARATOR = "."; + + // the keys whose values should be hidden + private static final String[] SENSITIVE_KEYS = + new String[] { + "password", + "secret", + "fs.azure.account.key", + "apikey", + "auth-params", + "service-key", + "token", + "basic-auth", + "jaas.config", + "http-headers" + }; + + // the hidden content to be displayed + public static final String HIDDEN_CONTENT = "******"; + + private static boolean standardYaml = true; + + // -------------------------------------------------------------------------------------------- + + private FlinkGlobalConfiguration() {} + + // -------------------------------------------------------------------------------------------- + + /** + * Loads the global configuration from the environment. Fails if an error occurs during loading. + * Returns an empty configuration object if the environment variable ENV_FLINK_CONF_DIR is not + * set; in production this variable is set, but tests and local execution/debugging typically + * run without it. + * + * @return Returns the Configuration + */ + public static Configuration loadConfiguration() { + return loadConfiguration(new Configuration()); + } + + /** + * Loads the global configuration and adds the given dynamic properties configuration. + * + * @param dynamicProperties The given dynamic properties + * @return Returns the loaded global configuration with dynamic properties + */ + public static Configuration loadConfiguration(Configuration dynamicProperties) { + final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); + if (configDir == null) { + return new Configuration(dynamicProperties); + } + + return loadConfiguration(configDir, dynamicProperties); + } + + /** + * Loads the configuration files from the specified directory. + * + * <p>YAML files are supported as configuration files. + * + * @param configDir the directory which contains the configuration files + */ + public static Configuration loadConfiguration(final String configDir) { + return loadConfiguration(configDir, null); + } + + /** + * Loads the configuration files from the specified directory. If the dynamic properties + * configuration is not null, then it is added to the loaded configuration. + * + * @param configDir directory to load the configuration from + * @param dynamicProperties configuration containing the dynamic properties. Null if none.
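+ *     Dynamic properties are added after the file-based configuration and therefore override keys that appear in both.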
+ * @return The configuration loaded from the given configuration directory + */ + public static Configuration loadConfiguration( + final String configDir, @Nullable final Configuration dynamicProperties) { + + if (configDir == null) { + throw new IllegalArgumentException( + "Given configuration directory is null, cannot load configuration"); + } + + final File confDirFile = new File(configDir); + if (!(confDirFile.exists())) { + throw new IllegalConfigurationException( + "The given configuration directory name '" + + configDir + + "' (" + + confDirFile.getAbsolutePath() + + ") does not describe an existing directory."); + } + + // get Flink yaml configuration file + File yamlConfigFile = new File(confDirFile, LEGACY_FLINK_CONF_FILENAME); + Configuration configuration; + + if (!yamlConfigFile.exists()) { + yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME); + if (!yamlConfigFile.exists()) { + throw new IllegalConfigurationException( + "The Flink config file '" + + yamlConfigFile + + "' (" + + yamlConfigFile.getAbsolutePath() + + ") does not exist."); + } else { + standardYaml = true; + LOG.info( + "Using standard YAML parser to load flink configuration file from {}.", + yamlConfigFile.getAbsolutePath()); + configuration = loadYAMLResource(yamlConfigFile); + } + } else { + standardYaml = false; + LOG.info( + "Using legacy YAML parser to load flink configuration file from {}.", + yamlConfigFile.getAbsolutePath()); + configuration = loadLegacyYAMLResource(yamlConfigFile); + } + + logConfiguration("Loading", configuration); + + if (dynamicProperties != null) { + logConfiguration("Loading dynamic", dynamicProperties); + configuration.addAll(dynamicProperties); + } + + return configuration; + } + + private static void logConfiguration(String prefix, Configuration config) { + config.confData.forEach( + (key, value) -> + LOG.info( + "{} configuration property: {}, {}", + prefix, + key, + isSensitive(key) ? HIDDEN_CONTENT : value)); + } + + private static Configuration loadLegacyYAMLResource(File file) { + final Configuration config = new Configuration(); + + try (BufferedReader reader = + new BufferedReader(new InputStreamReader(new FileInputStream(file)))) { + + String line; + int lineNo = 0; + while ((line = reader.readLine()) != null) { + lineNo++; + // 1. check for comments + String[] comments = line.split("#", 2); + String conf = comments[0].trim(); + + // 2. 
get key and value + if (conf.length() > 0) { + String[] kv = conf.split(": ", 2); + + // skip line with no valid key-value pair + if (kv.length == 1) { + LOG.warn( + "Error while trying to split key and value in configuration file " + + file + + ":" + + lineNo + + ": Line is not a key-value pair (missing space after ':'?)"); + continue; + } + + String key = kv[0].trim(); + String value = kv[1].trim(); + + // sanity check + if (key.length() == 0 || value.length() == 0) { + LOG.warn( + "Error after splitting key and value in configuration file " + + file + + ":" + + lineNo + + ": Key or value was empty"); + continue; + } + + config.setString(key, value); + } + } + } catch (IOException e) { + throw new RuntimeException("Error parsing YAML configuration.", e); + } + + return config; + } + + private static Map<String, Object> flatten(Map<String, Object> config, String keyPrefix) { + final Map<String, Object> flattenedMap = new HashMap<>(); + + config.forEach( + (key, value) -> { + String flattenedKey = keyPrefix + key; + if (value instanceof Map) { + Map<String, Object> e = (Map<String, Object>) value; + flattenedMap.putAll(flatten(e, flattenedKey + KEY_SEPARATOR)); + } else { + if (value instanceof List) { + flattenedMap.put(flattenedKey, YamlParserUtils.toYAMLString(value)); + } else { + flattenedMap.put(flattenedKey, value); + } + } + }); + + return flattenedMap; + } + + private static Map<String, Object> flatten(Map<String, Object> config) { + // Since we start flattening from the root, keys should not be prefixed with anything. + return flatten(config, ""); + } + + private static Configuration loadYAMLResource(File file) { + final Configuration config = new Configuration(); + + try { + Map<String, Object> configDocument = flatten(YamlParserUtils.loadYamlFile(file)); + configDocument.forEach((k, v) -> config.setValueInternal(k, v, false)); + + return config; + } catch (Exception e) { + throw new RuntimeException("Error parsing YAML configuration.", e); + } + } + + /** + * Check whether the key is a hidden key. 
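+ * The check is a case-insensitive substring match against SENSITIVE_KEYS, so e.g. "s3.secret-key" or "MY.PASSWORD" would be logged as ******.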
+ * + * @param key the config key + */ + public static boolean isSensitive(String key) { + AssertUtils.notNull(key, "key"); + final String keyInLower = key.toLowerCase(); + for (String hideKey : SENSITIVE_KEYS) { + if (keyInLower.length() >= hideKey.length() && keyInLower.contains(hideKey)) { + return true; + } + } + return false; + } + + public static String getFlinkConfFilename() { + if (isStandardYaml()) { + return FLINK_CONF_FILENAME; + } else { + return LEGACY_FLINK_CONF_FILENAME; + } + } + + public static boolean isStandardYaml() { + return standardYaml; + } + + public static void setStandardYaml(boolean standardYaml) { + FlinkGlobalConfiguration.standardYaml = standardYaml; + } +} diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 0437dbaab..b652aa67c 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -18,7 +18,7 @@ package org.apache.streampark.flink.client.`trait` import org.apache.streampark.common.conf.ConfigConst._ -import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode} +import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode} import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils, Utils} import org.apache.streampark.flink.client.bean._ import org.apache.streampark.flink.core.conf.FlinkRunOption @@ -284,7 +284,8 @@ trait FlinkClientTrait extends Logger { val configurationDirectory = s"$flinkHome/conf" // 2. 
load the custom command lines val flinkConfig = - Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration()) + Try(FlinkGlobalConfiguration.loadConfiguration(s"$flinkHome/conf")) + .getOrElse(new Configuration()) loadCustomCommandLines(flinkConfig, configurationDirectory) } @@ -413,7 +414,8 @@ trait FlinkClientTrait extends Logger { validateAndGetActiveCommandLine(getCustomCommandLines(flinkHome), commandLine) val flinkDefaultConfiguration = - Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration()) + Try(FlinkGlobalConfiguration.loadConfiguration(s"$flinkHome/conf")) + .getOrElse(new Configuration()) val configuration = new Configuration(flinkDefaultConfiguration) configuration.addAll(activeCommandLine.toConfiguration(commandLine)) @@ -451,6 +453,39 @@ trait FlinkClientTrait extends Logger { } } + implicit private[client] class EnhanceConfiguration(request: SubmitRequest) { + lazy val flinkDefaultConfiguration: Configuration = { + Try( + FlinkGlobalConfiguration.loadConfiguration( + s"${request.flinkVersion.flinkHome}/conf")) match { + case Success(value) => + request.executionMode match { + case ExecutionMode.YARN_SESSION | ExecutionMode.KUBERNETES_NATIVE_SESSION | + ExecutionMode.REMOTE => + value + case _ => + value + .keySet() + .foreach( + k => { + val v = value.getString(k, null) + if (v != null) { + val result = v + .replaceAll( + "\\$\\{job(Name|name)}|\\$job(Name|name)", + request.effectiveAppName) + .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", request.id.toString) + value.setString(k, result) + } + }) + value + } + case _ => new Configuration() + } + } + + } + private[client] def cancelJob( cancelRequest: CancelRequest, jobID: JobID, diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala index 6451d5908..f51defcaf 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala @@ -65,7 +65,7 @@ object YarnPerJobTestCase extends Logger { lazy val flinkDefaultConfiguration: Configuration = { require(FLINK_HOME != null) // get flink config - GlobalConfiguration.loadConfiguration(s"$FLINK_HOME/conf") + FlinkGlobalConfiguration.loadConfiguration(s"$FLINK_HOME/conf") } lazy val customCommandLines: util.List[CustomCommandLine] = {