This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch release-2.1.6
in repository https://gitbox.apache.org/repos/asf/streampark.git
The following commit(s) were added to refs/heads/release-2.1.6 by this push:
new 7548fdd09 [BUG] get flink-config bug fixed. (#4288)
7548fdd09 is described below
commit 7548fdd09f1a645817cc0e00ebd4adefe381cb97
Author: benjobs <[email protected]>
AuthorDate: Mon Sep 15 00:58:49 2025 +0800
[BUG] get flink-config bug fixed. (#4288)
---
pom.xml | 6 +-
streampark-common/pom.xml | 4 +-
.../apache/streampark/common/util/MemorySize.java | 421 +++++++++++++++++++++
.../apache/streampark/common/util/TimeUtils.java | 248 ++++++++++++
.../streampark/common/util/YamlParserUtils.java | 323 ++++++++++++++++
.../streampark/common/util/PropertiesUtils.scala | 20 +-
.../flink/client/bean/SubmitRequest.scala | 29 +-
.../streampark-flink-client-core/pom.xml | 5 +
.../configuration/FlinkGlobalConfiguration.java | 313 +++++++++++++++
.../flink/client/trait/FlinkClientTrait.scala | 41 +-
.../flink/client/test/YarnPerJobTestCase.scala | 2 +-
11 files changed, 1359 insertions(+), 53 deletions(-)
diff --git a/pom.xml b/pom.xml
index ca3fbaa2b..29e5496bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
<caffeine.version>2.8.6</caffeine.version>
<mysql.version>8.0.27</mysql.version>
<hikariCP.version>3.4.5</hikariCP.version>
- <snakeyaml.version>2.0</snakeyaml.version>
+ <snakeyaml.version>2.6</snakeyaml.version>
<typesafe-conf.version>1.4.2</typesafe-conf.version>
<json4s-jackson.version>4.0.6</json4s-jackson.version>
<commons-cli.version>1.5.0</commons-cli.version>
@@ -245,8 +245,8 @@
</dependency>
<dependency>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
+ <groupId>org.snakeyaml</groupId>
+ <artifactId>snakeyaml-engine</artifactId>
<version>${snakeyaml.version}</version>
</dependency>
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index 3fc00a388..0dee93ec1 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -68,8 +68,8 @@
</dependency>
<dependency>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
+ <groupId>org.snakeyaml</groupId>
+ <artifactId>snakeyaml-engine</artifactId>
</dependency>
<dependency>
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java
b/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java
new file mode 100644
index 000000000..3cb371d3d
--- /dev/null
+++
b/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.BYTES;
+import static
org.apache.streampark.common.util.MemorySize.MemoryUnit.GIGA_BYTES;
+import static
org.apache.streampark.common.util.MemorySize.MemoryUnit.KILO_BYTES;
+import static
org.apache.streampark.common.util.MemorySize.MemoryUnit.MEGA_BYTES;
+import static
org.apache.streampark.common.util.MemorySize.MemoryUnit.TERA_BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.hasUnit;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different
units.
+ *
+ * <h2>Parsing</h2>
+ *
+ * <p>The size can be parsed from a text expression. If the expression is a
pure number, the value
+ * will be interpreted as bytes.
+ */
+public class MemorySize implements java.io.Serializable,
Comparable<MemorySize> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final MemorySize ZERO = new MemorySize(0L);
+
+ public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE);
+
+ private static final List<MemoryUnit> ORDERED_UNITS =
+ Arrays.asList(BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES, TERA_BYTES);
+
+ // ------------------------------------------------------------------------
+
+ /** The memory size, in bytes. */
+ private final long bytes;
+
+ /** The memorized value returned by toString(). */
+ private transient String stringified;
+
+ /** The memorized value returned by toHumanReadableString(). */
+ private transient String humanReadableStr;
+
+ /**
+ * Constructs a new MemorySize.
+ *
+ * @param bytes The size, in bytes. Must be zero or larger.
+ */
+ public MemorySize(long bytes) {
+ if (bytes < 0) {
+ throw new IllegalArgumentException("bytes must be >= 0");
+ }
+ this.bytes = bytes;
+ }
+
+ /**
+  * Creates a memory size from a number of mebibytes (1 MiB = 1024 * 1024 bytes).
+  *
+  * @param mebiBytes the size in mebibytes
+  * @return the equivalent {@code MemorySize}
+  * @throws ArithmeticException if the size in bytes does not fit into a {@code long}
+  * @throws IllegalArgumentException if the resulting number of bytes is negative
+  */
+ public static MemorySize ofMebiBytes(long mebiBytes) {
+   // A plain "<< 20" wraps around silently for large inputs and may even produce a
+   // plausible-looking non-negative size; the exact multiplication reports the overflow.
+   return new MemorySize(Math.multiplyExact(mebiBytes, 1L << 20));
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Returns the exact size in bytes. */
+ public long getBytes() {
+   return this.bytes;
+ }
+
+ /** Returns the size in whole kibibytes (1 KiB = 1024 bytes); the remainder is dropped. */
+ public long getKibiBytes() {
+   final long kibiBytes = bytes >> 10;
+   return kibiBytes;
+ }
+
+ /**
+  * Returns the size in whole mebibytes (1 MiB = 1024 KiB); the remainder is dropped.
+  *
+  * <p>The shifted value is narrowed to {@code int} with a plain cast.
+  */
+ public int getMebiBytes() {
+   final long mebiBytes = bytes >> 20;
+   return (int) mebiBytes;
+ }
+
+ /** Returns the size in whole gibibytes (1 GiB = 1024 MiB); the remainder is dropped. */
+ public long getGibiBytes() {
+   final long gibiBytes = bytes >> 30;
+   return gibiBytes;
+ }
+
+ /** Returns the size in whole tebibytes (1 TiB = 1024 GiB); the remainder is dropped. */
+ public long getTebiBytes() {
+   final long tebiBytes = bytes >> 40;
+   return tebiBytes;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Hashes the byte count the same way {@link Long#hashCode(long)} does. */
+ @Override
+ public int hashCode() {
+   return Long.hashCode(bytes);
+ }
+
+ /** Two sizes are equal when they are of the exact same class and hold the same byte count. */
+ @Override
+ public boolean equals(Object obj) {
+   if (obj == this) {
+     return true;
+   }
+   if (obj == null || obj.getClass() != this.getClass()) {
+     return false;
+   }
+   final MemorySize other = (MemorySize) obj;
+   return other.bytes == this.bytes;
+ }
+
+ @Override
+ public String toString() {
+ if (stringified == null) {
+ stringified = formatToString();
+ }
+
+ return stringified;
+ }
+
+ /**
+  * Renders the size using the largest unit that divides the byte count without remainder,
+  * e.g. {@code "1 kb"} for 1024 bytes and {@code "1025 bytes"} for 1025 bytes. Zero is
+  * rendered as {@code "0 bytes"}.
+  *
+  * @return the value followed by a space and the unit's second label
+  */
+ private String formatToString() {
+   MemoryUnit highestIntegerUnit = BYTES;
+   // Zero is divisible by every unit; keep it in bytes instead of promoting it.
+   if (bytes != 0) {
+     // ORDERED_UNITS is ascending, so the last unit reached before the first one that
+     // leaves a remainder is the largest exact unit. The previous stream-based lookup
+     // found no remainder at all for exact tebibyte multiples and fell back to bytes.
+     for (MemoryUnit candidate : ORDERED_UNITS) {
+       if (bytes % candidate.getMultiplier() != 0) {
+         break;
+       }
+       highestIntegerUnit = candidate;
+     }
+   }
+
+   // Locale.ROOT keeps the digits ASCII so that the text can be parsed back by parseBytes.
+   return String.format(
+       Locale.ROOT,
+       "%d %s",
+       bytes / highestIntegerUnit.getMultiplier(),
+       highestIntegerUnit.getUnits()[1]);
+ }
+
+ public String toHumanReadableString() {
+ if (humanReadableStr == null) {
+ humanReadableStr = formatToHumanReadableString();
+ }
+
+ return humanReadableStr;
+ }
+
+ /**
+  * Renders the size with three decimals in the largest unit whose multiplier is strictly
+  * smaller than the byte count, followed by the exact byte count, e.g.
+  * {@code "1.001kb (1025 bytes)"}. Sizes for which that unit is bytes are rendered as a plain
+  * byte count, e.g. {@code "1024 bytes"}.
+  */
+ private String formatToHumanReadableString() {
+   MemoryUnit highestUnit = BYTES;
+   // ORDERED_UNITS is ascending, so the last unit that satisfies the condition is the largest.
+   for (MemoryUnit candidate : ORDERED_UNITS) {
+     if (bytes > candidate.getMultiplier()) {
+       highestUnit = candidate;
+     }
+   }
+
+   if (highestUnit == BYTES) {
+     // Same locale as the branch below, so both forms always use ASCII digits.
+     return String.format(Locale.ROOT, "%d %s", bytes, BYTES.getUnits()[1]);
+   }
+
+   double approximate = 1.0 * bytes / highestUnit.getMultiplier();
+   return String.format(
+       Locale.ROOT, "%.3f%s (%d bytes)", approximate, highestUnit.getUnits()[1], bytes);
+ }
+
+ /** Orders sizes by their byte count. */
+ @Override
+ public int compareTo(MemorySize that) {
+   final long mine = this.bytes;
+   final long theirs = that.bytes;
+   return mine < theirs ? -1 : (mine == theirs ? 0 : 1);
+ }
+
+ // ------------------------------------------------------------------------
+ // Calculations
+ // ------------------------------------------------------------------------
+
+ /**
+  * Returns the sum of this size and {@code that}.
+  *
+  * @throws ArithmeticException if the sum overflows a {@code long}
+  */
+ public MemorySize add(MemorySize that) {
+   final long sum = Math.addExact(this.bytes, that.bytes);
+   return new MemorySize(sum);
+ }
+
+ /**
+  * Returns this size reduced by {@code that}.
+  *
+  * @throws ArithmeticException if the subtraction overflows a {@code long}
+  * @throws IllegalArgumentException if the difference is negative
+  */
+ public MemorySize subtract(MemorySize that) {
+   final long difference = Math.subtractExact(this.bytes, that.bytes);
+   return new MemorySize(difference);
+ }
+
+ /**
+  * Returns this size scaled by {@code multiplier}; a fractional part of the product is dropped.
+  *
+  * @param multiplier the non-negative scale factor
+  * @throws IllegalArgumentException if {@code multiplier} is negative
+  * @throws ArithmeticException if the product exceeds {@link Long#MAX_VALUE}
+  */
+ public MemorySize multiply(double multiplier) {
+   if (multiplier < 0) {
+     throw new IllegalArgumentException("multiplier must be >= 0");
+   }
+
+   final BigDecimal size = BigDecimal.valueOf(this.bytes);
+   final BigDecimal factor = BigDecimal.valueOf(multiplier);
+   final BigDecimal scaled = size.multiply(factor);
+
+   final BigDecimal upperBound = BigDecimal.valueOf(Long.MAX_VALUE);
+   if (scaled.compareTo(upperBound) > 0) {
+     throw new ArithmeticException("long overflow");
+   }
+   return new MemorySize(scaled.longValue());
+ }
+
+ /**
+  * Returns this size divided by {@code by}, rounded down to whole bytes.
+  *
+  * @param by the divisor; must be strictly positive
+  * @return the quotient as a new {@code MemorySize}
+  * @throws IllegalArgumentException if {@code by} is zero or negative
+  */
+ public MemorySize divide(long by) {
+   // The previous guard ("by < 0") let zero through, although its message was about zero,
+   // so a zero divisor surfaced as a bare ArithmeticException from the division below.
+   if (by <= 0) {
+     throw new IllegalArgumentException("divisor must be > 0");
+   }
+   return new MemorySize(bytes / by);
+ }
+
+ // ------------------------------------------------------------------------
+ // Parsing
+ // ------------------------------------------------------------------------
+
+ /**
+  * Parses a textual size expression such as {@code "64 mb"} into a {@code MemorySize}.
+  *
+  * @param text the expression to parse
+  * @return the parsed size
+  * @throws IllegalArgumentException if the expression cannot be parsed
+  */
+ public static MemorySize parse(String text) throws IllegalArgumentException {
+   final long sizeInBytes = parseBytes(text);
+   return new MemorySize(sizeInBytes);
+ }
+
+ /**
+  * Parses a textual size expression, applying {@code defaultUnit} when the text carries no
+  * unit of its own.
+  *
+  * @param text the expression to parse
+  * @param defaultUnit the unit whose first label is appended to a unit-less expression
+  * @return the parsed size
+  * @throws IllegalArgumentException if the expression cannot be parsed
+  */
+ public static MemorySize parse(String text, MemoryUnit defaultUnit)
+     throws IllegalArgumentException {
+   final String expression = hasUnit(text) ? text : text + defaultUnit.getUnits()[0];
+   return parse(expression);
+ }
+
+ /**
+  * Parses a textual size expression into a number of bytes. The expression is a run of ASCII
+  * digits optionally followed by one of the unit labels listed under {@link MemoryUnit}; an
+  * expression without a unit is taken as bytes.
+  *
+  * @param text the expression to parse
+  * @return the parsed size in bytes
+  * @throws IllegalArgumentException if the expression is empty, does not start with a number,
+  *     names an unknown unit, or does not fit into a {@code long}
+  */
+ public static long parseBytes(String text) throws IllegalArgumentException {
+   Objects.requireNonNull(text, "text cannot be null");
+
+   final String expression = text.trim();
+   if (expression.isEmpty()) {
+     throw new IllegalArgumentException("argument is an empty- or whitespace-only string");
+   }
+
+   // Length of the leading run of ASCII digits.
+   int digitCount = 0;
+   for (; digitCount < expression.length(); digitCount++) {
+     final char c = expression.charAt(digitCount);
+     if (c < '0' || c > '9') {
+       break;
+     }
+   }
+   if (digitCount == 0) {
+     throw new NumberFormatException("text does not start with a number");
+   }
+
+   final String digits = expression.substring(0, digitCount);
+   final String unit = expression.substring(digitCount).trim().toLowerCase(Locale.US);
+
+   final long value;
+   try {
+     // Long.parseLong rejects digit runs that exceed the long range.
+     value = Long.parseLong(digits);
+   } catch (NumberFormatException e) {
+     throw new IllegalArgumentException(
+         "The value '"
+             + digits
+             + "' cannot be re represented as 64bit number (numeric overflow).");
+   }
+
+   final Optional<MemoryUnit> parsedUnit = parseUnit(unit);
+   final long multiplier = parsedUnit.isPresent() ? parsedUnit.get().getMultiplier() : 1L;
+   final long result = value * multiplier;
+
+   // A wrapped multiplication no longer divides back to the original value.
+   if (result / multiplier != value) {
+     throw new IllegalArgumentException(
+         "The value '"
+             + text
+             + "' cannot be re represented as 64bit number of bytes (numeric overflow).");
+   }
+
+   return result;
+ }
+
+ /**
+  * Resolves a lower-cased unit label to its {@link MemoryUnit}.
+  *
+  * @param unit the label to resolve; may be empty
+  * @return the matching unit, or an empty optional when {@code unit} is empty
+  * @throws IllegalArgumentException if {@code unit} is non-empty and matches no known label
+  */
+ private static Optional<MemoryUnit> parseUnit(String unit) {
+   // ORDERED_UNITS lists the units from bytes up to tebibytes.
+   for (MemoryUnit candidate : ORDERED_UNITS) {
+     if (matchesAny(unit, candidate)) {
+       return Optional.of(candidate);
+     }
+   }
+
+   if (unit.isEmpty()) {
+     return Optional.empty();
+   }
+
+   throw new IllegalArgumentException(
+       "Memory size unit '"
+           + unit
+           + "' does not match any of the recognized units: "
+           + MemoryUnit.getAllUnits());
+ }
+
+ /** Tells whether {@code str} equals one of the labels of {@code unit}. */
+ private static boolean matchesAny(String str, MemoryUnit unit) {
+   return Arrays.stream(unit.getUnits()).anyMatch(label -> label.equals(str));
+ }
+
+ /**
+  * Units in which a memory size can be written, mostly used when parsing values from a
+  * configuration file. All units are binary (powers of 1024).
+  *
+  * <p>The following size suffixes are supported:
+  *
+  * <ul>
+  *   <li>1b or 1bytes (bytes)
+  *   <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
+  *   <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
+  *   <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
+  *   <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
+  * </ul>
+  */
+ public enum MemoryUnit {
+   BYTES(new String[] {"b", "bytes"}, 1L),
+   KILO_BYTES(new String[] {"k", "kb", "kibibytes"}, 1024L),
+   MEGA_BYTES(new String[] {"m", "mb", "mebibytes"}, 1024L * 1024L),
+   GIGA_BYTES(new String[] {"g", "gb", "gibibytes"}, 1024L * 1024L * 1024L),
+   TERA_BYTES(new String[] {"t", "tb", "tebibytes"}, 1024L * 1024L * 1024L * 1024L);
+
+   /** Accepted labels, shortest first. */
+   private final String[] units;
+
+   /** Number of bytes in one unit. */
+   private final long multiplier;
+
+   MemoryUnit(String[] units, long multiplier) {
+     this.units = units;
+     this.multiplier = multiplier;
+   }
+
+   /**
+    * Returns the labels accepted for this unit, shortest first.
+    *
+    * @return a fresh copy, so callers cannot alter the labels shared by the enum constant
+    */
+   public String[] getUnits() {
+     return units.clone();
+   }
+
+   /** Returns the number of bytes in one unit. */
+   public long getMultiplier() {
+     return multiplier;
+   }
+
+   /**
+    * Returns every accepted label, grouped per unit, e.g. {@code "(b | bytes) / (k | kb |
+    * kibibytes) / ..."}.
+    */
+   public static String getAllUnits() {
+     return concatenateUnits(
+         BYTES.units, KILO_BYTES.units, MEGA_BYTES.units, GIGA_BYTES.units, TERA_BYTES.units);
+   }
+
+   /**
+    * Tells whether anything other than whitespace follows the leading run of ASCII digits.
+    *
+    * @param text the expression to inspect
+    * @return {@code true} if the text carries a (not necessarily valid) unit part
+    * @throws NullPointerException if {@code text} is null
+    * @throws IllegalArgumentException if {@code text} is empty or whitespace only
+    */
+   public static boolean hasUnit(String text) {
+     Objects.requireNonNull(text, "text cannot be null");
+
+     final String trimmed = text.trim();
+     if (trimmed.isEmpty()) {
+       throw new IllegalArgumentException("argument is an empty- or whitespace-only string");
+     }
+
+     final int len = trimmed.length();
+     int pos = 0;
+
+     char current;
+     while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+       pos++;
+     }
+
+     final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+     return unit.length() > 0;
+   }
+
+   /** Joins each label group as {@code "(a | b)"} and the groups with {@code " / "}. */
+   private static String concatenateUnits(final String[]... allUnits) {
+     final String[] groups = new String[allUnits.length];
+     for (int i = 0; i < allUnits.length; i++) {
+       groups[i] = "(" + String.join(" | ", allUnits[i]) + ")";
+     }
+     return String.join(" / ", groups);
+   }
+ }
+}
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java
b/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java
new file mode 100644
index 000000000..c1efee3a1
--- /dev/null
+++
b/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Collection of utilities about time intervals. */
+public class TimeUtils {
+
+ private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP =
+ Collections.unmodifiableMap(initMap());
+
+ /**
+  * Parses a textual duration of the form "{length value}{time unit label}", e.g. "123ms" or
+  * "321 s", into a java {@link Duration}. A value without a unit label is taken as
+  * milliseconds.
+  *
+  * <p>Supported time unit labels are:
+  *
+  * <ul>
+  *   <li>DAYS: "d", "day"
+  *   <li>HOURS: "h", "hour"
+  *   <li>MINUTES: "m", "min", "minute"
+  *   <li>SECONDS: "s", "sec", "second"
+  *   <li>MILLISECONDS: "ms", "milli", "millisecond"
+  *   <li>MICROSECONDS: "µs", "micro", "microsecond"
+  *   <li>NANOSECONDS: "ns", "nano", "nanosecond"
+  * </ul>
+  *
+  * @param text string to parse.
+  * @return the parsed duration
+  * @throws IllegalArgumentException if the text is empty, does not start with a number, does
+  *     not fit into a {@code long}, or names an unknown unit label
+  */
+ public static Duration parseDuration(String text) {
+   AssertUtils.notNull(text);
+
+   final String expression = text.trim();
+   if (expression.isEmpty()) {
+     throw new IllegalArgumentException("argument is an empty- or whitespace-only string.");
+   }
+
+   // Length of the leading run of ASCII digits.
+   int digitCount = 0;
+   for (; digitCount < expression.length(); digitCount++) {
+     final char c = expression.charAt(digitCount);
+     if (c < '0' || c > '9') {
+       break;
+     }
+   }
+   if (digitCount == 0) {
+     throw new NumberFormatException("text does not start with a number");
+   }
+
+   final String digits = expression.substring(0, digitCount);
+   final String unitLabel = expression.substring(digitCount).trim().toLowerCase(Locale.US);
+
+   final long value;
+   try {
+     // Long.parseLong rejects digit runs that exceed the long range.
+     value = Long.parseLong(digits);
+   } catch (NumberFormatException e) {
+     throw new IllegalArgumentException(
+         "The value '"
+             + digits
+             + "' cannot be re represented as 64bit number (numeric overflow).");
+   }
+
+   final ChronoUnit unit =
+       unitLabel.isEmpty() ? ChronoUnit.MILLIS : LABEL_TO_UNIT_MAP.get(unitLabel);
+   if (unit == null) {
+     throw new IllegalArgumentException(
+         "Time interval unit label '"
+             + unitLabel
+             + "' does not match any of the recognized units: "
+             + TimeUnit.getAllUnits());
+   }
+   return Duration.of(value, unit);
+ }
+
+ private static Map<String, ChronoUnit> initMap() {
+ Map<String, ChronoUnit> labelToUnit = new HashMap<>();
+ for (TimeUnit timeUnit : TimeUnit.values()) {
+ for (String label : timeUnit.getLabels()) {
+ labelToUnit.put(label, timeUnit.getUnit());
+ }
+ }
+ return labelToUnit;
+ }
+
+ /**
+  * Renders a duration as its length in milliseconds followed by the first millisecond label.
+  *
+  * @param duration to convert to string
+  * @return duration string in millis
+  */
+ public static String getStringInMillis(final Duration duration) {
+   final long millis = duration.toMillis();
+   final String label = TimeUnit.MILLISECONDS.getLabels().get(0);
+   return millis + label;
+ }
+
+ /**
+  * Pretty prints the duration in the largest time unit that represents it without losing
+  * precision.
+  *
+  * <p>Examples:
+  *
+  * <pre>{@code
+  * Duration.ofMillis(60000) will be printed as 1 min
+  * Duration.ofHours(1).plusSeconds(1) will be printed as 3601 s
+  * }</pre>
+  *
+  * <b>NOTE:</b> It supports only durations whose length in nanoseconds fits into a long.
+  *
+  * @param duration the duration to print
+  * @return the length, a space, and the first label of the chosen unit
+  * @throws ArithmeticException if the length in nanoseconds overflows a {@code long}
+  */
+ public static String formatWithHighestUnit(Duration duration) {
+   long nanos = duration.toNanos();
+
+   TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
+   // Locale.ROOT keeps the digits ASCII so that the text can be parsed back by parseDuration.
+   return String.format(
+       Locale.ROOT,
+       "%d %s",
+       nanos / highestIntegerUnit.unit.getDuration().toNanos(),
+       highestIntegerUnit.getLabels().get(0));
+ }
+
+ private static TimeUnit getHighestIntegerUnit(long nanos) {
+ if (nanos == 0) {
+ return TimeUnit.MILLISECONDS;
+ }
+
+ final List<TimeUnit> orderedUnits =
+ Arrays.asList(
+ TimeUnit.NANOSECONDS,
+ TimeUnit.MICROSECONDS,
+ TimeUnit.MILLISECONDS,
+ TimeUnit.SECONDS,
+ TimeUnit.MINUTES,
+ TimeUnit.HOURS,
+ TimeUnit.DAYS);
+
+ TimeUnit highestIntegerUnit = null;
+ for (TimeUnit timeUnit : orderedUnits) {
+ if (nanos % timeUnit.unit.getDuration().toNanos() != 0) {
+ break;
+ }
+ highestIntegerUnit = timeUnit;
+ }
+
+ return AssertUtils.notNull(highestIntegerUnit, "Should find a
highestIntegerUnit.");
+ }
+
+ /**
+  * Time units together with the labels accepted for them, mostly used to parse values from a
+  * configuration file. The first label of each unit is the one used when printing.
+  */
+ private enum TimeUnit {
+   DAYS(ChronoUnit.DAYS, singular("d"), plural("day")),
+   HOURS(ChronoUnit.HOURS, singular("h"), plural("hour")),
+   MINUTES(ChronoUnit.MINUTES, singular("min"), singular("m"), plural("minute")),
+   SECONDS(ChronoUnit.SECONDS, singular("s"), plural("sec"), plural("second")),
+   MILLISECONDS(ChronoUnit.MILLIS, singular("ms"), plural("milli"), plural("millisecond")),
+   MICROSECONDS(ChronoUnit.MICROS, singular("µs"), plural("micro"), plural("microsecond")),
+   NANOSECONDS(ChronoUnit.NANOS, singular("ns"), plural("nano"), plural("nanosecond"));
+
+   private static final String PLURAL_SUFFIX = "s";
+
+   private final List<String> labels;
+
+   private final ChronoUnit unit;
+
+   TimeUnit(ChronoUnit unit, String[]... labels) {
+     this.unit = unit;
+     // Flatten the label groups while keeping their declaration order.
+     this.labels = Arrays.stream(labels).flatMap(Arrays::stream).collect(Collectors.toList());
+   }
+
+   /**
+    * @param label the original label
+    * @return a one-element group holding the label as is
+    */
+   private static String[] singular(String label) {
+     final String[] group = {label};
+     return group;
+   }
+
+   /**
+    * @param label the original label
+    * @return a group holding the label followed by its plural form
+    */
+   private static String[] plural(String label) {
+     final String pluralForm = label + PLURAL_SUFFIX;
+     return new String[] {label, pluralForm};
+   }
+
+   /** Returns the labels accepted for this unit. */
+   public List<String> getLabels() {
+     return this.labels;
+   }
+
+   /** Returns the {@link ChronoUnit} this unit maps to. */
+   public ChronoUnit getUnit() {
+     return this.unit;
+   }
+
+   /** Describes every unit and its labels, separated by commas. */
+   public static String getAllUnits() {
+     final List<String> descriptions =
+         Arrays.stream(TimeUnit.values())
+             .map(timeUnit -> createTimeUnitString(timeUnit))
+             .collect(Collectors.toList());
+     return String.join(", ", descriptions);
+   }
+
+   /** Describes one unit as {@code NAME: (label | label)}. */
+   private static String createTimeUnitString(TimeUnit timeUnit) {
+     final StringBuilder description = new StringBuilder();
+     description.append(timeUnit.name());
+     description.append(": (");
+     description.append(String.join(" | ", timeUnit.getLabels()));
+     description.append(")");
+     return description.toString();
+   }
+ }
+
+ private static ChronoUnit toChronoUnit(java.util.concurrent.TimeUnit
timeUnit) {
+ switch (timeUnit) {
+ case NANOSECONDS:
+ return ChronoUnit.NANOS;
+ case MICROSECONDS:
+ return ChronoUnit.MICROS;
+ case MILLISECONDS:
+ return ChronoUnit.MILLIS;
+ case SECONDS:
+ return ChronoUnit.SECONDS;
+ case MINUTES:
+ return ChronoUnit.MINUTES;
+ case HOURS:
+ return ChronoUnit.HOURS;
+ case DAYS:
+ return ChronoUnit.DAYS;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported time
unit %s.", timeUnit));
+ }
+ }
+}
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java
b/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java
new file mode 100644
index 000000000..850985f7f
--- /dev/null
+++
b/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.snakeyaml.engine.v2.api.Dump;
+import org.snakeyaml.engine.v2.api.DumpSettings;
+import org.snakeyaml.engine.v2.api.Load;
+import org.snakeyaml.engine.v2.api.LoadSettings;
+import org.snakeyaml.engine.v2.common.FlowStyle;
+import org.snakeyaml.engine.v2.exceptions.Mark;
+import org.snakeyaml.engine.v2.exceptions.MarkedYamlEngineException;
+import org.snakeyaml.engine.v2.exceptions.YamlEngineException;
+import org.snakeyaml.engine.v2.nodes.Node;
+import org.snakeyaml.engine.v2.nodes.ScalarNode;
+import org.snakeyaml.engine.v2.nodes.Tag;
+import org.snakeyaml.engine.v2.representer.StandardRepresenter;
+import org.snakeyaml.engine.v2.schema.CoreSchema;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains utility methods to load standard yaml file and convert
object to standard
+ * yaml syntax.
+ */
+public class YamlParserUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(YamlParserUtils.class);
+
+ private static final DumpSettings blockerDumperSettings =
+ DumpSettings.builder()
+ .setDefaultFlowStyle(FlowStyle.BLOCK)
+ // Disable split long lines to avoid add unexpected line breaks
+ .setSplitLines(false)
+ .setSchema(new CoreSchema())
+ .build();
+
+ private static final DumpSettings flowDumperSettings =
+ DumpSettings.builder()
+ .setDefaultFlowStyle(FlowStyle.FLOW)
+ // Disable split long lines to avoid add unexpected line breaks
+ .setSplitLines(false)
+ .setSchema(new CoreSchema())
+ .build();
+
+ private static final Dump blockerDumper =
+ new Dump(blockerDumperSettings, new
FlinkConfigRepresenter(blockerDumperSettings));
+
+ private static final Dump flowDumper =
+ new Dump(flowDumperSettings, new
FlinkConfigRepresenter(flowDumperSettings));
+
+ private static final Load loader =
+ new Load(LoadSettings.builder().setSchema(new CoreSchema()).build());
+
+ /**
+  * Reads the given YAML file and returns its content as a map.
+  *
+  * @param file the YAML file to load.
+  * @return a non-null map representing the YAML content. If the loader yields nothing for the
+  *     file, an empty map is returned.
+  * @throws FileNotFoundException if the YAML file is not found.
+  * @throws YamlEngineException if the file cannot be parsed; for marked parser errors the
+  *     message is rebuilt without the source snippet.
+  * @throws IOException if an I/O error occurs while reading from the file stream.
+  */
+ public static synchronized @Nonnull Map<String, Object> loadYamlFile(File file)
+     throws Exception {
+   try (FileInputStream inputStream = new FileInputStream(file)) {
+     final Object loaded = loader.loadFromInputStream(inputStream);
+     final Map<String, Object> yamlResult = (Map<String, Object>) loaded;
+     if (yamlResult == null) {
+       return new HashMap<>();
+     }
+     return yamlResult;
+   } catch (FileNotFoundException e) {
+     LOG.error("Failed to find YAML file", e);
+     throw e;
+   } catch (IOException | YamlEngineException e) {
+     // Only marked parser errors carry a snippet of the input that has to be hidden.
+     if (!(e instanceof MarkedYamlEngineException)) {
+       throw e;
+     }
+     YamlEngineException exception =
+         wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
+     LOG.error("Failed to parse YAML configuration", exception);
+     throw exception;
+   }
+ }
+
+ /**
+  * Reads YAML from the given stream and returns its content as a map. The stream is not
+  * closed by this method.
+  *
+  * @param inputStream the stream to read the YAML document from
+  * @return a non-null map; empty if the loader yields nothing
+  * @throws YamlEngineException if the content cannot be parsed; for marked parser errors the
+  *     message is rebuilt without the source snippet
+  */
+ public static synchronized @Nonnull Map<String, Object> loadYamlInput(InputStream inputStream)
+     throws Exception {
+   try {
+     final Object loaded = loader.loadFromInputStream(inputStream);
+     final Map<String, Object> yamlResult = (Map<String, Object>) loaded;
+     if (yamlResult == null) {
+       return new HashMap<>();
+     }
+     return yamlResult;
+   } catch (YamlEngineException e) {
+     // Only marked parser errors carry a snippet of the input that has to be hidden.
+     if (!(e instanceof MarkedYamlEngineException)) {
+       throw e;
+     }
+     YamlEngineException exception =
+         wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
+     LOG.error("Failed to parse YAML configuration", exception);
+     throw exception;
+   }
+ }
+
+ /**
+  * Parses the given YAML text and returns its content as a map.
+  *
+  * @param text the YAML document
+  * @return a non-null map; empty if the loader yields nothing
+  * @throws YamlEngineException if the text cannot be parsed; for marked parser errors the
+  *     message is rebuilt without the source snippet
+  */
+ public static synchronized @Nonnull Map<String, Object> loadYamlString(String text) {
+   try {
+     final Object loaded = loader.loadFromString(text);
+     final Map<String, Object> yamlResult = (Map<String, Object>) loaded;
+     if (yamlResult == null) {
+       return new HashMap<>();
+     }
+     return yamlResult;
+   } catch (YamlEngineException e) {
+     // Only marked parser errors carry a snippet of the input that has to be hidden.
+     if (!(e instanceof MarkedYamlEngineException)) {
+       throw e;
+     }
+     YamlEngineException exception =
+         wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
+     LOG.error("Failed to parse YAML configuration", exception);
+     throw exception;
+   }
+ }
+
+ /**
+  * Dumps the given value as YAML in flow style and returns the text without a trailing line
+  * break.
+  *
+  * <p>Note: The dumper may escape certain characters in the value to keep the output valid
+  * YAML.
+  *
+  * @param value The value to be converted.
+  * @return The string representation of the value in YAML syntax.
+  */
+ public static synchronized String toYAMLString(Object value) {
+   try {
+     final String dumped = flowDumper.dumpToString(value);
+     final String lineBreak = flowDumperSettings.getBestLineBreak();
+     // Strip the single line break the dumper appends at the end, if there is one.
+     return dumped.endsWith(lineBreak)
+         ? dumped.substring(0, dumped.length() - lineBreak.length())
+         : dumped;
+   } catch (MarkedYamlEngineException exception) {
+     throw wrapExceptionToHiddenSensitiveData(exception);
+   }
+ }
+
+ /**
+  * Turns a flat map with dotted keys into a nested map, dumps it as block-style YAML and
+  * returns the output split into lines. The method is synchronized.
+  *
+  * @param flattenMap A map containing flattened keys (e.g., "parent.child.key") associated
+  *     with their values.
+  * @return A list of strings that represents the YAML data, where each item corresponds to a
+  *     line of the data.
+  */
+ @SuppressWarnings("unchecked")
+ public static synchronized List<String> convertAndDumpYamlFromFlatMap(
+     Map<String, Object> flattenMap) {
+   try {
+     final Map<String, Object> nestedMap = new LinkedHashMap<>();
+     for (Map.Entry<String, Object> entry : flattenMap.entrySet()) {
+       final String[] segments = entry.getKey().split("\\.");
+       final int leaf = segments.length - 1;
+
+       // Descend along every segment but the last, creating the levels that are missing.
+       Map<String, Object> cursor = nestedMap;
+       int depth = 0;
+       while (depth < leaf) {
+         cursor =
+             (Map<String, Object>)
+                 cursor.computeIfAbsent(segments[depth], k -> new LinkedHashMap<>());
+         depth++;
+       }
+       cursor.put(segments[leaf], entry.getValue());
+     }
+
+     final String dumped = blockerDumper.dumpToString(nestedMap);
+     final String[] dumpedLines = dumped.split(blockerDumperSettings.getBestLineBreak());
+     return Arrays.asList(dumpedLines);
+   } catch (MarkedYamlEngineException exception) {
+     throw wrapExceptionToHiddenSensitiveData(exception);
+   }
+ }
+
+ /**
+  * Parses the given YAML text and casts the loaded object to the requested type.
+  *
+  * @param value the YAML text
+  * @param type the expected type of the loaded object
+  * @return the loaded object
+  * @throws ClassCastException if the loaded object is not an instance of {@code type}
+  */
+ public static synchronized <T> T convertToObject(String value, Class<T> type) {
+   try {
+     final Object loaded = loader.loadFromString(value);
+     return type.cast(loaded);
+   } catch (MarkedYamlEngineException exception) {
+     throw wrapExceptionToHiddenSensitiveData(exception);
+   }
+ }
+
+ /**
+ * This method wraps a MarkedYAMLException to hide sensitive data in its
message. Before using
+ * this method, an exception message might include sensitive information
like:
+ *
+ * <pre>{@code
+ * while constructing a mapping
+ * in 'reader', line 1, column 1:
+ * key1: secret1
+ * ^
+ * found duplicate key key1
+ * in 'reader', line 2, column 1:
+ * key1: secret2
+ * ^
+ * }</pre>
+ *
+ * <p>After using this method, the message will be sanitized to hide the
sensitive details:
+ *
+ * <pre>{@code
+ * while constructing a mapping
+ * in 'reader', line 1, column 1
+ * found duplicate key key1
+ * in 'reader', line 2, column 1
+ * }</pre>
+ *
+ * @param exception The MarkedYamlEngineException containing potentially
sensitive data.
+ * @return A YamlEngineException with a message that has sensitive data
hidden.
+ */
+ private static YamlEngineException wrapExceptionToHiddenSensitiveData(
+ MarkedYamlEngineException exception) {
+ StringBuilder lines = new StringBuilder();
+ String context = exception.getContext();
+ Optional<Mark> contextMark = exception.getContextMark();
+ Optional<Mark> problemMark = exception.getProblemMark();
+ String problem = exception.getProblem();
+
+ if (context != null) {
+ lines.append(context);
+ lines.append("\n");
+ }
+
+ if (contextMark.isPresent()
+ && (problem == null
+ || !problemMark.isPresent()
+ || contextMark.get().getName().equals(problemMark.get().getName())
+ || contextMark.get().getLine() != problemMark.get().getLine()
+ || contextMark.get().getColumn() !=
problemMark.get().getColumn())) {
+ lines.append(hiddenSensitiveDataInMark(contextMark.get()));
+ lines.append("\n");
+ }
+
+ if (problem != null) {
+ lines.append(problem);
+ lines.append("\n");
+ }
+
+ if (problemMark.isPresent()) {
+ lines.append(hiddenSensitiveDataInMark(problemMark.get()));
+ lines.append("\n");
+ }
+
+ Throwable cause = exception.getCause();
+ if (cause instanceof MarkedYamlEngineException) {
+ cause = wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException)
cause);
+ }
+
+ YamlEngineException yamlException = new
YamlEngineException(lines.toString(), cause);
+ yamlException.setStackTrace(exception.getStackTrace());
+ return yamlException;
+ }
+
+ /**
+ * This method is a mock implementation of the Mark#toString() method,
specifically designed to
+ * exclude the Mark#get_snippet(), to prevent leaking any sensitive data.
+ */
+ private static String hiddenSensitiveDataInMark(Mark mark) {
+ return " in "
+ + mark.getName()
+ + ", line "
+ + (mark.getLine() + 1)
+ + ", column "
+ + (mark.getColumn() + 1);
+ }
+
+ /**
+  * Representer that dumps {@link Duration}, {@link MemorySize} and enum values as plain
+  * string scalars.
+  */
+ private static class FlinkConfigRepresenter extends StandardRepresenter {
+   public FlinkConfigRepresenter(DumpSettings dumpSettings) {
+     super(dumpSettings);
+     representers.put(Duration.class, data -> representDuration(data));
+     representers.put(MemorySize.class, data -> representMemorySize(data));
+     parentClassRepresenters.put(Enum.class, data -> representEnum(data));
+   }
+
+   /** Dumps a duration in the text form produced by TimeUtils.formatWithHighestUnit. */
+   private Node representDuration(Object data) {
+     final String text = TimeUtils.formatWithHighestUnit((Duration) data);
+     return stringScalar(text);
+   }
+
+   /** Dumps a memory size in its {@code toString()} form. */
+   private Node representMemorySize(Object data) {
+     final MemorySize memorySize = (MemorySize) data;
+     return stringScalar(memorySize.toString());
+   }
+
+   /** Dumps an enum value in its {@code toString()} form. */
+   private Node representEnum(Object data) {
+     return stringScalar(data.toString());
+   }
+
+   /** Wraps the text into a string-tagged scalar node using the default scalar style. */
+   private Node stringScalar(String text) {
+     return new ScalarNode(Tag.STR, text, settings.getDefaultScalarStyle());
+   }
+ }
+}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f839fa207..f66b113f3 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -18,7 +18,6 @@ package org.apache.streampark.common.util
import com.typesafe.config.ConfigFactory
import org.apache.commons.lang3.StringUtils
-import org.yaml.snakeyaml.Yaml
import javax.annotation.Nonnull
@@ -82,16 +81,8 @@ object PropertiesUtils extends Logger {
}
def fromYamlText(text: String): Map[String, String] = {
- try {
- new Yaml()
- .load(text)
- .asInstanceOf[java.util.Map[String, Map[String, Any]]]
- .flatMap(x => eachYamlItem(x._1, x._2))
- .toMap
- } catch {
- case e: IOException =>
- throw new IllegalArgumentException(s"Failed when loading conf error:",
e)
- }
+ val map = YamlParserUtils.loadYamlString(text)
+ map.flatMap(x => eachYamlItem(x._1, x._2)).toMap
}
def fromHoconText(conf: String): Map[String, String] = {
@@ -146,11 +137,8 @@ object PropertiesUtils extends Logger {
inputStream != null,
s"[StreamPark] fromYamlFile: Properties inputStream must not be null")
try {
- new Yaml()
- .load(inputStream)
- .asInstanceOf[java.util.Map[String, Map[String, Any]]]
- .flatMap(x => eachYamlItem(x._1, x._2))
- .toMap
+ val map = YamlParserUtils.loadYamlInput(inputStream)
+ map.flatMap(x => eachYamlItem(x._1, x._2)).toMap
} catch {
case e: IOException =>
throw new IllegalArgumentException(s"Failed when loading yaml from
inputStream", e)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 4cdcf07c1..86bc2bf8d 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -26,7 +26,6 @@ import org.apache.streampark.flink.util.FlinkUtils
import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.io.FileUtils
-import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions,
SavepointRestoreSettings}
import javax.annotation.Nullable
@@ -35,7 +34,7 @@ import java.io.File
import java.util.{Map => JavaMap}
import scala.collection.JavaConversions._
-import scala.util.{Success, Try}
+import scala.util.Try
case class SubmitRequest(
flinkVersion: FlinkVersion,
@@ -99,32 +98,6 @@ case class SubmitRequest(
}
}
- lazy val flinkDefaultConfiguration: Configuration = {
-
Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf"))
match {
- case Success(value) =>
- executionMode match {
- case ExecutionMode.YARN_SESSION |
ExecutionMode.KUBERNETES_NATIVE_SESSION |
- ExecutionMode.REMOTE =>
- value
- case _ =>
- value
- .keySet()
- .foreach(
- k => {
- val v = value.getString(k, null)
- if (v != null) {
- val result = v
- .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)",
effectiveAppName)
- .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)",
id.toString)
- value.setString(k, result)
- }
- })
- value
- }
- case _ => new Configuration()
- }
- }
-
def hasProp(key: String): Boolean = properties.containsKey(key)
def getProp(key: String): Any = properties.get(key)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
index 2e642312d..0a40d9a7e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
@@ -87,6 +87,11 @@
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.snakeyaml</groupId>
+ <artifactId>snakeyaml-engine</artifactId>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java
new file mode 100644
index 000000000..715972d50
--- /dev/null
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.streampark.common.util.AssertUtils;
+import org.apache.streampark.common.util.YamlParserUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Global configuration object for Flink. Similar to Java properties configuration objects, it
+ * includes key-value pairs which represent the framework's configuration.
+ */
+public final class FlinkGlobalConfiguration {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkGlobalConfiguration.class);
+
+ public static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml";
+
+ public static final String FLINK_CONF_FILENAME = "config.yaml";
+
+ // key separator character
+ private static final String KEY_SEPARATOR = ".";
+
+ // the keys whose values should be hidden
+ private static final String[] SENSITIVE_KEYS =
+ new String[] {
+ "password",
+ "secret",
+ "fs.azure.account.key",
+ "apikey",
+ "auth-params",
+ "service-key",
+ "token",
+ "basic-auth",
+ "jaas.config",
+ "http-headers"
+ };
+
+ // the hidden content to be displayed
+ public static final String HIDDEN_CONTENT = "******";
+
+ private static boolean standardYaml = true;
+
+ //
--------------------------------------------------------------------------------------------
+
+ private FlinkGlobalConfiguration() {}
+
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Loads the global configuration from the environment by delegating to
+ * {@link #loadConfiguration(Configuration)} with an empty set of dynamic properties.
+ *
+ * <p>Fails if an error occurs during loading. Returns an empty configuration object if the
+ * configuration-directory environment variable is not set: in production this variable is
+ * set, but tests and local execution/debugging don't have it, which is why loading must not
+ * fail when it is missing.
+ *
+ * @return Returns the Configuration
+ */
+ public static Configuration loadConfiguration() {
+ return loadConfiguration(new Configuration());
+ }
+
+  /**
+   * Loads the global configuration and merges the given dynamic properties into it.
+   *
+   * <p>The configuration directory is read from the environment variable named by {@code
+   * ConfigConstants.ENV_FLINK_CONF_DIR}. When that variable is not set, no file is read and a
+   * copy of the dynamic properties is returned instead.
+   *
+   * @param dynamicProperties The given dynamic properties
+   * @return Returns the loaded global configuration with dynamic properties
+   */
+  public static Configuration loadConfiguration(Configuration dynamicProperties) {
+    final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+    return configDir != null
+        ? loadConfiguration(configDir, dynamicProperties)
+        : new Configuration(dynamicProperties);
+  }
+
+ /**
+ * Loads the configuration files from the specified directory, without any dynamic properties.
+ *
+ * <p>YAML files are supported as configuration files.
+ *
+ * @param configDir the directory which contains the configuration files
+ * @return the configuration loaded from the given configuration directory
+ */
+ public static Configuration loadConfiguration(final String configDir) {
+ return loadConfiguration(configDir, null);
+ }
+
+  /**
+   * Loads the configuration files from the specified directory. If the dynamic properties
+   * configuration is not null, then it is added to the loaded configuration.
+   *
+   * <p>The legacy {@code flink-conf.yaml} takes precedence; {@code config.yaml} is only used when
+   * the legacy file is absent. Which of the two formats was loaded is recorded in the static
+   * {@code standardYaml} flag as a side effect.
+   *
+   * @param configDir directory to load the configuration from
+   * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
+   * @return The configuration loaded from the given configuration directory
+   * @throws IllegalArgumentException if {@code configDir} is null
+   * @throws IllegalConfigurationException if {@code configDir} is not an existing directory or
+   *     contains neither of the two configuration files
+   */
+  public static Configuration loadConfiguration(
+      final String configDir, @Nullable final Configuration dynamicProperties) {
+
+    if (configDir == null) {
+      throw new IllegalArgumentException(
+          "Given configuration directory is null, cannot load configuration");
+    }
+
+    final File confDirFile = new File(configDir);
+    // Require an actual directory: a regular file at this path would pass a plain exists()
+    // check although the message below promises a directory.
+    if (!confDirFile.isDirectory()) {
+      throw new IllegalConfigurationException(
+          "The given configuration directory name '"
+              + configDir
+              + "' ("
+              + confDirFile.getAbsolutePath()
+              + ") does not describe an existing directory.");
+    }
+
+    final Configuration configuration;
+    final File legacyConfigFile = new File(confDirFile, LEGACY_FLINK_CONF_FILENAME);
+
+    if (legacyConfigFile.exists()) {
+      // the legacy flat "key: value" file wins whenever it is present
+      standardYaml = false;
+      LOG.info(
+          "Using legacy YAML parser to load flink configuration file from {}.",
+          legacyConfigFile.getAbsolutePath());
+      configuration = loadLegacyYAMLResource(legacyConfigFile);
+    } else {
+      final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
+      if (!yamlConfigFile.exists()) {
+        throw new IllegalConfigurationException(
+            "The Flink config file '"
+                + yamlConfigFile
+                + "' ("
+                + yamlConfigFile.getAbsolutePath()
+                + ") does not exist.");
+      }
+      standardYaml = true;
+      LOG.info(
+          "Using standard YAML parser to load flink configuration file from {}.",
+          yamlConfigFile.getAbsolutePath());
+      configuration = loadYAMLResource(yamlConfigFile);
+    }
+
+    logConfiguration("Loading", configuration);
+
+    if (dynamicProperties != null) {
+      logConfiguration("Loading dynamic", dynamicProperties);
+      configuration.addAll(dynamicProperties);
+    }
+
+    return configuration;
+  }
+
+ private static void logConfiguration(String prefix, Configuration config) {
+ config.confData.forEach(
+ (key, value) ->
+ LOG.info(
+ "{} configuration property: {}, {}",
+ prefix,
+ key,
+ isSensitive(key) ? HIDDEN_CONTENT : value));
+ }
+
+  /**
+   * Loads a legacy, flat Flink configuration file made of {@code key: value} lines.
+   *
+   * <p>Everything after the first {@code #} on a line is treated as a comment. Lines that do not
+   * split into a key and a value on {@code ": "}, or whose key or value is empty, are skipped
+   * with a warning. The file is decoded as UTF-8.
+   *
+   * @param file the legacy configuration file
+   * @return the configuration holding all valid key-value pairs as strings
+   * @throws RuntimeException if the file cannot be read
+   */
+  private static Configuration loadLegacyYAMLResource(File file) {
+    final Configuration config = new Configuration();
+
+    // Decode explicitly as UTF-8 rather than with the platform default charset, so the same
+    // file yields the same values on every host.
+    try (BufferedReader reader =
+        new BufferedReader(
+            new InputStreamReader(
+                new FileInputStream(file), java.nio.charset.StandardCharsets.UTF_8))) {
+
+      String line;
+      int lineNo = 0;
+      while ((line = reader.readLine()) != null) {
+        lineNo++;
+        // 1. check for comments
+        final String conf = line.split("#", 2)[0].trim();
+        if (conf.isEmpty()) {
+          continue;
+        }
+
+        // 2. get key and value
+        final String[] kv = conf.split(": ", 2);
+
+        // skip line with no valid key-value pair
+        if (kv.length == 1) {
+          LOG.warn(
+              "Error while trying to split key and value in configuration file {}:{}: "
+                  + "Line is not a key-value pair (missing space after ':'?)",
+              file,
+              lineNo);
+          continue;
+        }
+
+        final String key = kv[0].trim();
+        final String value = kv[1].trim();
+
+        // sanity check
+        if (key.isEmpty() || value.isEmpty()) {
+          LOG.warn(
+              "Error after splitting key and value in configuration file {}:{}: "
+                  + "Key or value was empty",
+              file,
+              lineNo);
+          continue;
+        }
+
+        config.setString(key, value);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error parsing YAML configuration.", e);
+    }
+
+    return config;
+  }
+
+  /**
+   * Flattens a nested configuration map into a single-level map whose keys are the nested keys
+   * joined with {@code KEY_SEPARATOR}.
+   *
+   * <p>Nested maps are flattened recursively, lists are converted to their YAML string form via
+   * {@code YamlParserUtils.toYAMLString}, and all other values are copied unchanged.
+   *
+   * @param config the (possibly nested) map to flatten
+   * @param keyPrefix prefix prepended to every key of {@code config}
+   * @return a new map containing the flattened entries
+   */
+  private static Map<String, Object> flatten(Map<String, Object> config, String keyPrefix) {
+    final Map<String, Object> result = new HashMap<>();
+
+    for (Map.Entry<String, Object> entry : config.entrySet()) {
+      final String path = keyPrefix + entry.getKey();
+      final Object node = entry.getValue();
+
+      if (node instanceof Map) {
+        final Map<String, Object> nested = (Map<String, Object>) node;
+        result.putAll(flatten(nested, path + KEY_SEPARATOR));
+      } else if (node instanceof List) {
+        result.put(path, YamlParserUtils.toYAMLString(node));
+      } else {
+        result.put(path, node);
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Flattens the given configuration map starting at the root, i.e. without any key prefix.
+   *
+   * @param config the (possibly nested) map to flatten
+   * @return a new map containing the flattened entries
+   */
+  private static Map<String, Object> flatten(Map<String, Object> config) {
+    return flatten(config, "");
+  }
+
+  /**
+   * Loads a standard YAML configuration file: the document is parsed with {@code
+   * YamlParserUtils.loadYamlFile}, flattened, and every resulting entry is stored in a new
+   * configuration.
+   *
+   * @param file the YAML configuration file
+   * @return the configuration holding the flattened entries
+   * @throws RuntimeException if parsing or storing any entry fails
+   */
+  private static Configuration loadYAMLResource(File file) {
+    final Configuration config = new Configuration();
+
+    try {
+      final Map<String, Object> flattened = flatten(YamlParserUtils.loadYamlFile(file));
+      for (Map.Entry<String, Object> entry : flattened.entrySet()) {
+        config.setValueInternal(entry.getKey(), entry.getValue(), false);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Error parsing YAML configuration.", e);
+    }
+
+    return config;
+  }
+
+  /**
+   * Check whether the key is a hidden key, i.e. whether its value must not be displayed.
+   *
+   * <p>A key is hidden when its lower-cased form contains any entry of {@code SENSITIVE_KEYS}.
+   * Lower-casing uses {@code Locale.ROOT} so that the result does not depend on the JVM default
+   * locale (under a Turkish locale, for example, {@code "APIKEY"} would otherwise lower-case to
+   * a string with a dotless i and no longer match {@code "apikey"}).
+   *
+   * @param key the config key; validated with {@code AssertUtils.notNull}
+   * @return {@code true} if the key contains one of the sensitive fragments
+   */
+  public static boolean isSensitive(String key) {
+    AssertUtils.notNull(key, "key");
+    final String keyInLower = key.toLowerCase(java.util.Locale.ROOT);
+    for (String hideKey : SENSITIVE_KEYS) {
+      // contains() can only succeed when the key is at least as long as the fragment,
+      // so no separate length check is needed.
+      if (keyInLower.contains(hideKey)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the configuration file name that matches the currently recorded YAML format.
+   *
+   * @return {@link #FLINK_CONF_FILENAME} when the standard YAML format is active, otherwise
+   *     {@link #LEGACY_FLINK_CONF_FILENAME}
+   */
+  public static String getFlinkConfFilename() {
+    return isStandardYaml() ? FLINK_CONF_FILENAME : LEGACY_FLINK_CONF_FILENAME;
+  }
+
+  /**
+   * Returns the current value of the static standard-YAML flag.
+   *
+   * @return {@code true} if the standard YAML format is recorded as active
+   */
+  public static boolean isStandardYaml() {
+    return standardYaml;
+  }
+
+  /**
+   * Overrides the static standard-YAML flag.
+   *
+   * @param standardYaml the new value of the flag
+   */
+  public static void setStandardYaml(boolean standardYaml) {
+    FlinkGlobalConfiguration.standardYaml = standardYaml;
+  }
+}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 0437dbaab..b652aa67c 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode}
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode,
ExecutionMode}
import org.apache.streampark.common.util.{DeflaterUtils, Logger,
PropertiesUtils, Utils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -284,7 +284,8 @@ trait FlinkClientTrait extends Logger {
val configurationDirectory = s"$flinkHome/conf"
// 2. load the custom command lines
val flinkConfig =
-
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new
Configuration())
+ Try(FlinkGlobalConfiguration.loadConfiguration(s"$flinkHome/conf"))
+ .getOrElse(new Configuration())
loadCustomCommandLines(flinkConfig, configurationDirectory)
}
@@ -413,7 +414,8 @@ trait FlinkClientTrait extends Logger {
validateAndGetActiveCommandLine(getCustomCommandLines(flinkHome),
commandLine)
val flinkDefaultConfiguration =
-
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new
Configuration())
+ Try(FlinkGlobalConfiguration.loadConfiguration(s"$flinkHome/conf"))
+ .getOrElse(new Configuration())
val configuration = new Configuration(flinkDefaultConfiguration)
configuration.addAll(activeCommandLine.toConfiguration(commandLine))
@@ -451,6 +453,39 @@ trait FlinkClientTrait extends Logger {
}
}
+  /** Adds the Flink default configuration of the request's Flink home to a [[SubmitRequest]]. */
+  implicit private[client] class EnhanceConfiguration(request: SubmitRequest) {
+
+    /**
+     * Configuration loaded from `<flinkHome>/conf`, or an empty configuration when loading fails.
+     *
+     * For every execution mode other than YARN_SESSION, KUBERNETES_NATIVE_SESSION and REMOTE, the
+     * placeholders `${jobName}` / `$jobName` and `${jobId}` / `$jobId` (also with a lower-case
+     * second word) in string values are replaced by the effective application name and the
+     * request id.
+     */
+    lazy val flinkDefaultConfiguration: Configuration = {
+      Try(
+        FlinkGlobalConfiguration.loadConfiguration(
+          s"${request.flinkVersion.flinkHome}/conf")) match {
+        case Success(value) =>
+          request.executionMode match {
+            case ExecutionMode.YARN_SESSION | ExecutionMode.KUBERNETES_NATIVE_SESSION |
+                ExecutionMode.REMOTE =>
+              value
+            case _ =>
+              // replaceAll interprets '$' and '\' in the replacement as group references, so
+              // the literal job name and id must be quoted. Both stay lazy (and null-tolerant
+              // for the name) so nothing is evaluated earlier than before.
+              lazy val jobNameReplacement: String =
+                Option(request.effectiveAppName)
+                  .map(java.util.regex.Matcher.quoteReplacement)
+                  .orNull
+              lazy val jobIdReplacement: String =
+                java.util.regex.Matcher.quoteReplacement(request.id.toString)
+              value
+                .keySet()
+                .foreach(
+                  k => {
+                    val v = value.getString(k, null)
+                    if (v != null) {
+                      val result = v
+                        .replaceAll(
+                          "\\$\\{job(Name|name)}|\\$job(Name|name)",
+                          jobNameReplacement)
+                        .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobIdReplacement)
+                      value.setString(k, result)
+                    }
+                  })
+              value
+          }
+        case _ => new Configuration()
+      }
+    }
+
+  }
+
private[client] def cancelJob(
cancelRequest: CancelRequest,
jobID: JobID,
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
index 6451d5908..f51defcaf 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
@@ -65,7 +65,7 @@ object YarnPerJobTestCase extends Logger {
lazy val flinkDefaultConfiguration: Configuration = {
require(FLINK_HOME != null)
// get flink config
- GlobalConfiguration.loadConfiguration(s"$FLINK_HOME/conf")
+ FlinkGlobalConfiguration.loadConfiguration(s"$FLINK_HOME/conf")
}
lazy val customCommandLines: util.List[CustomCommandLine] = {