This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch release-2.1.6
in repository https://gitbox.apache.org/repos/asf/streampark.git


The following commit(s) were added to refs/heads/release-2.1.6 by this push:
     new 7548fdd09 [BUG] get flink-config bug fixed. (#4288)
7548fdd09 is described below

commit 7548fdd09f1a645817cc0e00ebd4adefe381cb97
Author: benjobs <benj...@apache.org>
AuthorDate: Mon Sep 15 00:58:49 2025 +0800

    [BUG] get flink-config bug fixed. (#4288)
---
 pom.xml                                            |   6 +-
 streampark-common/pom.xml                          |   4 +-
 .../apache/streampark/common/util/MemorySize.java  | 421 +++++++++++++++++++++
 .../apache/streampark/common/util/TimeUtils.java   | 248 ++++++++++++
 .../streampark/common/util/YamlParserUtils.java    | 323 ++++++++++++++++
 .../streampark/common/util/PropertiesUtils.scala   |  20 +-
 .../flink/client/bean/SubmitRequest.scala          |  29 +-
 .../streampark-flink-client-core/pom.xml           |   5 +
 .../configuration/FlinkGlobalConfiguration.java    | 313 +++++++++++++++
 .../flink/client/trait/FlinkClientTrait.scala      |  41 +-
 .../flink/client/test/YarnPerJobTestCase.scala     |   2 +-
 11 files changed, 1359 insertions(+), 53 deletions(-)

diff --git a/pom.xml b/pom.xml
index ca3fbaa2b..29e5496bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
         <caffeine.version>2.8.6</caffeine.version>
         <mysql.version>8.0.27</mysql.version>
         <hikariCP.version>3.4.5</hikariCP.version>
-        <snakeyaml.version>2.0</snakeyaml.version>
+        <snakeyaml.version>2.6</snakeyaml.version>
         <typesafe-conf.version>1.4.2</typesafe-conf.version>
         <json4s-jackson.version>4.0.6</json4s-jackson.version>
         <commons-cli.version>1.5.0</commons-cli.version>
@@ -245,8 +245,8 @@
             </dependency>
 
             <dependency>
-                <groupId>org.yaml</groupId>
-                <artifactId>snakeyaml</artifactId>
+                <groupId>org.snakeyaml</groupId>
+                <artifactId>snakeyaml-engine</artifactId>
                 <version>${snakeyaml.version}</version>
             </dependency>
 
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index 3fc00a388..0dee93ec1 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -68,8 +68,8 @@
         </dependency>
 
         <dependency>
-            <groupId>org.yaml</groupId>
-            <artifactId>snakeyaml</artifactId>
+            <groupId>org.snakeyaml</groupId>
+            <artifactId>snakeyaml-engine</artifactId>
         </dependency>
 
         <dependency>
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java b/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java
new file mode 100644
index 000000000..3cb371d3d
--- /dev/null
+++ b/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.GIGA_BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.KILO_BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.TERA_BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.hasUnit;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ *
+ * <h2>Parsing</h2>
+ *
+ * <p>The size can be parsed from a text expression. If the expression is a pure number, the value
+ * will be interpreted as bytes.
+ */
+public class MemorySize implements java.io.Serializable, Comparable<MemorySize> {
+
+  private static final long serialVersionUID = 1L;
+
+  public static final MemorySize ZERO = new MemorySize(0L);
+
+  public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE);
+
+  private static final List<MemoryUnit> ORDERED_UNITS =
+      Arrays.asList(BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES, TERA_BYTES);
+
+  // ------------------------------------------------------------------------
+
+  /** The memory size, in bytes. */
+  private final long bytes;
+
+  /** The memoized value returned by toString(). */
+  private transient String stringified;
+
+  /** The memoized value returned by toHumanReadableString(). */
+  private transient String humanReadableStr;
+
+  /**
+   * Constructs a new MemorySize.
+   *
+   * @param bytes The size, in bytes. Must be zero or larger.
+   */
+  public MemorySize(long bytes) {
+    if (bytes < 0) {
+      throw new IllegalArgumentException("bytes must be >= 0");
+    }
+    this.bytes = bytes;
+  }
+
+  public static MemorySize ofMebiBytes(long mebiBytes) {
+    return new MemorySize(mebiBytes << 20);
+  }
+
+  // ------------------------------------------------------------------------
+
+  /** Gets the memory size in bytes. */
+  public long getBytes() {
+    return bytes;
+  }
+
+  /** Gets the memory size in Kibibytes (= 1024 bytes). */
+  public long getKibiBytes() {
+    return bytes >> 10;
+  }
+
+  /** Gets the memory size in Mebibytes (= 1024 Kibibytes). */
+  public int getMebiBytes() {
+    return (int) (bytes >> 20);
+  }
+
+  /** Gets the memory size in Gibibytes (= 1024 Mebibytes). */
+  public long getGibiBytes() {
+    return bytes >> 30;
+  }
+
+  /** Gets the memory size in Tebibytes (= 1024 Gibibytes). */
+  public long getTebiBytes() {
+    return bytes >> 40;
+  }
+
+  // ------------------------------------------------------------------------
+
+  @Override
+  public int hashCode() {
+    return (int) (bytes ^ (bytes >>> 32));
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj == this
+        || (obj != null
+            && obj.getClass() == this.getClass()
+            && ((MemorySize) obj).bytes == this.bytes);
+  }
+
+  @Override
+  public String toString() {
+    if (stringified == null) {
+      stringified = formatToString();
+    }
+
+    return stringified;
+  }
+
+  private String formatToString() {
+    MemoryUnit highestIntegerUnit =
+        IntStream.range(0, ORDERED_UNITS.size())
+            .sequential()
+            .filter(idx -> bytes % ORDERED_UNITS.get(idx).getMultiplier() != 0)
+            .boxed()
+            .findFirst()
+            .map(
+                idx -> {
+                  if (idx == 0) {
+                    return ORDERED_UNITS.get(0);
+                  } else {
+                    return ORDERED_UNITS.get(idx - 1);
+                  }
+                })
+            .orElse(BYTES);
+
+    return String.format(
+        "%d %s", bytes / highestIntegerUnit.getMultiplier(), highestIntegerUnit.getUnits()[1]);
+  }
+
+  public String toHumanReadableString() {
+    if (humanReadableStr == null) {
+      humanReadableStr = formatToHumanReadableString();
+    }
+
+    return humanReadableStr;
+  }
+
+  private String formatToHumanReadableString() {
+    MemoryUnit highestUnit =
+        IntStream.range(0, ORDERED_UNITS.size())
+            .sequential()
+            .filter(idx -> bytes > ORDERED_UNITS.get(idx).getMultiplier())
+            .boxed()
+            .max(Comparator.naturalOrder())
+            .map(ORDERED_UNITS::get)
+            .orElse(BYTES);
+
+    if (highestUnit == BYTES) {
+      return String.format("%d %s", bytes, BYTES.getUnits()[1]);
+    } else {
+      double approximate = 1.0 * bytes / highestUnit.getMultiplier();
+      return String.format(
+          Locale.ROOT, "%.3f%s (%d bytes)", approximate, highestUnit.getUnits()[1], bytes);
+    }
+  }
+
+  @Override
+  public int compareTo(MemorySize that) {
+    return Long.compare(this.bytes, that.bytes);
+  }
+
+  // ------------------------------------------------------------------------
+  //  Calculations
+  // ------------------------------------------------------------------------
+
+  public MemorySize add(MemorySize that) {
+    return new MemorySize(Math.addExact(this.bytes, that.bytes));
+  }
+
+  public MemorySize subtract(MemorySize that) {
+    return new MemorySize(Math.subtractExact(this.bytes, that.bytes));
+  }
+
+  public MemorySize multiply(double multiplier) {
+    if (multiplier < 0) {
+      throw new IllegalArgumentException("multiplier must be >= 0");
+    }
+
+    BigDecimal product = BigDecimal.valueOf(this.bytes).multiply(BigDecimal.valueOf(multiplier));
+    if (product.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) {
+      throw new ArithmeticException("long overflow");
+    }
+    return new MemorySize(product.longValue());
+  }
+
+  public MemorySize divide(long by) {
+    if (by < 0) {
+      throw new IllegalArgumentException("divisor must be >= 0");
+    }
+    return new MemorySize(bytes / by);
+  }
+
+  // ------------------------------------------------------------------------
+  //  Parsing
+  // ------------------------------------------------------------------------
+
+  /**
+   * Parses the given string as a MemorySize.
+   *
+   * @param text The string to parse
+   * @return The parsed MemorySize
+   * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+   */
+  public static MemorySize parse(String text) throws IllegalArgumentException {
+    return new MemorySize(parseBytes(text));
+  }
+
+  /**
+   * Parses the given string with a default unit.
+   *
+   * @param text The string to parse.
+   * @param defaultUnit specify the default unit.
+   * @return The parsed MemorySize.
+   * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+   */
+  public static MemorySize parse(String text, MemoryUnit defaultUnit)
+      throws IllegalArgumentException {
+    if (!hasUnit(text)) {
+      return parse(text + defaultUnit.getUnits()[0]);
+    }
+
+    return parse(text);
+  }
+
+  /**
+   * Parses the given string as bytes. The supported expressions are listed under {@link
+   * MemorySize}.
+   *
+   * @param text The string to parse
+   * @return The parsed size, in bytes.
+   * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+   */
+  public static long parseBytes(String text) throws IllegalArgumentException {
+    Objects.requireNonNull(text, "text cannot be null");
+
+    final String trimmed = text.trim();
+    if (trimmed.isEmpty()) {
+      throw new IllegalArgumentException("argument is an empty- or whitespace-only string");
+    }
+
+    final int len = trimmed.length();
+    int pos = 0;
+
+    char current;
+    while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+      pos++;
+    }
+
+    final String number = trimmed.substring(0, pos);
+    final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+    if (number.isEmpty()) {
+      throw new NumberFormatException("text does not start with a number");
+    }
+
+    final long value;
+    try {
+      value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          "The value '"
+              + number
+              + "' cannot be represented as a 64bit number (numeric overflow).");
+    }
+
+    final long multiplier = parseUnit(unit).map(MemoryUnit::getMultiplier).orElse(1L);
+    final long result = value * multiplier;
+
+    // check for overflow
+    if (result / multiplier != value) {
+      throw new IllegalArgumentException(
+          "The value '"
+              + text
+              + "' cannot be represented as a 64bit number of bytes (numeric overflow).");
+    }
+
+    return result;
+  }
+
+  private static Optional<MemoryUnit> parseUnit(String unit) {
+    if (matchesAny(unit, BYTES)) {
+      return Optional.of(BYTES);
+    } else if (matchesAny(unit, KILO_BYTES)) {
+      return Optional.of(KILO_BYTES);
+    } else if (matchesAny(unit, MEGA_BYTES)) {
+      return Optional.of(MEGA_BYTES);
+    } else if (matchesAny(unit, GIGA_BYTES)) {
+      return Optional.of(GIGA_BYTES);
+    } else if (matchesAny(unit, TERA_BYTES)) {
+      return Optional.of(TERA_BYTES);
+    } else if (!unit.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Memory size unit '"
+              + unit
+              + "' does not match any of the recognized units: "
+              + MemoryUnit.getAllUnits());
+    }
+
+    return Optional.empty();
+  }
+
+  private static boolean matchesAny(String str, MemoryUnit unit) {
+    for (String s : unit.getUnits()) {
+      if (s.equals(str)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Enum which defines memory unit, mostly used to parse value from configuration file.
+   *
+   * <p>To make larger values more compact, the common size suffixes are supported:
+   *
+   * <ul>
+   *   <li>1b or 1bytes (bytes)
+   *   <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
+   *   <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
+   *   <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
+   *   <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
+   * </ul>
+   */
+  public enum MemoryUnit {
+    BYTES(new String[] {"b", "bytes"}, 1L),
+    KILO_BYTES(new String[] {"k", "kb", "kibibytes"}, 1024L),
+    MEGA_BYTES(new String[] {"m", "mb", "mebibytes"}, 1024L * 1024L),
+    GIGA_BYTES(new String[] {"g", "gb", "gibibytes"}, 1024L * 1024L * 1024L),
+    TERA_BYTES(new String[] {"t", "tb", "tebibytes"}, 1024L * 1024L * 1024L * 1024L);
+
+    private final String[] units;
+
+    private final long multiplier;
+
+    MemoryUnit(String[] units, long multiplier) {
+      this.units = units;
+      this.multiplier = multiplier;
+    }
+
+    public String[] getUnits() {
+      return units;
+    }
+
+    public long getMultiplier() {
+      return multiplier;
+    }
+
+    public static String getAllUnits() {
+      return concatenateUnits(
+          BYTES.getUnits(),
+          KILO_BYTES.getUnits(),
+          MEGA_BYTES.getUnits(),
+          GIGA_BYTES.getUnits(),
+          TERA_BYTES.getUnits());
+    }
+
+    public static boolean hasUnit(String text) {
+      Objects.requireNonNull(text, "text cannot be null");
+
+      final String trimmed = text.trim();
+      if (trimmed.isEmpty()) {
+        throw new IllegalArgumentException("argument is an empty- or whitespace-only string");
+      }
+
+      final int len = trimmed.length();
+      int pos = 0;
+
+      char current;
+      while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+        pos++;
+      }
+
+      final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+      return unit.length() > 0;
+    }
+
+    private static String concatenateUnits(final String[]... allUnits) {
+      final StringBuilder builder = new StringBuilder(128);
+
+      for (String[] units : allUnits) {
+        builder.append('(');
+
+        for (String unit : units) {
+          builder.append(unit);
+          builder.append(" | ");
+        }
+
+        builder.setLength(builder.length() - 3);
+        builder.append(") / ");
+      }
+
+      builder.setLength(builder.length() - 3);
+      return builder.toString();
+    }
+  }
+}
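
For reference, a minimal sketch of how the new MemorySize parser behaves. The class and
method names come from the file above; the example class itself is hypothetical, and the
printed values follow from the 1024-based multipliers defined in MemoryUnit:

    import org.apache.streampark.common.util.MemorySize;

    public class MemorySizeExample {
      public static void main(String[] args) {
        // A pure number is interpreted as bytes.
        MemorySize plain = MemorySize.parse("1048576");
        // Unit suffixes are 1024-based: b, k/kb, m/mb, g/gb, t/tb.
        MemorySize oneGib = MemorySize.parse("1g");
        // The default unit applies only when the text carries no unit of its own.
        MemorySize heap = MemorySize.parse("512", MemorySize.MemoryUnit.MEGA_BYTES);

        System.out.println(plain.getMebiBytes());            // 1
        System.out.println(oneGib);                          // 1 gb
        System.out.println(heap.add(oneGib).getMebiBytes()); // 1536
      }
    }
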
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java
new file mode 100644
index 000000000..c1efee3a1
--- /dev/null
+++ b/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Collection of utilities about time intervals. */
+public class TimeUtils {
+
+  private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP =
+      Collections.unmodifiableMap(initMap());
+
+  /**
+   * Parse the given string to a java {@link Duration}. The string is in format "{length value}{time
+   * unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will be considered
+   * as milliseconds.
+   *
+   * <p>Supported time unit labels are:
+   *
+   * <ul>
+   *   <li>DAYS: "d", "day"
+   *   <li>HOURS: "h", "hour"
+   *   <li>MINUTES: "m", "min", "minute"
+   *   <li>SECONDS: "s", "sec", "second"
+   *   <li>MILLISECONDS: "ms", "milli", "millisecond"
+   *   <li>MICROSECONDS: "µs", "micro", "microsecond"
+   *   <li>NANOSECONDS: "ns", "nano", "nanosecond"
+   * </ul>
+   *
+   * @param text string to parse.
+   */
+  public static Duration parseDuration(String text) {
+    AssertUtils.notNull(text);
+
+    final String trimmed = text.trim();
+
+    if (trimmed.isEmpty()) {
+      throw new IllegalArgumentException("argument is an empty- or whitespace-only string.");
+    }
+
+    final int len = trimmed.length();
+    int pos = 0;
+
+    char current;
+    while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+      pos++;
+    }
+
+    final String number = trimmed.substring(0, pos);
+    final String unitLabel = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+    if (number.isEmpty()) {
+      throw new NumberFormatException("text does not start with a number");
+    }
+
+    final long value;
+    try {
+      value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          "The value '"
+              + number
+              + "' cannot be represented as a 64bit number (numeric overflow).");
+    }
+
+    if (unitLabel.isEmpty()) {
+      return Duration.of(value, ChronoUnit.MILLIS);
+    }
+
+    ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel);
+    if (unit != null) {
+      return Duration.of(value, unit);
+    } else {
+      throw new IllegalArgumentException(
+          "Time interval unit label '"
+              + unitLabel
+              + "' does not match any of the recognized units: "
+              + TimeUnit.getAllUnits());
+    }
+  }
+
+  private static Map<String, ChronoUnit> initMap() {
+    Map<String, ChronoUnit> labelToUnit = new HashMap<>();
+    for (TimeUnit timeUnit : TimeUnit.values()) {
+      for (String label : timeUnit.getLabels()) {
+        labelToUnit.put(label, timeUnit.getUnit());
+      }
+    }
+    return labelToUnit;
+  }
+
+  /**
+   * @param duration to convert to string
+   * @return duration string in millis
+   */
+  public static String getStringInMillis(final Duration duration) {
+    return duration.toMillis() + TimeUnit.MILLISECONDS.labels.get(0);
+  }
+
+  /**
+   * Pretty prints the duration as a lowest granularity unit that does not lose precision.
+   *
+   * <p>Examples:
+   *
+   * <pre>{@code
+   * Duration.ofMillis(60000) will be printed as 1 min
+   * Duration.ofHours(1).plusSeconds(1) will be printed as 3601 s
+   * }</pre>
+   *
+   * <b>NOTE:</b> It supports only durations that fit into long.
+   */
+  public static String formatWithHighestUnit(Duration duration) {
+    long nanos = duration.toNanos();
+
+    TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
+    return String.format(
+        "%d %s",
+        nanos / highestIntegerUnit.unit.getDuration().toNanos(),
+        highestIntegerUnit.getLabels().get(0));
+  }
+
+  private static TimeUnit getHighestIntegerUnit(long nanos) {
+    if (nanos == 0) {
+      return TimeUnit.MILLISECONDS;
+    }
+
+    final List<TimeUnit> orderedUnits =
+        Arrays.asList(
+            TimeUnit.NANOSECONDS,
+            TimeUnit.MICROSECONDS,
+            TimeUnit.MILLISECONDS,
+            TimeUnit.SECONDS,
+            TimeUnit.MINUTES,
+            TimeUnit.HOURS,
+            TimeUnit.DAYS);
+
+    TimeUnit highestIntegerUnit = null;
+    for (TimeUnit timeUnit : orderedUnits) {
+      if (nanos % timeUnit.unit.getDuration().toNanos() != 0) {
+        break;
+      }
+      highestIntegerUnit = timeUnit;
+    }
+
+    return AssertUtils.notNull(highestIntegerUnit, "Should find a highestIntegerUnit.");
+  }
+
+  /** Enum which defines time unit, mostly used to parse value from configuration file. */
+  private enum TimeUnit {
+    DAYS(ChronoUnit.DAYS, singular("d"), plural("day")),
+    HOURS(ChronoUnit.HOURS, singular("h"), plural("hour")),
+    MINUTES(ChronoUnit.MINUTES, singular("min"), singular("m"), plural("minute")),
+    SECONDS(ChronoUnit.SECONDS, singular("s"), plural("sec"), plural("second")),
+    MILLISECONDS(ChronoUnit.MILLIS, singular("ms"), plural("milli"), plural("millisecond")),
+    MICROSECONDS(ChronoUnit.MICROS, singular("µs"), plural("micro"), plural("microsecond")),
+    NANOSECONDS(ChronoUnit.NANOS, singular("ns"), plural("nano"), plural("nanosecond"));
+
+    private static final String PLURAL_SUFFIX = "s";
+
+    private final List<String> labels;
+
+    private final ChronoUnit unit;
+
+    TimeUnit(ChronoUnit unit, String[]... labels) {
+      this.unit = unit;
+      this.labels =
+          Arrays.stream(labels).flatMap(ls -> Arrays.stream(ls)).collect(Collectors.toList());
+    }
+
+    /**
+     * @param label the original label
+     * @return the singular format of the original label
+     */
+    private static String[] singular(String label) {
+      return new String[] {label};
+    }
+
+    /**
+     * @param label the original label
+     * @return both the singular format and plural format of the original label
+     */
+    private static String[] plural(String label) {
+      return new String[] {label, label + PLURAL_SUFFIX};
+    }
+
+    public List<String> getLabels() {
+      return labels;
+    }
+
+    public ChronoUnit getUnit() {
+      return unit;
+    }
+
+    public static String getAllUnits() {
+      return Arrays.stream(TimeUnit.values())
+          .map(TimeUnit::createTimeUnitString)
+          .collect(Collectors.joining(", "));
+    }
+
+    private static String createTimeUnitString(TimeUnit timeUnit) {
+      return timeUnit.name() + ": (" + String.join(" | ", timeUnit.getLabels()) + ")";
+    }
+  }
+
+  private static ChronoUnit toChronoUnit(java.util.concurrent.TimeUnit timeUnit) {
+    switch (timeUnit) {
+      case NANOSECONDS:
+        return ChronoUnit.NANOS;
+      case MICROSECONDS:
+        return ChronoUnit.MICROS;
+      case MILLISECONDS:
+        return ChronoUnit.MILLIS;
+      case SECONDS:
+        return ChronoUnit.SECONDS;
+      case MINUTES:
+        return ChronoUnit.MINUTES;
+      case HOURS:
+        return ChronoUnit.HOURS;
+      case DAYS:
+        return ChronoUnit.DAYS;
+      default:
+        throw new IllegalArgumentException(String.format("Unsupported time unit %s.", timeUnit));
+    }
+  }
+}
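
Likewise, a short sketch of the TimeUtils round-trip, assuming the hypothetical wrapper
class below. A bare number parses as milliseconds, and formatting picks the largest unit
that still yields an integer value:

    import org.apache.streampark.common.util.TimeUtils;

    import java.time.Duration;

    public class TimeUtilsExample {
      public static void main(String[] args) {
        Duration d1 = TimeUtils.parseDuration("123ms"); // 123 milliseconds
        Duration d2 = TimeUtils.parseDuration("321 s"); // whitespace before the label is allowed
        Duration d3 = TimeUtils.parseDuration("5000");  // no label: treated as milliseconds

        System.out.println(TimeUtils.formatWithHighestUnit(Duration.ofMillis(60000))); // 1 min
        System.out.println(TimeUtils.formatWithHighestUnit(Duration.ofSeconds(3601))); // 3601 s
      }
    }
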
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java
new file mode 100644
index 000000000..850985f7f
--- /dev/null
+++ b/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.snakeyaml.engine.v2.api.Dump;
+import org.snakeyaml.engine.v2.api.DumpSettings;
+import org.snakeyaml.engine.v2.api.Load;
+import org.snakeyaml.engine.v2.api.LoadSettings;
+import org.snakeyaml.engine.v2.common.FlowStyle;
+import org.snakeyaml.engine.v2.exceptions.Mark;
+import org.snakeyaml.engine.v2.exceptions.MarkedYamlEngineException;
+import org.snakeyaml.engine.v2.exceptions.YamlEngineException;
+import org.snakeyaml.engine.v2.nodes.Node;
+import org.snakeyaml.engine.v2.nodes.ScalarNode;
+import org.snakeyaml.engine.v2.nodes.Tag;
+import org.snakeyaml.engine.v2.representer.StandardRepresenter;
+import org.snakeyaml.engine.v2.schema.CoreSchema;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains utility methods to load a standard YAML file and convert an object to
+ * standard YAML syntax.
+ */
+public class YamlParserUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(YamlParserUtils.class);
+
+  private static final DumpSettings blockerDumperSettings =
+      DumpSettings.builder()
+          .setDefaultFlowStyle(FlowStyle.BLOCK)
+          // Disable splitting long lines to avoid adding unexpected line breaks
+          .setSplitLines(false)
+          .setSchema(new CoreSchema())
+          .build();
+
+  private static final DumpSettings flowDumperSettings =
+      DumpSettings.builder()
+          .setDefaultFlowStyle(FlowStyle.FLOW)
+          // Disable splitting long lines to avoid adding unexpected line breaks
+          .setSplitLines(false)
+          .setSchema(new CoreSchema())
+          .build();
+
+  private static final Dump blockerDumper =
+      new Dump(blockerDumperSettings, new FlinkConfigRepresenter(blockerDumperSettings));
+
+  private static final Dump flowDumper =
+      new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings));
+
+  private static final Load loader =
+      new Load(LoadSettings.builder().setSchema(new CoreSchema()).build());
+
+  /**
+   * Loads the contents of the given YAML file into a map.
+   *
+   * @param file the YAML file to load.
+   * @return a non-null map representing the YAML content. If the file is empty or only contains
+   *     comments, an empty map is returned.
+   * @throws FileNotFoundException if the YAML file is not found.
+   * @throws YamlEngineException if the file cannot be parsed.
+   * @throws IOException if an I/O error occurs while reading from the file stream.
+   */
+  public static synchronized @Nonnull Map<String, Object> loadYamlFile(File file) throws Exception {
+    try (FileInputStream inputStream = new FileInputStream(file)) {
+      Map<String, Object> yamlResult =
+          (Map<String, Object>) loader.loadFromInputStream(inputStream);
+
+      return yamlResult == null ? new HashMap<>() : yamlResult;
+    } catch (FileNotFoundException e) {
+      LOG.error("Failed to find YAML file", e);
+      throw e;
+    } catch (IOException | YamlEngineException e) {
+      if (e instanceof MarkedYamlEngineException) {
+        YamlEngineException exception =
+            wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
+        LOG.error("Failed to parse YAML configuration", exception);
+        throw exception;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  public static synchronized @Nonnull Map<String, Object> loadYamlInput(InputStream inputStream)
+      throws Exception {
+    try {
+      Map<String, Object> yamlResult =
+          (Map<String, Object>) loader.loadFromInputStream(inputStream);
+      return yamlResult == null ? new HashMap<>() : yamlResult;
+    } catch (YamlEngineException e) {
+      if (e instanceof MarkedYamlEngineException) {
+        YamlEngineException exception =
+            wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
+        LOG.error("Failed to parse YAML configuration", exception);
+        throw exception;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  public static synchronized @Nonnull Map<String, Object> loadYamlString(String text) {
+    try {
+      Map<String, Object> yamlResult = (Map<String, Object>) loader.loadFromString(text);
+
+      return yamlResult == null ? new HashMap<>() : yamlResult;
+    } catch (YamlEngineException e) {
+      if (e instanceof MarkedYamlEngineException) {
+        YamlEngineException exception =
+            wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
+        LOG.error("Failed to parse YAML configuration", exception);
+        throw exception;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Converts the given value to a string representation in the YAML syntax. This method uses a YAML
+   * parser to convert the object to YAML format.
+   *
+   * <p>The resulting YAML string may have line breaks at the end of each line. This method removes
+   * the line break at the end of the string if it exists.
+   *
+   * <p>Note: This method may perform escaping on certain characters in the value to ensure proper
+   * YAML syntax.
+   *
+   * @param value The value to be converted.
+   * @return The string representation of the value in YAML syntax.
+   */
+  public static synchronized String toYAMLString(Object value) {
+    try {
+      String output = flowDumper.dumpToString(value);
+      // remove the line break
+      String linebreak = flowDumperSettings.getBestLineBreak();
+      if (output.endsWith(linebreak)) {
+        output = output.substring(0, output.length() - linebreak.length());
+      }
+      return output;
+    } catch (MarkedYamlEngineException exception) {
+      throw wrapExceptionToHiddenSensitiveData(exception);
+    }
+  }
+
+  /**
+   * Converts a flat map into a nested map structure and outputs the result as a list of
+   * YAML-formatted strings. Each item in the list represents a single line of the YAML data. The
+   * method is synchronized and thus thread-safe.
+   *
+   * @param flattenMap A map containing flattened keys (e.g., "parent.child.key") associated with
+   *     their values.
+   * @return A list of strings that represents the YAML data, where each item corresponds to a line
+   *     of the data.
+   */
+  @SuppressWarnings("unchecked")
+  public static synchronized List<String> convertAndDumpYamlFromFlatMap(
+      Map<String, Object> flattenMap) {
+    try {
+      Map<String, Object> nestedMap = new LinkedHashMap<>();
+      for (Map.Entry<String, Object> entry : flattenMap.entrySet()) {
+        String[] keys = entry.getKey().split("\\.");
+        Map<String, Object> currentMap = nestedMap;
+        for (int i = 0; i < keys.length - 1; i++) {
+          currentMap =
+              (Map<String, Object>) currentMap.computeIfAbsent(keys[i], k -> new LinkedHashMap<>());
+        }
+        currentMap.put(keys[keys.length - 1], entry.getValue());
+      }
+      String data = blockerDumper.dumpToString(nestedMap);
+      String linebreak = blockerDumperSettings.getBestLineBreak();
+      return Arrays.asList(data.split(linebreak));
+    } catch (MarkedYamlEngineException exception) {
+      throw wrapExceptionToHiddenSensitiveData(exception);
+    }
+  }
+
+  public static synchronized <T> T convertToObject(String value, Class<T> type) {
+    try {
+      return type.cast(loader.loadFromString(value));
+    } catch (MarkedYamlEngineException exception) {
+      throw wrapExceptionToHiddenSensitiveData(exception);
+    }
+  }
+
+  /**
+   * This method wraps a MarkedYAMLException to hide sensitive data in its message. Before using
+   * this method, an exception message might include sensitive information like:
+   *
+   * <pre>{@code
+   * while constructing a mapping
+   * in 'reader', line 1, column 1:
+   *     key1: secret1
+   *     ^
+   * found duplicate key key1
+   * in 'reader', line 2, column 1:
+   *     key1: secret2
+   *     ^
+   * }</pre>
+   *
+   * <p>After using this method, the message will be sanitized to hide the sensitive details:
+   *
+   * <pre>{@code
+   * while constructing a mapping
+   * in 'reader', line 1, column 1
+   * found duplicate key key1
+   * in 'reader', line 2, column 1
+   * }</pre>
+   *
+   * @param exception The MarkedYamlEngineException containing potentially sensitive data.
+   * @return A YamlEngineException with a message that has sensitive data hidden.
+   */
+  private static YamlEngineException wrapExceptionToHiddenSensitiveData(
+      MarkedYamlEngineException exception) {
+    StringBuilder lines = new StringBuilder();
+    String context = exception.getContext();
+    Optional<Mark> contextMark = exception.getContextMark();
+    Optional<Mark> problemMark = exception.getProblemMark();
+    String problem = exception.getProblem();
+
+    if (context != null) {
+      lines.append(context);
+      lines.append("\n");
+    }
+
+    if (contextMark.isPresent()
+        && (problem == null
+            || !problemMark.isPresent()
+            || contextMark.get().getName().equals(problemMark.get().getName())
+            || contextMark.get().getLine() != problemMark.get().getLine()
+            || contextMark.get().getColumn() != problemMark.get().getColumn())) {
+      lines.append(hiddenSensitiveDataInMark(contextMark.get()));
+      lines.append("\n");
+    }
+
+    if (problem != null) {
+      lines.append(problem);
+      lines.append("\n");
+    }
+
+    if (problemMark.isPresent()) {
+      lines.append(hiddenSensitiveDataInMark(problemMark.get()));
+      lines.append("\n");
+    }
+
+    Throwable cause = exception.getCause();
+    if (cause instanceof MarkedYamlEngineException) {
+      cause = wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) cause);
+    }
+
+    YamlEngineException yamlException = new YamlEngineException(lines.toString(), cause);
+    yamlException.setStackTrace(exception.getStackTrace());
+    return yamlException;
+  }
+
+  /**
+   * This method is a mock implementation of the Mark#toString() method, specifically designed to
+   * exclude the Mark#get_snippet(), to prevent leaking any sensitive data.
+   */
+  private static String hiddenSensitiveDataInMark(Mark mark) {
+    return " in "
+        + mark.getName()
+        + ", line "
+        + (mark.getLine() + 1)
+        + ", column "
+        + (mark.getColumn() + 1);
+  }
+
+  private static class FlinkConfigRepresenter extends StandardRepresenter {
+    public FlinkConfigRepresenter(DumpSettings dumpSettings) {
+      super(dumpSettings);
+      representers.put(Duration.class, this::representDuration);
+      representers.put(MemorySize.class, this::representMemorySize);
+      parentClassRepresenters.put(Enum.class, this::representEnum);
+    }
+
+    private Node representDuration(Object data) {
+      Duration duration = (Duration) data;
+      String durationString = TimeUtils.formatWithHighestUnit(duration);
+      return new ScalarNode(Tag.STR, durationString, settings.getDefaultScalarStyle());
+    }
+
+    private Node representMemorySize(Object data) {
+      MemorySize memorySize = (MemorySize) data;
+      return new ScalarNode(Tag.STR, memorySize.toString(), settings.getDefaultScalarStyle());
+    }
+
+    private Node representEnum(Object data) {
+      return new ScalarNode(Tag.STR, data.toString(), settings.getDefaultScalarStyle());
+    }
+  }
+}
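
A quick sketch of the YamlParserUtils entry points that the rest of this commit relies on
(the example class and the keys shown are illustrative, not taken from any shipped config):

    import org.apache.streampark.common.util.YamlParserUtils;

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class YamlParserUtilsExample {
      public static void main(String[] args) {
        // Parse standard YAML text into a (possibly nested) map.
        Map<String, Object> conf =
            YamlParserUtils.loadYamlString("taskmanager:\n  numberOfTaskSlots: 4");
        System.out.println(conf); // {taskmanager={numberOfTaskSlots=4}}

        // Render a value back to single-line, flow-style YAML.
        System.out.println(YamlParserUtils.toYAMLString(conf.get("taskmanager")));

        // Expand flattened keys into block-style YAML lines.
        Map<String, Object> flat = new LinkedHashMap<>();
        flat.put("state.backend.type", "rocksdb");
        YamlParserUtils.convertAndDumpYamlFromFlatMap(flat).forEach(System.out::println);
      }
    }
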
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f839fa207..f66b113f3 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -18,7 +18,6 @@ package org.apache.streampark.common.util
 
 import com.typesafe.config.ConfigFactory
 import org.apache.commons.lang3.StringUtils
-import org.yaml.snakeyaml.Yaml
 
 import javax.annotation.Nonnull
 
@@ -82,16 +81,8 @@ object PropertiesUtils extends Logger {
   }
 
   def fromYamlText(text: String): Map[String, String] = {
-    try {
-      new Yaml()
-        .load(text)
-        .asInstanceOf[java.util.Map[String, Map[String, Any]]]
-        .flatMap(x => eachYamlItem(x._1, x._2))
-        .toMap
-    } catch {
-      case e: IOException =>
-        throw new IllegalArgumentException(s"Failed when loading conf error:", e)
-    }
+    val map = YamlParserUtils.loadYamlString(text)
+    map.flatMap(x => eachYamlItem(x._1, x._2)).toMap
   }
 
   def fromHoconText(conf: String): Map[String, String] = {
@@ -146,11 +137,8 @@ object PropertiesUtils extends Logger {
       inputStream != null,
       s"[StreamPark] fromYamlFile: Properties inputStream  must not be null")
     try {
-      new Yaml()
-        .load(inputStream)
-        .asInstanceOf[java.util.Map[String, Map[String, Any]]]
-        .flatMap(x => eachYamlItem(x._1, x._2))
-        .toMap
+      val map = YamlParserUtils.loadYamlInput(inputStream)
+      map.flatMap(x => eachYamlItem(x._1, x._2)).toMap
     } catch {
       case e: IOException =>
         throw new IllegalArgumentException(s"Failed when loading yaml from inputStream", e)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 4cdcf07c1..86bc2bf8d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -26,7 +26,6 @@ import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.commons.io.FileUtils
-import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
 import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings}
 
 import javax.annotation.Nullable
@@ -35,7 +34,7 @@ import java.io.File
 import java.util.{Map => JavaMap}
 
 import scala.collection.JavaConversions._
-import scala.util.{Success, Try}
+import scala.util.Try
 
 case class SubmitRequest(
     flinkVersion: FlinkVersion,
@@ -99,32 +98,6 @@ case class SubmitRequest(
     }
   }
 
-  lazy val flinkDefaultConfiguration: Configuration = {
-    Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) match {
-      case Success(value) =>
-        executionMode match {
-          case ExecutionMode.YARN_SESSION | ExecutionMode.KUBERNETES_NATIVE_SESSION |
-              ExecutionMode.REMOTE =>
-            value
-          case _ =>
-            value
-              .keySet()
-              .foreach(
-                k => {
-                  val v = value.getString(k, null)
-                  if (v != null) {
-                    val result = v
-                      .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", effectiveAppName)
-                      .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
-                    value.setString(k, result)
-                  }
-                })
-            value
-        }
-      case _ => new Configuration()
-    }
-  }
-
   def hasProp(key: String): Boolean = properties.containsKey(key)
 
   def getProp(key: String): Any = properties.get(key)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
index 2e642312d..0a40d9a7e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
@@ -87,6 +87,11 @@
             <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.snakeyaml</groupId>
+            <artifactId>snakeyaml-engine</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java
new file mode 100644
index 000000000..715972d50
--- /dev/null
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.streampark.common.util.AssertUtils;
+import org.apache.streampark.common.util.YamlParserUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Global configuration object for Flink. Similar to Java properties configuration objects it
+ * includes key-value pairs which represent the framework's configuration.
+ */
+public final class FlinkGlobalConfiguration {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkGlobalConfiguration.class);
+
+  public static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml";
+
+  public static final String FLINK_CONF_FILENAME = "config.yaml";
+
+  // key separator character
+  private static final String KEY_SEPARATOR = ".";
+
+  // the keys whose values should be hidden
+  private static final String[] SENSITIVE_KEYS =
+      new String[] {
+        "password",
+        "secret",
+        "fs.azure.account.key",
+        "apikey",
+        "auth-params",
+        "service-key",
+        "token",
+        "basic-auth",
+        "jaas.config",
+        "http-headers"
+      };
+
+  // the hidden content to be displayed
+  public static final String HIDDEN_CONTENT = "******";
+
+  private static boolean standardYaml = true;
+
+  // --------------------------------------------------------------------------------------------
+
+  private FlinkGlobalConfiguration() {}
+
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Loads the global configuration from the environment. Fails if an error occurs during loading.
+   * Returns an empty configuration object if the environment variable is not set. In production
+   * this variable is set but tests and local execution/debugging don't have this environment
+   * variable set. That's why we should fail if it is not set.
+   *
+   * @return Returns the Configuration
+   */
+  public static Configuration loadConfiguration() {
+    return loadConfiguration(new Configuration());
+  }
+
+  /**
+   * Loads the global configuration and adds the given dynamic properties configuration.
+   *
+   * @param dynamicProperties The given dynamic properties
+   * @return Returns the loaded global configuration with dynamic properties
+   */
+  public static Configuration loadConfiguration(Configuration dynamicProperties) {
+    final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+    if (configDir == null) {
+      return new Configuration(dynamicProperties);
+    }
+
+    return loadConfiguration(configDir, dynamicProperties);
+  }
+
+  /**
+   * Loads the configuration files from the specified directory.
+   *
+   * <p>YAML files are supported as configuration files.
+   *
+   * @param configDir the directory which contains the configuration files
+   */
+  public static Configuration loadConfiguration(final String configDir) {
+    return loadConfiguration(configDir, null);
+  }
+
+  /**
+   * Loads the configuration files from the specified directory. If the dynamic properties
+   * configuration is not null, then it is added to the loaded configuration.
+   *
+   * @param configDir directory to load the configuration from
+   * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
+   * @return The configuration loaded from the given configuration directory
+   */
+  public static Configuration loadConfiguration(
+      final String configDir, @Nullable final Configuration dynamicProperties) {
+
+    if (configDir == null) {
+      throw new IllegalArgumentException(
+          "Given configuration directory is null, cannot load configuration");
+    }
+
+    final File confDirFile = new File(configDir);
+    if (!(confDirFile.exists())) {
+      throw new IllegalConfigurationException(
+          "The given configuration directory name '"
+              + configDir
+              + "' ("
+              + confDirFile.getAbsolutePath()
+              + ") does not describe an existing directory.");
+    }
+
+    // get Flink yaml configuration file
+    File yamlConfigFile = new File(confDirFile, LEGACY_FLINK_CONF_FILENAME);
+    Configuration configuration;
+
+    if (!yamlConfigFile.exists()) {
+      yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
+      if (!yamlConfigFile.exists()) {
+        throw new IllegalConfigurationException(
+            "The Flink config file '"
+                + yamlConfigFile
+                + "' ("
+                + yamlConfigFile.getAbsolutePath()
+                + ") does not exist.");
+      } else {
+        standardYaml = true;
+        LOG.info(
+            "Using standard YAML parser to load flink configuration file from {}.",
+            yamlConfigFile.getAbsolutePath());
+        configuration = loadYAMLResource(yamlConfigFile);
+      }
+    } else {
+      standardYaml = false;
+      LOG.info(
+          "Using legacy YAML parser to load flink configuration file from {}.",
+          yamlConfigFile.getAbsolutePath());
+      configuration = loadLegacyYAMLResource(yamlConfigFile);
+    }
+
+    logConfiguration("Loading", configuration);
+
+    if (dynamicProperties != null) {
+      logConfiguration("Loading dynamic", dynamicProperties);
+      configuration.addAll(dynamicProperties);
+    }
+
+    return configuration;
+  }
+
+  private static void logConfiguration(String prefix, Configuration config) {
+    config.confData.forEach(
+        (key, value) ->
+            LOG.info(
+                "{} configuration property: {}, {}",
+                prefix,
+                key,
+                isSensitive(key) ? HIDDEN_CONTENT : value));
+  }
+
+  private static Configuration loadLegacyYAMLResource(File file) {
+    final Configuration config = new Configuration();
+
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
+
+      String line;
+      int lineNo = 0;
+      while ((line = reader.readLine()) != null) {
+        lineNo++;
+        // 1. check for comments
+        String[] comments = line.split("#", 2);
+        String conf = comments[0].trim();
+
+        // 2. get key and value
+        if (conf.length() > 0) {
+          String[] kv = conf.split(": ", 2);
+
+          // skip line with no valid key-value pair
+          if (kv.length == 1) {
+            LOG.warn(
+                "Error while trying to split key and value in configuration file "
+                    + file
+                    + ":"
+                    + lineNo
+                    + ": Line is not a key-value pair (missing space after ':'?)");
+            continue;
+          }
+
+          String key = kv[0].trim();
+          String value = kv[1].trim();
+
+          // sanity check
+          if (key.length() == 0 || value.length() == 0) {
+            LOG.warn(
+                "Error after splitting key and value in configuration file "
+                    + file
+                    + ":"
+                    + lineNo
+                    + ": Key or value was empty");
+            continue;
+          }
+
+          config.setString(key, value);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error parsing YAML configuration.", e);
+    }
+
+    return config;
+  }
+
+  private static Map<String, Object> flatten(Map<String, Object> config, String keyPrefix) {
+    final Map<String, Object> flattenedMap = new HashMap<>();
+
+    config.forEach(
+        (key, value) -> {
+          String flattenedKey = keyPrefix + key;
+          if (value instanceof Map) {
+            Map<String, Object> e = (Map<String, Object>) value;
+            flattenedMap.putAll(flatten(e, flattenedKey + KEY_SEPARATOR));
+          } else {
+            if (value instanceof List) {
+              flattenedMap.put(flattenedKey, YamlParserUtils.toYAMLString(value));
+            } else {
+              flattenedMap.put(flattenedKey, value);
+            }
+          }
+        });
+
+    return flattenedMap;
+  }
+
+  private static Map<String, Object> flatten(Map<String, Object> config) {
+    // Since we start flattening from the root, keys should not be prefixed with anything.
+    return flatten(config, "");
+  }
+
+  private static Configuration loadYAMLResource(File file) {
+    final Configuration config = new Configuration();
+
+    try {
+      Map<String, Object> configDocument = flatten(YamlParserUtils.loadYamlFile(file));
+      configDocument.forEach((k, v) -> config.setValueInternal(k, v, false));
+
+      return config;
+    } catch (Exception e) {
+      throw new RuntimeException("Error parsing YAML configuration.", e);
+    }
+  }
+
+  /**
+   * Check whether the key is a hidden key.
+   *
+   * @param key the config key
+   */
+  public static boolean isSensitive(String key) {
+    AssertUtils.notNull(key, "key");
+    final String keyInLower = key.toLowerCase();
+    for (String hideKey : SENSITIVE_KEYS) {
+      if (keyInLower.length() >= hideKey.length() && keyInLower.contains(hideKey)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static String getFlinkConfFilename() {
+    if (isStandardYaml()) {
+      return FLINK_CONF_FILENAME;
+    } else {
+      return LEGACY_FLINK_CONF_FILENAME;
+    }
+  }
+
+  public static boolean isStandardYaml() {
+    return standardYaml;
+  }
+
+  public static void setStandardYaml(boolean standardYaml) {
+    FlinkGlobalConfiguration.standardYaml = standardYaml;
+  }
+}
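
The loader's file-selection rule, sketched with a hypothetical conf directory:
flink-conf.yaml (legacy, line-oriented parser) wins if present, otherwise config.yaml
(the standard-YAML file used by newer Flink releases) is parsed via snakeyaml-engine
through YamlParserUtils:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.FlinkGlobalConfiguration;

    public class LoadFlinkConfExample {
      public static void main(String[] args) {
        // "/opt/flink/conf" is a placeholder; any directory containing one of
        // the two recognized config files works.
        Configuration conf = FlinkGlobalConfiguration.loadConfiguration("/opt/flink/conf");
        System.out.println(conf.getString("jobmanager.rpc.address", "localhost"));

        // Keys matching the sensitive list are masked as "******" in the loader's log output.
        System.out.println(FlinkGlobalConfiguration.isSensitive("s3.secret-key")); // true
      }
    }
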
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 0437dbaab..b652aa67c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode}
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode}
 import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils, Utils}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -284,7 +284,8 @@ trait FlinkClientTrait extends Logger {
     val configurationDirectory = s"$flinkHome/conf"
     // 2. load the custom command lines
     val flinkConfig =
-      Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
+      Try(FlinkGlobalConfiguration.loadConfiguration(s"$flinkHome/conf"))
+        .getOrElse(new Configuration())
     loadCustomCommandLines(flinkConfig, configurationDirectory)
   }
 
@@ -413,7 +414,8 @@ trait FlinkClientTrait extends Logger {
       validateAndGetActiveCommandLine(getCustomCommandLines(flinkHome), commandLine)
 
     val flinkDefaultConfiguration =
-      Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
+      Try(FlinkGlobalConfiguration.loadConfiguration(s"$flinkHome/conf"))
+        .getOrElse(new Configuration())
 
     val configuration = new Configuration(flinkDefaultConfiguration)
     configuration.addAll(activeCommandLine.toConfiguration(commandLine))
@@ -451,6 +453,39 @@ trait FlinkClientTrait extends Logger {
     }
   }
 
+  implicit private[client] class EnhanceConfiguration(request: SubmitRequest) {
+    lazy val flinkDefaultConfiguration: Configuration = {
+      Try(
+        FlinkGlobalConfiguration.loadConfiguration(
+          s"${request.flinkVersion.flinkHome}/conf")) match {
+        case Success(value) =>
+          request.executionMode match {
+            case ExecutionMode.YARN_SESSION | ExecutionMode.KUBERNETES_NATIVE_SESSION |
+                ExecutionMode.REMOTE =>
+              value
+            case _ =>
+              value
+                .keySet()
+                .foreach(
+                  k => {
+                    val v = value.getString(k, null)
+                    if (v != null) {
+                      val result = v
+                        .replaceAll(
+                          "\\$\\{job(Name|name)}|\\$job(Name|name)",
+                          request.effectiveAppName)
+                        .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", request.id.toString)
+                      value.setString(k, result)
+                    }
+                  })
+              value
+          }
+        case _ => new Configuration()
+      }
+    }
+
+  }
+
   private[client] def cancelJob(
       cancelRequest: CancelRequest,
       jobID: JobID,
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
index 6451d5908..f51defcaf 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
@@ -65,7 +65,7 @@ object YarnPerJobTestCase extends Logger {
   lazy val flinkDefaultConfiguration: Configuration = {
     require(FLINK_HOME != null)
     // get flink config
-    GlobalConfiguration.loadConfiguration(s"$FLINK_HOME/conf")
+    FlinkGlobalConfiguration.loadConfiguration(s"$FLINK_HOME/conf")
   }
 
   lazy val customCommandLines: util.List[CustomCommandLine] = {
