This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch flink-new-conf
in repository https://gitbox.apache.org/repos/asf/streampark.git

commit f6bd4953691cc65fc64f79e57f18a0b0d4092c28
Author: benjobs <[email protected]>
AuthorDate: Sun Jun 22 02:55:03 2025 +0800

    [Improve] Compatibility improvement for parsing Flink config file
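
    Flink 1.19 and later ship their configuration as standard YAML in
    conf/config.yaml, while earlier releases use the legacy line-based
    "key: value" format in conf/flink-conf.yaml. The Flink- and
    Spark-specific parsing helpers move out of PropertiesUtils into
    dedicated FlinkConfigurationUtils and SparkConfigurationUtils
    objects, and FlinkEnv now dispatches on the Flink version to pick
    the matching parser.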
---
 .../common/util/FlinkConfigurationUtils.scala      | 203 ++++++++++++++++
 .../streampark/common/util/PropertiesUtils.scala   | 260 ++-------------------
 .../common/util/SparkConfigurationUtils.scala      |  68 ++++++
 .../common/util/PropertiesUtilsTestCase.scala      |   4 +-
 .../console/core/entity/FlinkCluster.java          |   6 +-
 .../streampark/console/core/entity/FlinkEnv.java   |  23 +-
 .../streampark/console/core/entity/SparkEnv.java   |  39 ----
 .../console/core/runner/QuickStartRunner.java      |   5 +-
 .../impl/FlinkApplicationActionServiceImpl.java    |   4 +-
 .../impl/SparkApplicationActionServiceImpl.java    |   6 +-
 .../service/impl/FlinkSavepointServiceImpl.java    |   7 +-
 .../core/watcher/FlinkK8sWatcherWrapper.java       |   4 +-
 .../flink/client/trait/FlinkClientTrait.scala      |   2 +-
 13 files changed, 324 insertions(+), 307 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala
new file mode 100644
index 000000000..f43c0a0b8
--- /dev/null
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util
+
+import org.apache.streampark.common.util.Implicits._
+
+import org.apache.commons.lang3.StringUtils
+
+import javax.annotation.Nonnull
+
+import java.io.File
+import java.util.Scanner
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+object FlinkConfigurationUtils extends Logger {
+
+  private[this] val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
+
+  private[this] val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"
+
+  private[this] val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP)
+
+  /** Load a Flink configuration from a standard YAML config file (Flink 1.19+ config.yaml). */
+  def loadFlinkConf(file: File): JavaMap[String, String] = {
+    AssertUtils.required(
+      file != null && file.exists() && file.isFile,
+      "[StreamPark] loadFlinkConf: file must not be null")
+    loadFlinkConf(org.apache.commons.io.FileUtils.readFileToString(file))
+  }
+
+  def loadFlinkConf(yaml: String): JavaMap[String, String] = {
+    AssertUtils.required(yaml != null && yaml.nonEmpty, "[StreamPark] loadFlinkConf: yaml must not be null")
+    PropertiesUtils.fromYamlText(yaml)
+  }
+
+  def loadLegacyFlinkConf(file: File): JavaMap[String, String] = {
+    AssertUtils.required(
+      file != null && file.exists() && file.isFile,
+      "[StreamPark] loadLegacyFlinkConf: file must not be null")
+    loadLegacyFlinkConf(org.apache.commons.io.FileUtils.readFileToString(file))
+  }
+
+  def loadLegacyFlinkConf(yaml: String): JavaMap[String, String] = {
+    AssertUtils.required(yaml != null && yaml.nonEmpty, "[StreamPark] loadLegacyFlinkConf: yaml must not be null")
+    val flinkConf = new JavaHashMap[String, String]()
+    val scanner: Scanner = new Scanner(yaml)
+    val lineNo: AtomicInteger = new AtomicInteger(0)
+    while (scanner.hasNextLine) {
+      val line = scanner.nextLine()
+      lineNo.incrementAndGet()
+      // 1. check for comments
+      // [FLINK-27299] flink parsing parameter bug fixed.
+      val comments = line.split("^#|\\s+#", 2)
+      val conf = comments(0).trim
+      // 2. get key and value
+      if (conf.nonEmpty) {
+        val kv = conf.split(": ", 2)
+        // skip line with no valid key-value pair
+        if (kv.length == 2) {
+          val key = kv(0).trim
+          val value = kv(1).trim
+          // sanity check
+          if (key.nonEmpty && value.nonEmpty) {
+            flinkConf += key -> value
+          } else {
+            logWarn(s"Error after splitting key and value in configuration ${lineNo.get()}: $line")
+          }
+        } else {
+          logWarn(s"Error while trying to split key and value in configuration. $lineNo : $line")
+        }
+      }
+    }
+    flinkConf
+  }
+
+  /** Extract Flink dynamic properties (-Dkey=value pairs) from a dynamic properties string. */
+  @Nonnull def extractDynamicProperties(properties: String): Map[String, String] = {
+    if (StringUtils.isEmpty(properties)) Map.empty[String, String]
+    else {
+      val map = mutable.Map[String, String]()
+      val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
+      simple.split("\\s?-D") match {
+        case d if Utils.isNotEmpty(d) =>
+          d.foreach(x => {
+            if (x.nonEmpty) {
+              val p = PROPERTY_PATTERN.matcher(x.trim)
+              if (p.matches) {
+                map += p.group(1).trim -> p.group(2).trim
+              }
+            }
+          })
+        case _ =>
+      }
+      val matcher = MULTI_PROPERTY_PATTERN.matcher(properties)
+      while (matcher.find()) {
+        val opts = matcher.group()
+        val index = opts.indexOf("=")
+        val key = opts.substring(2, index).trim
+        val value =
+          opts.substring(index + 1).trim.replaceAll("(^[\"|']|[\"|']$)", "")
+        map += key -> value
+      }
+      map.toMap
+    }
+  }
+
+  @Nonnull def extractArguments(args: String): List[String] = {
+    val programArgs = new ArrayBuffer[String]()
+    if (StringUtils.isNotEmpty(args)) {
+      return extractArguments(args.split("\\s+"))
+    }
+    programArgs.toList
+  }
+
+  def extractArguments(array: Array[String]): List[String] = {
+    val programArgs = new ArrayBuffer[String]()
+    val iter = array.iterator
+    while (iter.hasNext) {
+      val v = iter.next()
+      val p = v.take(1)
+      p match {
+        case "'" | "\"" =>
+          var value = v
+          if (!v.endsWith(p)) {
+            while (!value.endsWith(p) && iter.hasNext) {
+              value += s" ${iter.next()}"
+            }
+          }
+          programArgs += value.substring(1, value.length - 1)
+        case _ =>
+          val regexp = "(.*)='(.*)'$"
+          if (v.matches(regexp)) {
+            programArgs += v.replaceAll(regexp, "$1=$2")
+          } else {
+            val regexp = "(.*)=\"(.*)\"$"
+            if (v.matches(regexp)) {
+              programArgs += v.replaceAll(regexp, "$1=$2")
+            } else {
+              programArgs += v
+            }
+          }
+      }
+    }
+    programArgs.toList
+  }
+
+  def extractMultipleArguments(array: Array[String]): Map[String, Map[String, String]] = {
+    val iter = array.iterator
+    val map = mutable.Map[String, mutable.Map[String, String]]()
+    while (iter.hasNext) {
+      val v = iter.next()
+      v.take(2) match {
+        case "--" =>
+          val kv = iter.next()
+          val regexp = "(.*)=(.*)"
+          if (kv.matches(regexp)) {
+            val values = kv.split("=")
+            val k1 = values(0).trim
+            val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
+            val k = v.drop(2)
+            map.get(k) match {
+              case Some(m) => m += k1 -> v1
+              case _ => map += k -> mutable.Map(k1 -> v1)
+            }
+          }
+        case _ =>
+      }
+    }
+    map.map(x => x._1 -> x._2.toMap).toMap
+  }
+
+  @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] =
+    new JavaHashMap[String, String](extractDynamicProperties(properties))
+
+  @Nonnull def extractMultipleArgumentsAsJava(args: Array[String]): JavaMap[String, JavaMap[String, String]] = {
+    val map =
+      extractMultipleArguments(args).map(c => c._1 -> new JavaHashMap[String, String](c._2))
+    new JavaHashMap[String, JavaMap[String, String]](map)
+  }
+
+}
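
A minimal usage sketch of the new helpers (hypothetical inputs, not part of
this commit):

    import org.apache.streampark.common.util.FlinkConfigurationUtils

    // Legacy (pre-1.19) "key: value" lines are parsed one by one:
    val legacy = FlinkConfigurationUtils.loadLegacyFlinkConf(
      "taskmanager.numberOfTaskSlots: 4\nparallelism.default: 2")
    // legacy.get("parallelism.default") == "2"

    // Dynamic -D properties, including quoted values, become a map:
    val dyn = FlinkConfigurationUtils.extractDynamicProperties(
      "-Dparallelism.default=2 -Denv.java.opts=\"-Dfile.encoding=UTF-8\"")
    // dyn("env.java.opts") == "-Dfile.encoding=UTF-8"
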
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f7f68ae45..32843a94f 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -23,30 +23,13 @@ import com.typesafe.config.ConfigFactory
 import org.apache.commons.lang3.StringUtils
 import org.yaml.snakeyaml.Yaml
 
-import javax.annotation.Nonnull
-
 import java.io._
 import java.util.{Properties, Scanner}
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.regex.Pattern
 
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
 
 object PropertiesUtils extends Logger {
 
-  private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
-
-  private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN = Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")
-
-  // scalastyle:off
-  private[this] lazy val SPARK_ARGUMENT_REGEXP = "\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
-  // scalastyle:on
-
-  private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"
-
-  private[this] lazy val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP)
-
   def readFile(filename: String): String = {
     val file = new File(filename)
     require(file.exists(), s"[StreamPark] readFile: file $file does not exist")
@@ -60,41 +43,10 @@ object PropertiesUtils extends Logger {
     buffer.toString()
   }
 
-  private[this] def eachYamlItem(
-      k: String,
-      v: Any,
-      prefix: String = "",
-      proper: MutableMap[String, String] = MutableMap[String, String]()): Map[String, String] = {
-    v match {
-      case map: JavaLinkedMap[_, _] =>
-        map
-          .flatMap(x => {
-            prefix match {
-              case "" => eachYamlItem(x._1.toString, x._2, k, proper)
-              case other =>
-                eachYamlItem(x._1.toString, x._2, s"$other.$k", proper)
-            }
-          })
-          .toMap
-      case text =>
-        if (text != null) {
-          val value = text.toString.trim
-          prefix match {
-            case "" => proper += k -> value
-            case other => proper += s"$other.$k" -> value
-          }
-        }
-        proper.toMap
-    }
-  }
-
   def fromYamlText(text: String): Map[String, String] = {
     try {
-      new Yaml()
-        .load(text)
-        .asInstanceOf[java.util.Map[String, Map[String, Any]]]
-        .flatMap(x => eachYamlItem(x._1, x._2))
-        .toMap
+      val map = new Yaml().load[JavaMap[String, Object]](text)
+      flatten(map)
     } catch {
       case e: IOException =>
        throw new IllegalArgumentException(s"Failed when loading conf error:", e)
@@ -149,15 +101,12 @@ object PropertiesUtils extends Logger {
 
   /** Load Yaml present in the given file. */
   def fromYamlFile(inputStream: InputStream): Map[String, String] = {
-    require(
+    AssertUtils.required(
       inputStream != null,
       s"[StreamPark] fromYamlFile: Properties inputStream  must not be null")
     try {
-      new Yaml()
-        .load(inputStream)
-        .asInstanceOf[java.util.Map[String, Map[String, Any]]]
-        .flatMap(x => eachYamlItem(x._1, x._2))
-        .toMap
+      val map = new Yaml().load[JavaMap[String, Object]](inputStream)
+      flatten(map)
     } catch {
       case e: IOException =>
        throw new IllegalArgumentException(s"Failed when loading yaml from inputStream", e)
@@ -236,195 +185,16 @@ object PropertiesUtils extends Logger {
  def fromPropertiesFileAsJava(inputStream: InputStream): JavaMap[String, String] =
     new JavaHashMap[String, String](fromPropertiesFile(inputStream))
 
-  /**
-   * @param file
-   * @return
-   */
-  def loadFlinkConfYaml(file: File): JavaMap[String, String] = {
-    require(
-      file != null && file.exists() && file.isFile,
-      "[StreamPark] loadFlinkConfYaml: file must not be null")
-    loadFlinkConfYaml(org.apache.commons.io.FileUtils.readFileToString(file))
-  }
-
-  def loadFlinkConfYaml(yaml: String): JavaMap[String, String] = {
-    require(yaml != null && yaml.nonEmpty, "[StreamPark] loadFlinkConfYaml: yaml must not be null")
-    val flinkConf = new JavaHashMap[String, String]()
-    val scanner: Scanner = new Scanner(yaml)
-    val lineNo: AtomicInteger = new AtomicInteger(0)
-    while (scanner.hasNextLine) {
-      val line = scanner.nextLine()
-      lineNo.incrementAndGet()
-      // 1. check for comments
-      // [FLINK-27299] flink parsing parameter bug fixed.
-      val comments = line.split("^#|\\s+#", 2)
-      val conf = comments(0).trim
-      // 2. get key and value
-      if (conf.nonEmpty) {
-        val kv = conf.split(": ", 2)
-        // skip line with no valid key-value pair
-        if (kv.length == 2) {
-          val key = kv(0).trim
-          val value = kv(1).trim
-          // sanity check
-          if (key.nonEmpty && value.nonEmpty) {
-            flinkConf += key -> value
-          } else {
-            logWarn(s"Error after splitting key and value in configuration ${lineNo.get()}: $line")
-          }
-        } else {
-          logWarn(s"Error while trying to split key and value in configuration. $lineNo : $line")
-        }
-      }
-    }
-    flinkConf
-  }
-
-  /** extract flink configuration from application.properties */
-  @Nonnull def extractDynamicProperties(properties: String): Map[String, String] = {
-    if (StringUtils.isEmpty(properties)) Map.empty[String, String]
-    else {
-      val map = mutable.Map[String, String]()
-      val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
-      simple.split("\\s?-D") match {
-        case d if Utils.isNotEmpty(d) =>
-          d.foreach(x => {
-            if (x.nonEmpty) {
-              val p = PROPERTY_PATTERN.matcher(x.trim)
-              if (p.matches) {
-                map += p.group(1).trim -> p.group(2).trim
-              }
-            }
-          })
-        case _ =>
-      }
-      val matcher = MULTI_PROPERTY_PATTERN.matcher(properties)
-      while (matcher.find()) {
-        val opts = matcher.group()
-        val index = opts.indexOf("=")
-        val key = opts.substring(2, index).trim
-        val value =
-          opts.substring(index + 1).trim.replaceAll("(^[\"|']|[\"|']$)", "")
-        map += key -> value
-      }
-      map.toMap
-    }
-  }
-
-  @Nonnull def extractArguments(args: String): List[String] = {
-    val programArgs = new ArrayBuffer[String]()
-    if (StringUtils.isNotEmpty(args)) {
-      return extractArguments(args.split("\\s+"))
-    }
-    programArgs.toList
-  }
-
-  def extractArguments(array: Array[String]): List[String] = {
-    val programArgs = new ArrayBuffer[String]()
-    val iter = array.iterator
-    while (iter.hasNext) {
-      val v = iter.next()
-      val p = v.take(1)
-      p match {
-        case "'" | "\"" =>
-          var value = v
-          if (!v.endsWith(p)) {
-            while (!value.endsWith(p) && iter.hasNext) {
-              value += s" ${iter.next()}"
-            }
-          }
-          programArgs += value.substring(1, value.length - 1)
-        case _ =>
-          val regexp = "(.*)='(.*)'$"
-          if (v.matches(regexp)) {
-            programArgs += v.replaceAll(regexp, "$1=$2")
-          } else {
-            val regexp = "(.*)=\"(.*)\"$"
-            if (v.matches(regexp)) {
-              programArgs += v.replaceAll(regexp, "$1=$2")
-            } else {
-              programArgs += v
-            }
-          }
-      }
-    }
-    programArgs.toList
-  }
-
-  def extractMultipleArguments(array: Array[String]): Map[String, Map[String, String]] = {
-    val iter = array.iterator
-    val map = mutable.Map[String, mutable.Map[String, String]]()
-    while (iter.hasNext) {
-      val v = iter.next()
-      v.take(2) match {
-        case "--" =>
-          val kv = iter.next()
-          val regexp = "(.*)=(.*)"
-          if (kv.matches(regexp)) {
-            val values = kv.split("=")
-            val k1 = values(0).trim
-            val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
-            val k = v.drop(2)
-            map.get(k) match {
-              case Some(m) => m += k1 -> v1
-              case _ => map += k -> mutable.Map(k1 -> v1)
-            }
-          }
-        case _ =>
-      }
-    }
-    map.map(x => x._1 -> x._2.toMap).toMap
-  }
-
-  @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] =
-    new JavaHashMap[String, String](extractDynamicProperties(properties))
-
-  @Nonnull def extractMultipleArgumentsAsJava(
-      args: Array[String]): JavaMap[String, JavaMap[String, String]] = {
-    val map =
-      extractMultipleArguments(args).map(c => c._1 -> new JavaHashMap[String, String](c._2))
-    new JavaHashMap[String, JavaMap[String, String]](map)
-  }
-
-  /** extract spark configuration from sparkApplication.appProperties */
-  @Nonnull def extractSparkPropertiesAsJava(properties: String): JavaMap[String, String] =
-    new JavaHashMap[String, String](extractSparkProperties(properties))
-
-  @Nonnull def extractSparkProperties(properties: String): Map[String, String] = {
-    if (StringUtils.isEmpty(properties)) Map.empty[String, String]
-    else {
-      val map = mutable.Map[String, String]()
-      properties.split("(\\s)*(--conf|-c)(\\s)+") match {
-        case d if Utils.isNotEmpty(d) =>
-          d.foreach(x => {
-            if (x.nonEmpty) {
-              val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
-              if (p.matches) {
-                map += p.group(1).trim -> p.group(2).trim
-              }
-            }
-          })
-        case _ =>
-      }
-      map.toMap
-    }
+  private[this] def flatten(map: JavaMap[String, Object], prefix: String = ""): Map[String, String] = {
+    map.asScala.flatMap {
+      case (k, v: JavaMap[String, Object] @unchecked) => flatten(v, s"$prefix$k.")
+      case (k, v: String) =>
+        if (StringUtils.isBlank(v)) Map.empty[String, String] else Map(s"$prefix$k" -> v)
+      case (k, v: JavaCollection[_]) =>
+        if (v.isEmpty) Map.empty[String, String] else Map(s"$prefix$k" -> v.toString)
+      case (k, v) =>
+        if (v == null) Map.empty[String, String] else Map(s"$prefix$k" -> v.toString)
+    }.toMap
   }
 
-  /** extract spark configuration from sparkApplication.appArgs */
-  @Nonnull def extractSparkArgumentsAsJava(arguments: String): JavaList[String] = {
-    val list = new JavaArrayList[String]()
-    if (StringUtils.isEmpty(arguments)) list
-    else {
-      arguments.split(SPARK_ARGUMENT_REGEXP) match {
-        case d if Utils.isNotEmpty(d) =>
-          d.foreach(x => {
-            if (x.nonEmpty) {
-              list.add(x)
-            }
-          })
-        case _ =>
-      }
-      list
-    }
-  }
 }
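
The rewritten fromYamlText/fromYamlFile now flatten nested YAML by joining
keys with dots. A small sketch (hypothetical input, not part of this commit):

    // Yields Map("execution.checkpointing.interval" -> "3min")
    val flat = PropertiesUtils.fromYamlText(
      """execution:
        |  checkpointing:
        |    interval: 3min
        |""".stripMargin)
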
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkConfigurationUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkConfigurationUtils.scala
new file mode 100644
index 000000000..281914e8b
--- /dev/null
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkConfigurationUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util
+
+import org.apache.streampark.common.util.Implicits._
+
+import com.google.common.collect.Lists
+import org.apache.commons.lang3.StringUtils
+
+import javax.annotation.Nonnull
+
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+
+object SparkConfigurationUtils extends Logger {
+
+  private[this] val SPARK_PROPERTY_COMPLEX_PATTERN = Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")
+
+  // scalastyle:off
+  private[this] val SPARK_ARGUMENT_REGEXP = "\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
+  // scalastyle:on
+
+  /** extract spark configuration from sparkApplication.appProperties */
+  @Nonnull def extractPropertiesAsJava(properties: String): JavaMap[String, String] =
+    new JavaHashMap[String, String](extractProperties(properties))
+
+  @Nonnull def extractProperties(properties: String): Map[String, String] = {
+    if (StringUtils.isEmpty(properties)) {
+      Map.empty[String, String]
+    } else {
+      val map = mutable.Map[String, String]()
+      properties.split("(\\s)*(--conf|-c)(\\s)+").filter(Utils.isNotEmpty(_))
+        .foreach(x =>
+          if (x.nonEmpty) {
+            val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
+            if (p.matches) {
+              map += p.group(1).trim -> p.group(2).trim
+            }
+          })
+      map.toMap
+    }
+  }
+
+  /** extract spark configuration from sparkApplication.appArgs */
+  @Nonnull def extractArgumentsAsJava(arguments: String): JavaList[String] = {
+    if (StringUtils.isEmpty(arguments)) {
+      Lists.newArrayList()
+    } else {
+      arguments.split(SPARK_ARGUMENT_REGEXP).filter(_.nonEmpty).toList.asJava
+    }
+  }
+}
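
A short sketch of the Spark counterpart (hypothetical inputs, not part of
this commit):

    import org.apache.streampark.common.util.SparkConfigurationUtils

    // --conf / -c pairs become a flat map:
    val props = SparkConfigurationUtils.extractProperties(
      "--conf spark.executor.memory=2g -c spark.executor.cores=2")
    // props("spark.executor.cores") == "2"

    // Arguments split on whitespace outside double quotes; quotes are stripped:
    val args = SparkConfigurationUtils.extractArgumentsAsJava("--input /tmp/in \"a b\"")
    // args == ["--input", "/tmp/in", "a b"]
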
diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index 20aadc6c1..40f9a5afc 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -39,7 +39,7 @@ class PropertiesUtilsTestCase {
         "--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 " +
         "--sink-conf sink.label-prefix=label" +
         "--table-conf replication_num=1"
-    val programArgs = PropertiesUtils.extractArguments(args)
+    val programArgs = FlinkConfigurationUtils.extractArguments(args)
     Assertions.assertTrue(programArgs.contains("username=root"))
   }
 
@@ -56,7 +56,7 @@ class PropertiesUtilsTestCase {
         |
         |""".stripMargin
 
-    val map = PropertiesUtils.extractDynamicProperties(dynamicProperties)
+    val map = FlinkConfigurationUtils.extractDynamicProperties(dynamicProperties)
     Assertions.assertEquals(map("env.java.opts1"), "-Dfile.encoding=UTF-8")
     Assertions.assertEquals(map("env.java.opts2"), "-Dfile.enc\\\"oding=UTF-8")
     Assertions.assertEquals(map("env.java.opts3"), " -Dfile.encoding=UTF-8")
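
For reference, the quoted-argument behavior these tests exercise (hypothetical
input, not part of this commit):

    // Quoted arguments stay together and the surrounding quotes are stripped:
    FlinkConfigurationUtils.extractArguments("run --name \"my job\" --p 'k=v'")
    // -> List("run", "--name", "my job", "--p", "k=v")
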
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 800a6e2b2..b50747ce0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -22,8 +22,8 @@ import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkDeployMode;
 import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.ResolveOrder;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
 import org.apache.streampark.common.util.HttpClientUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.core.util.YarnQueueLabelExpression;
 
@@ -180,8 +180,8 @@ public class FlinkCluster implements Serializable {
     @JsonIgnore
     public Map<String, Object> getProperties() {
         Map<String, Object> propertyMap = new HashMap<>();
-        Map<String, String> dynamicPropertyMap = PropertiesUtils
-            .extractDynamicPropertiesAsJava(this.getDynamicProperties());
+        Map<String, String> dynamicPropertyMap =
+            FlinkConfigurationUtils.extractDynamicPropertiesAsJava(this.getDynamicProperties());
         propertyMap.putAll(this.getOptionMap());
         propertyMap.putAll(dynamicPropertyMap);
         ResolveOrder resolveOrder = ResolveOrder.of(this.getResolveOrder());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index b700bdf67..7b13727a9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -19,7 +19,7 @@ package org.apache.streampark.console.core.entity;
 
 import org.apache.streampark.common.conf.FlinkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
@@ -76,9 +76,8 @@ public class FlinkEnv implements Serializable {
     private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString();
 
     public void doSetFlinkConf() throws ApiDetailException {
-
         File yaml;
-        float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
+        Float ver = getVersionNumber();
         if (ver < 1.19f) {
             yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
             if (!yaml.exists()) {
@@ -119,7 +118,10 @@ public class FlinkEnv implements Serializable {
 
     public Map<String, String> convertFlinkYamlAsMap() {
         String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
-        return PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+        if (isLegacyFlinkConf()) {
+            return FlinkConfigurationUtils.loadLegacyFlinkConf(flinkYamlString);
+        }
+        return FlinkConfigurationUtils.loadFlinkConf(flinkYamlString);
     }
 
     @JsonIgnore
@@ -166,8 +168,19 @@ public class FlinkEnv implements Serializable {
     public Properties getFlinkConfig() {
         String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
         Properties flinkConfig = new Properties();
-        Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+        Map<String, String> config = FlinkConfigurationUtils.loadLegacyFlinkConf(flinkYamlString);
         flinkConfig.putAll(config);
         return flinkConfig;
     }
+
+    private Float getVersionNumber() {
+        if (StringUtils.isNotBlank(this.version)) {
+            return Float.parseFloat(getVersionOfFirst() + "." + getVersionOfMiddle());
+        }
+        throw new RuntimeException("Flink version is null");
+    }
+
+    public boolean isLegacyFlinkConf() {
+        return getVersionNumber() < 1.19f;
+    }
 }
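
Note: the 1.19 cutoff matches the release in which Flink replaced the legacy
conf/flink-conf.yaml with the standard-YAML conf/config.yaml, so
isLegacyFlinkConf() routes older versions to loadLegacyFlinkConf and newer
ones to loadFlinkConf; getFlinkConfig() above still uses the legacy parser.
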
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java
index e5c8bfd7c..dff43e0e8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java
@@ -19,11 +19,9 @@ package org.apache.streampark.console.core.entity;
 
 import org.apache.streampark.common.conf.SparkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
@@ -36,8 +34,6 @@ import java.io.File;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 
 @Getter
 @Setter
@@ -91,14 +87,6 @@ public class SparkEnv implements Serializable {
         this.setScalaVersion(this.getSparkVersion().scalaVersion());
     }
 
-    public Map<String, String> convertSparkYamlAsMap() {
-        if (sparkConf == null) {
-            return new HashMap<>();
-        }
-        String sparkYamlString = DeflaterUtils.unzipString(sparkConf);
-        return PropertiesUtils.loadFlinkConfYaml(sparkYamlString);
-    }
-
     @JsonIgnore
     public SparkVersion getSparkVersion() {
         if (this.sparkVersion == null) {
@@ -113,31 +101,4 @@ public class SparkEnv implements Serializable {
         }
     }
 
-    public String getLargeVersion() {
-        if (StringUtils.isNotBlank(this.version)) {
-            return this.version.substring(0, this.version.lastIndexOf("."));
-        }
-        return null;
-    }
-
-    public String getVersionOfFirst() {
-        if (StringUtils.isNotBlank(this.version)) {
-            return this.version.split("\\.")[0];
-        }
-        return null;
-    }
-
-    public String getVersionOfMiddle() {
-        if (StringUtils.isNotBlank(this.version)) {
-            return this.version.split("\\.")[1];
-        }
-        return null;
-    }
-
-    public String getVersionOfLast() {
-        if (StringUtils.isNotBlank(this.version)) {
-            return this.version.split("\\.")[2];
-        }
-        return null;
-    }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
index 554cbc4e4..0b3aab314 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
@@ -19,7 +19,7 @@ package org.apache.streampark.console.core.runner;
 
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkDeployMode;
-import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
 import org.apache.streampark.console.core.entity.FlinkApplication;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -64,7 +64,8 @@ public class QuickStartRunner implements ApplicationRunner {
     @Override
     public void run(ApplicationArguments args) throws Exception {
 
-        Map<String, Map<String, String>> map = PropertiesUtils.extractMultipleArgumentsAsJava(args.getSourceArgs());
+        Map<String, Map<String, String>> map =
+            FlinkConfigurationUtils.extractMultipleArgumentsAsJava(args.getSourceArgs());
 
         Map<String, String> quickstart = map.get("quickstart");
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
index 4605deb99..df86e4215 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
@@ -31,8 +31,8 @@ import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
 import org.apache.streampark.common.util.HadoopUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApplicationException;
 import org.apache.streampark.console.base.util.Tuple2;
@@ -805,7 +805,7 @@ public class FlinkApplicationActionServiceImpl
         }
 
         Map<String, String> dynamicProperties =
-            PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties);
+            FlinkConfigurationUtils.extractDynamicPropertiesAsJava(runtimeProperties);
         properties.putAll(dynamicProperties);
         ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder());
         if (resolveOrder != null) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 18ce26cbe..8f4038090 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -28,7 +28,7 @@ import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.HadoopUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.SparkConfigurationUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApplicationException;
 import org.apache.streampark.console.core.entity.ApplicationBuildPipeline;
@@ -322,8 +322,8 @@ public class SparkApplicationActionServiceImpl
             application.getAppName(),
             application.getMainClass(),
             appConf,
-            PropertiesUtils.extractSparkPropertiesAsJava(application.getAppProperties()),
-            PropertiesUtils.extractSparkArgumentsAsJava(applicationArgs),
+            SparkConfigurationUtils.extractPropertiesAsJava(application.getAppProperties()),
+            SparkConfigurationUtils.extractArgumentsAsJava(applicationArgs),
             application.getApplicationType(),
             application.getHadoopUser(),
             buildResult,
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
index 523d862f4..5420c8167 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.enums.FlinkDeployMode;
 import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.InternalException;
@@ -80,7 +81,6 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
 import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
-import static org.apache.streampark.common.util.PropertiesUtils.extractDynamicPropertiesAsJava;
 import static org.apache.streampark.console.core.enums.CheckPointTypeEnum.CHECKPOINT;
 
 @Slf4j
@@ -328,7 +328,7 @@ public class FlinkSavepointServiceImpl extends ServiceImpl<FlinkSavepointMapper,
     @VisibleForTesting
     @Nullable
     public String getSavepointFromDynamicProps(String dynamicProps) {
-        return extractDynamicPropertiesAsJava(dynamicProps).get(SAVEPOINT_DIRECTORY.key());
+        return FlinkConfigurationUtils.extractDynamicPropertiesAsJava(dynamicProps).get(SAVEPOINT_DIRECTORY.key());
     }
 
     /**
@@ -388,7 +388,8 @@ public class FlinkSavepointServiceImpl extends ServiceImpl<FlinkSavepointMapper,
      * Try get the 'state.checkpoints.num-retained' from the dynamic properties.
      */
     private Optional<Integer> tryGetChkNumRetainedFromDynamicProps(String dynamicProps) {
-        String rawCfgValue = extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key());
+        String rawCfgValue =
+            FlinkConfigurationUtils.extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key());
         if (StringUtils.isBlank(rawCfgValue)) {
             return Optional.empty();
         }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java
index 5979c489f..4e4f09128 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.watcher;
 
 import org.apache.streampark.common.enums.FlinkDeployMode;
-import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
 import org.apache.streampark.console.core.entity.FlinkApplication;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -128,7 +128,7 @@ public class FlinkK8sWatcherWrapper {
         FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
         Properties properties = flinkEnv.getFlinkConfig();
 
-        Map<String, String> dynamicProperties = PropertiesUtils
+        Map<String, String> dynamicProperties = FlinkConfigurationUtils
             .extractDynamicPropertiesAsJava(app.getDynamicProperties());
         String archiveDir = dynamicProperties.get(JobManagerOptions.ARCHIVE_DIR.key());
         if (archiveDir != null) {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 647aa1515..ca1327927 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -469,7 +469,7 @@ trait FlinkClientTrait extends Logger {
 
   private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
     val programArgs = new ArrayBuffer[String]()
-    programArgs ++= PropertiesUtils.extractArguments(submitRequest.args)
+    programArgs ++= FlinkConfigurationUtils.extractArguments(submitRequest.args)
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
 

