This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 4ab0db57d [Improve] Compatibility improvement for parsing Flink config
file (#4264)
4ab0db57d is described below
commit 4ab0db57d4f51954aa465a7dda09bac0f96a9347
Author: benjobs <[email protected]>
AuthorDate: Sun Jun 22 09:35:12 2025 +0800
[Improve] Compatibility improvement for parsing Flink config file (#4264)
---
.../common/util/FlinkConfigurationUtils.scala | 203 ++++++++++++++++
.../streampark/common/util/PropertiesUtils.scala | 260 ++-------------------
.../common/util/SparkConfigurationUtils.scala | 68 ++++++
.../common/util/PropertiesUtilsTestCase.scala | 4 +-
.../console/core/entity/FlinkCluster.java | 6 +-
.../streampark/console/core/entity/FlinkEnv.java | 23 +-
.../streampark/console/core/entity/SparkEnv.java | 39 ----
.../console/core/runner/QuickStartRunner.java | 5 +-
.../impl/FlinkApplicationActionServiceImpl.java | 4 +-
.../impl/SparkApplicationActionServiceImpl.java | 6 +-
.../service/impl/FlinkSavepointServiceImpl.java | 7 +-
.../core/watcher/FlinkK8sWatcherWrapper.java | 4 +-
.../flink/client/trait/FlinkClientTrait.scala | 2 +-
13 files changed, 324 insertions(+), 307 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala
new file mode 100644
index 000000000..f43c0a0b8
--- /dev/null
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util
+
+import org.apache.streampark.common.util.Implicits._
+
+import org.apache.commons.lang3.StringUtils
+
+import javax.annotation.Nonnull
+
+import java.io.File
+import java.util.Scanner
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+object FlinkConfigurationUtils extends Logger {
+
+ private[this] val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
+
+ private[this] val MULTI_PROPERTY_REGEXP =
"-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"
+
+ private[this] val MULTI_PROPERTY_PATTERN =
Pattern.compile(MULTI_PROPERTY_REGEXP)
+
+ /**
+ * @param file
+ * @return
+ */
+ def loadFlinkConf(file: File): JavaMap[String, String] = {
+ AssertUtils.required(
+ file != null && file.exists() && file.isFile,
+ "[StreamPark] loadFlinkConfYaml: file must not be null")
+ loadFlinkConf(org.apache.commons.io.FileUtils.readFileToString(file))
+ }
+
+ def loadFlinkConf(yaml: String): JavaMap[String, String] = {
+ AssertUtils.required(yaml != null && yaml.nonEmpty, "[StreamPark]
loadFlinkConfYaml: yaml must not be null")
+ PropertiesUtils.fromYamlText(yaml)
+ }
+
+ def loadLegacyFlinkConf(file: File): JavaMap[String, String] = {
+ AssertUtils.required(
+ file != null && file.exists() && file.isFile,
+ "[StreamPark] loadFlinkConfYaml: file must not be null")
+ loadLegacyFlinkConf(org.apache.commons.io.FileUtils.readFileToString(file))
+ }
+
+ def loadLegacyFlinkConf(yaml: String): JavaMap[String, String] = {
+ AssertUtils.required(yaml != null && yaml.nonEmpty, "[StreamPark]
loadFlinkConfYaml: yaml must not be null")
+ val flinkConf = new JavaHashMap[String, String]()
+ val scanner: Scanner = new Scanner(yaml)
+ val lineNo: AtomicInteger = new AtomicInteger(0)
+ while (scanner.hasNextLine) {
+ val line = scanner.nextLine()
+ lineNo.incrementAndGet()
+ // 1. check for comments
+ // [FLINK-27299] flink parsing parameter bug fixed.
+ val comments = line.split("^#|\\s+#", 2)
+ val conf = comments(0).trim
+ // 2. get key and value
+ if (conf.nonEmpty) {
+ val kv = conf.split(": ", 2)
+ // skip line with no valid key-value pair
+ if (kv.length == 2) {
+ val key = kv(0).trim
+ val value = kv(1).trim
+ // sanity check
+ if (key.nonEmpty && value.nonEmpty) {
+ flinkConf += key -> value
+ } else {
+ logWarn(s"Error after splitting key and value in configuration
${lineNo.get()}: $line")
+ }
+ } else {
+ logWarn(s"Error while trying to split key and value in
configuration. $lineNo : $line")
+ }
+ }
+ }
+ flinkConf
+ }
+
+ /** extract flink configuration from application.properties */
+ @Nonnull def extractDynamicProperties(properties: String): Map[String,
String] = {
+ if (StringUtils.isEmpty(properties)) Map.empty[String, String]
+ else {
+ val map = mutable.Map[String, String]()
+ val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
+ simple.split("\\s?-D") match {
+ case d if Utils.isNotEmpty(d) =>
+ d.foreach(x => {
+ if (x.nonEmpty) {
+ val p = PROPERTY_PATTERN.matcher(x.trim)
+ if (p.matches) {
+ map += p.group(1).trim -> p.group(2).trim
+ }
+ }
+ })
+ case _ =>
+ }
+ val matcher = MULTI_PROPERTY_PATTERN.matcher(properties)
+ while (matcher.find()) {
+ val opts = matcher.group()
+ val index = opts.indexOf("=")
+ val key = opts.substring(2, index).trim
+ val value =
+ opts.substring(index + 1).trim.replaceAll("(^[\"|']|[\"|']$)", "")
+ map += key -> value
+ }
+ map.toMap
+ }
+ }
+
+ @Nonnull def extractArguments(args: String): List[String] = {
+ val programArgs = new ArrayBuffer[String]()
+ if (StringUtils.isNotEmpty(args)) {
+ return extractArguments(args.split("\\s+"))
+ }
+ programArgs.toList
+ }
+
+ def extractArguments(array: Array[String]): List[String] = {
+ val programArgs = new ArrayBuffer[String]()
+ val iter = array.iterator
+ while (iter.hasNext) {
+ val v = iter.next()
+ val p = v.take(1)
+ p match {
+ case "'" | "\"" =>
+ var value = v
+ if (!v.endsWith(p)) {
+ while (!value.endsWith(p) && iter.hasNext) {
+ value += s" ${iter.next()}"
+ }
+ }
+ programArgs += value.substring(1, value.length - 1)
+ case _ =>
+ val regexp = "(.*)='(.*)'$"
+ if (v.matches(regexp)) {
+ programArgs += v.replaceAll(regexp, "$1=$2")
+ } else {
+ val regexp = "(.*)=\"(.*)\"$"
+ if (v.matches(regexp)) {
+ programArgs += v.replaceAll(regexp, "$1=$2")
+ } else {
+ programArgs += v
+ }
+ }
+ }
+ }
+ programArgs.toList
+ }
+
+ def extractMultipleArguments(array: Array[String]): Map[String, Map[String,
String]] = {
+ val iter = array.iterator
+ val map = mutable.Map[String, mutable.Map[String, String]]()
+ while (iter.hasNext) {
+ val v = iter.next()
+ v.take(2) match {
+ case "--" =>
+ val kv = iter.next()
+ val regexp = "(.*)=(.*)"
+ if (kv.matches(regexp)) {
+ val values = kv.split("=")
+ val k1 = values(0).trim
+ val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
+ val k = v.drop(2)
+ map.get(k) match {
+ case Some(m) => m += k1 -> v1
+ case _ => map += k -> mutable.Map(k1 -> v1)
+ }
+ }
+ case _ =>
+ }
+ }
+ map.map(x => x._1 -> x._2.toMap).toMap
+ }
+
+ @Nonnull def extractDynamicPropertiesAsJava(properties: String):
JavaMap[String, String] =
+ new JavaHashMap[String, String](extractDynamicProperties(properties))
+
+ @Nonnull def extractMultipleArgumentsAsJava(args: Array[String]):
JavaMap[String, JavaMap[String, String]] = {
+ val map =
+ extractMultipleArguments(args).map(c => c._1 -> new JavaHashMap[String,
String](c._2))
+ new JavaHashMap[String, JavaMap[String, String]](map)
+ }
+
+}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f7f68ae45..32843a94f 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -23,30 +23,13 @@ import com.typesafe.config.ConfigFactory
import org.apache.commons.lang3.StringUtils
import org.yaml.snakeyaml.Yaml
-import javax.annotation.Nonnull
-
import java.io._
import java.util.{Properties, Scanner}
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.regex.Pattern
import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
object PropertiesUtils extends Logger {
- private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
-
- private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN =
Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")
-
- // scalastyle:off
- private[this] lazy val SPARK_ARGUMENT_REGEXP =
"\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
- // scalastyle:on
-
- private[this] lazy val MULTI_PROPERTY_REGEXP =
"-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"
-
- private[this] lazy val MULTI_PROPERTY_PATTERN =
Pattern.compile(MULTI_PROPERTY_REGEXP)
-
def readFile(filename: String): String = {
val file = new File(filename)
require(file.exists(), s"[StreamPark] readFile: file $file does not exist")
@@ -60,41 +43,10 @@ object PropertiesUtils extends Logger {
buffer.toString()
}
- private[this] def eachYamlItem(
- k: String,
- v: Any,
- prefix: String = "",
- proper: MutableMap[String, String] = MutableMap[String, String]()):
Map[String, String] = {
- v match {
- case map: JavaLinkedMap[_, _] =>
- map
- .flatMap(x => {
- prefix match {
- case "" => eachYamlItem(x._1.toString, x._2, k, proper)
- case other =>
- eachYamlItem(x._1.toString, x._2, s"$other.$k", proper)
- }
- })
- .toMap
- case text =>
- if (text != null) {
- val value = text.toString.trim
- prefix match {
- case "" => proper += k -> value
- case other => proper += s"$other.$k" -> value
- }
- }
- proper.toMap
- }
- }
-
def fromYamlText(text: String): Map[String, String] = {
try {
- new Yaml()
- .load(text)
- .asInstanceOf[java.util.Map[String, Map[String, Any]]]
- .flatMap(x => eachYamlItem(x._1, x._2))
- .toMap
+ val map = new Yaml().load[JavaMap[String, Object]](text)
+ flatten(map)
} catch {
case e: IOException =>
throw new IllegalArgumentException(s"Failed when loading conf error:",
e)
@@ -149,15 +101,12 @@ object PropertiesUtils extends Logger {
/** Load Yaml present in the given file. */
def fromYamlFile(inputStream: InputStream): Map[String, String] = {
- require(
+ AssertUtils.required(
inputStream != null,
s"[StreamPark] fromYamlFile: Properties inputStream must not be null")
try {
- new Yaml()
- .load(inputStream)
- .asInstanceOf[java.util.Map[String, Map[String, Any]]]
- .flatMap(x => eachYamlItem(x._1, x._2))
- .toMap
+ val map = new Yaml().load[JavaMap[String, Object]](inputStream)
+ flatten(map)
} catch {
case e: IOException =>
throw new IllegalArgumentException(s"Failed when loading yaml from
inputStream", e)
@@ -236,195 +185,16 @@ object PropertiesUtils extends Logger {
def fromPropertiesFileAsJava(inputStream: InputStream): JavaMap[String,
String] =
new JavaHashMap[String, String](fromPropertiesFile(inputStream))
- /**
- * @param file
- * @return
- */
- def loadFlinkConfYaml(file: File): JavaMap[String, String] = {
- require(
- file != null && file.exists() && file.isFile,
- "[StreamPark] loadFlinkConfYaml: file must not be null")
- loadFlinkConfYaml(org.apache.commons.io.FileUtils.readFileToString(file))
- }
-
- def loadFlinkConfYaml(yaml: String): JavaMap[String, String] = {
- require(yaml != null && yaml.nonEmpty, "[StreamPark] loadFlinkConfYaml:
yaml must not be null")
- val flinkConf = new JavaHashMap[String, String]()
- val scanner: Scanner = new Scanner(yaml)
- val lineNo: AtomicInteger = new AtomicInteger(0)
- while (scanner.hasNextLine) {
- val line = scanner.nextLine()
- lineNo.incrementAndGet()
- // 1. check for comments
- // [FLINK-27299] flink parsing parameter bug fixed.
- val comments = line.split("^#|\\s+#", 2)
- val conf = comments(0).trim
- // 2. get key and value
- if (conf.nonEmpty) {
- val kv = conf.split(": ", 2)
- // skip line with no valid key-value pair
- if (kv.length == 2) {
- val key = kv(0).trim
- val value = kv(1).trim
- // sanity check
- if (key.nonEmpty && value.nonEmpty) {
- flinkConf += key -> value
- } else {
- logWarn(s"Error after splitting key and value in configuration
${lineNo.get()}: $line")
- }
- } else {
- logWarn(s"Error while trying to split key and value in
configuration. $lineNo : $line")
- }
- }
- }
- flinkConf
- }
-
- /** extract flink configuration from application.properties */
- @Nonnull def extractDynamicProperties(properties: String): Map[String,
String] = {
- if (StringUtils.isEmpty(properties)) Map.empty[String, String]
- else {
- val map = mutable.Map[String, String]()
- val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
- simple.split("\\s?-D") match {
- case d if Utils.isNotEmpty(d) =>
- d.foreach(x => {
- if (x.nonEmpty) {
- val p = PROPERTY_PATTERN.matcher(x.trim)
- if (p.matches) {
- map += p.group(1).trim -> p.group(2).trim
- }
- }
- })
- case _ =>
- }
- val matcher = MULTI_PROPERTY_PATTERN.matcher(properties)
- while (matcher.find()) {
- val opts = matcher.group()
- val index = opts.indexOf("=")
- val key = opts.substring(2, index).trim
- val value =
- opts.substring(index + 1).trim.replaceAll("(^[\"|']|[\"|']$)", "")
- map += key -> value
- }
- map.toMap
- }
- }
-
- @Nonnull def extractArguments(args: String): List[String] = {
- val programArgs = new ArrayBuffer[String]()
- if (StringUtils.isNotEmpty(args)) {
- return extractArguments(args.split("\\s+"))
- }
- programArgs.toList
- }
-
- def extractArguments(array: Array[String]): List[String] = {
- val programArgs = new ArrayBuffer[String]()
- val iter = array.iterator
- while (iter.hasNext) {
- val v = iter.next()
- val p = v.take(1)
- p match {
- case "'" | "\"" =>
- var value = v
- if (!v.endsWith(p)) {
- while (!value.endsWith(p) && iter.hasNext) {
- value += s" ${iter.next()}"
- }
- }
- programArgs += value.substring(1, value.length - 1)
- case _ =>
- val regexp = "(.*)='(.*)'$"
- if (v.matches(regexp)) {
- programArgs += v.replaceAll(regexp, "$1=$2")
- } else {
- val regexp = "(.*)=\"(.*)\"$"
- if (v.matches(regexp)) {
- programArgs += v.replaceAll(regexp, "$1=$2")
- } else {
- programArgs += v
- }
- }
- }
- }
- programArgs.toList
- }
-
- def extractMultipleArguments(array: Array[String]): Map[String, Map[String,
String]] = {
- val iter = array.iterator
- val map = mutable.Map[String, mutable.Map[String, String]]()
- while (iter.hasNext) {
- val v = iter.next()
- v.take(2) match {
- case "--" =>
- val kv = iter.next()
- val regexp = "(.*)=(.*)"
- if (kv.matches(regexp)) {
- val values = kv.split("=")
- val k1 = values(0).trim
- val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
- val k = v.drop(2)
- map.get(k) match {
- case Some(m) => m += k1 -> v1
- case _ => map += k -> mutable.Map(k1 -> v1)
- }
- }
- case _ =>
- }
- }
- map.map(x => x._1 -> x._2.toMap).toMap
- }
-
- @Nonnull def extractDynamicPropertiesAsJava(properties: String):
JavaMap[String, String] =
- new JavaHashMap[String, String](extractDynamicProperties(properties))
-
- @Nonnull def extractMultipleArgumentsAsJava(
- args: Array[String]): JavaMap[String, JavaMap[String, String]] = {
- val map =
- extractMultipleArguments(args).map(c => c._1 -> new JavaHashMap[String,
String](c._2))
- new JavaHashMap[String, JavaMap[String, String]](map)
- }
-
- /** extract spark configuration from sparkApplication.appProperties */
- @Nonnull def extractSparkPropertiesAsJava(properties: String):
JavaMap[String, String] =
- new JavaHashMap[String, String](extractSparkProperties(properties))
-
- @Nonnull def extractSparkProperties(properties: String): Map[String, String]
= {
- if (StringUtils.isEmpty(properties)) Map.empty[String, String]
- else {
- val map = mutable.Map[String, String]()
- properties.split("(\\s)*(--conf|-c)(\\s)+") match {
- case d if Utils.isNotEmpty(d) =>
- d.foreach(x => {
- if (x.nonEmpty) {
- val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
- if (p.matches) {
- map += p.group(1).trim -> p.group(2).trim
- }
- }
- })
- case _ =>
- }
- map.toMap
- }
+ private[this] def flatten(map: JavaMap[String, Object], prefix: String =
""): Map[String, String] = {
+ map.asScala.flatMap {
+ case (k, v: JavaMap[String, Object] @unchecked) => flatten(v,
s"$prefix$k.")
+ case (k, v: String) =>
+ if (StringUtils.isBlank(v)) Map.empty[String, String] else
Map(s"$prefix$k" -> v)
+ case (k, v: JavaCollection[_]) =>
+ if (v.isEmpty) Map.empty[String, String] else Map(s"$prefix$k" ->
v.toString)
+ case (k, v) =>
+ if (v == null) Map.empty[String, String] else Map(s"$prefix$k" ->
v.toString)
+ }.toMap
}
- /** extract spark configuration from sparkApplication.appArgs */
- @Nonnull def extractSparkArgumentsAsJava(arguments: String):
JavaList[String] = {
- val list = new JavaArrayList[String]()
- if (StringUtils.isEmpty(arguments)) list
- else {
- arguments.split(SPARK_ARGUMENT_REGEXP) match {
- case d if Utils.isNotEmpty(d) =>
- d.foreach(x => {
- if (x.nonEmpty) {
- list.add(x)
- }
- })
- case _ =>
- }
- list
- }
- }
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkConfigurationUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkConfigurationUtils.scala
new file mode 100644
index 000000000..281914e8b
--- /dev/null
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkConfigurationUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util
+
+import org.apache.streampark.common.util.Implicits._
+
+import com.google.common.collect.Lists
+import org.apache.commons.lang3.StringUtils
+
+import javax.annotation.Nonnull
+
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+
+object SparkConfigurationUtils extends Logger {
+
+ private[this] val SPARK_PROPERTY_COMPLEX_PATTERN =
Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")
+
+ // scalastyle:off
+ private[this] val SPARK_ARGUMENT_REGEXP =
"\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
+ // scalastyle:on
+
+ /** extract spark configuration from sparkApplication.appProperties */
+ @Nonnull def extractPropertiesAsJava(properties: String): JavaMap[String,
String] =
+ new JavaHashMap[String, String](extractProperties(properties))
+
+ @Nonnull def extractProperties(properties: String): Map[String, String] = {
+ if (StringUtils.isEmpty(properties)) {
+ Map.empty[String, String]
+ } else {
+ val map = mutable.Map[String, String]()
+ properties.split("(\\s)*(--conf|-c)(\\s)+").filter(Utils.isNotEmpty(_))
+ .foreach(x =>
+ if (x.nonEmpty) {
+ val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
+ if (p.matches) {
+ map += p.group(1).trim -> p.group(2).trim
+ }
+ })
+ map.toMap
+ }
+ }
+
+ /** extract spark configuration from sparkApplication.appArgs */
+ @Nonnull def extractArgumentsAsJava(arguments: String): JavaList[String] = {
+ if (StringUtils.isEmpty(arguments)) {
+ Lists.newArrayList()
+ } else {
+ arguments.split(SPARK_ARGUMENT_REGEXP).filter(_.nonEmpty).toList.asJava
+ }
+ }
+}
diff --git
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index 20aadc6c1..40f9a5afc 100644
---
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -39,7 +39,7 @@ class PropertiesUtilsTestCase {
"--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 " +
"--sink-conf sink.label-prefix=label" +
"--table-conf replication_num=1"
- val programArgs = PropertiesUtils.extractArguments(args)
+ val programArgs = FlinkConfigurationUtils.extractArguments(args)
Assertions.assertTrue(programArgs.contains("username=root"))
}
@@ -56,7 +56,7 @@ class PropertiesUtilsTestCase {
|
|""".stripMargin
- val map = PropertiesUtils.extractDynamicProperties(dynamicProperties)
+ val map =
FlinkConfigurationUtils.extractDynamicProperties(dynamicProperties)
Assertions.assertEquals(map("env.java.opts1"), "-Dfile.encoding=UTF-8")
Assertions.assertEquals(map("env.java.opts2"), "-Dfile.enc\\\"oding=UTF-8")
Assertions.assertEquals(map("env.java.opts3"), " -Dfile.encoding=UTF-8")
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 800a6e2b2..b50747ce0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -22,8 +22,8 @@ import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkDeployMode;
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.ResolveOrder;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
import org.apache.streampark.common.util.HttpClientUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.util.YarnQueueLabelExpression;
@@ -180,8 +180,8 @@ public class FlinkCluster implements Serializable {
@JsonIgnore
public Map<String, Object> getProperties() {
Map<String, Object> propertyMap = new HashMap<>();
- Map<String, String> dynamicPropertyMap = PropertiesUtils
- .extractDynamicPropertiesAsJava(this.getDynamicProperties());
+ Map<String, String> dynamicPropertyMap =
+
FlinkConfigurationUtils.extractDynamicPropertiesAsJava(this.getDynamicProperties());
propertyMap.putAll(this.getOptionMap());
propertyMap.putAll(dynamicPropertyMap);
ResolveOrder resolveOrder = ResolveOrder.of(this.getResolveOrder());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index b700bdf67..7b13727a9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -19,7 +19,7 @@ package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
@@ -76,9 +76,8 @@ public class FlinkEnv implements Serializable {
private transient String streamParkScalaVersion =
scala.util.Properties.versionNumberString();
public void doSetFlinkConf() throws ApiDetailException {
-
File yaml;
- float ver =
Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
+ Float ver = getVersionNumber();
if (ver < 1.19f) {
yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
if (!yaml.exists()) {
@@ -119,7 +118,10 @@ public class FlinkEnv implements Serializable {
public Map<String, String> convertFlinkYamlAsMap() {
String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
- return PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+ if (isLegacyFlinkConf()) {
+ return
FlinkConfigurationUtils.loadLegacyFlinkConf(flinkYamlString);
+ }
+ return FlinkConfigurationUtils.loadFlinkConf(flinkYamlString);
}
@JsonIgnore
@@ -166,8 +168,19 @@ public class FlinkEnv implements Serializable {
public Properties getFlinkConfig() {
String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
Properties flinkConfig = new Properties();
- Map<String, String> config =
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+ Map<String, String> config =
FlinkConfigurationUtils.loadLegacyFlinkConf(flinkYamlString);
flinkConfig.putAll(config);
return flinkConfig;
}
+
+ private Float getVersionNumber() {
+ if (StringUtils.isNotBlank(this.version)) {
+ return Float.parseFloat(getVersionOfFirst() + "." +
getVersionOfMiddle());
+ }
+ throw new RuntimeException("Flink version is null");
+ }
+
+ public boolean isLegacyFlinkConf() {
+ return getVersionNumber() < 1.19f;
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java
index e5c8bfd7c..dff43e0e8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java
@@ -19,11 +19,9 @@ package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.conf.SparkVersion;
import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
@@ -36,8 +34,6 @@ import java.io.File;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
@Getter
@Setter
@@ -91,14 +87,6 @@ public class SparkEnv implements Serializable {
this.setScalaVersion(this.getSparkVersion().scalaVersion());
}
- public Map<String, String> convertSparkYamlAsMap() {
- if (sparkConf == null) {
- return new HashMap<>();
- }
- String sparkYamlString = DeflaterUtils.unzipString(sparkConf);
- return PropertiesUtils.loadFlinkConfYaml(sparkYamlString);
- }
-
@JsonIgnore
public SparkVersion getSparkVersion() {
if (this.sparkVersion == null) {
@@ -113,31 +101,4 @@ public class SparkEnv implements Serializable {
}
}
- public String getLargeVersion() {
- if (StringUtils.isNotBlank(this.version)) {
- return this.version.substring(0, this.version.lastIndexOf("."));
- }
- return null;
- }
-
- public String getVersionOfFirst() {
- if (StringUtils.isNotBlank(this.version)) {
- return this.version.split("\\.")[0];
- }
- return null;
- }
-
- public String getVersionOfMiddle() {
- if (StringUtils.isNotBlank(this.version)) {
- return this.version.split("\\.")[1];
- }
- return null;
- }
-
- public String getVersionOfLast() {
- if (StringUtils.isNotBlank(this.version)) {
- return this.version.split("\\.")[2];
- }
- return null;
- }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
index 554cbc4e4..0b3aab314 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
@@ -19,7 +19,7 @@ package org.apache.streampark.console.core.runner;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkDeployMode;
-import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -64,7 +64,8 @@ public class QuickStartRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
- Map<String, Map<String, String>> map =
PropertiesUtils.extractMultipleArgumentsAsJava(args.getSourceArgs());
+ Map<String, Map<String, String>> map =
+
FlinkConfigurationUtils.extractMultipleArgumentsAsJava(args.getSourceArgs());
Map<String, String> quickstart = map.get("quickstart");
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
index 4605deb99..df86e4215 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
@@ -31,8 +31,8 @@ import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
import org.apache.streampark.common.util.HadoopUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.base.util.Tuple2;
@@ -805,7 +805,7 @@ public class FlinkApplicationActionServiceImpl
}
Map<String, String> dynamicProperties =
- PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties);
+
FlinkConfigurationUtils.extractDynamicPropertiesAsJava(runtimeProperties);
properties.putAll(dynamicProperties);
ResolveOrder resolveOrder =
ResolveOrder.of(application.getResolveOrder());
if (resolveOrder != null) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 18ce26cbe..8f4038090 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -28,7 +28,7 @@ import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.SparkConfigurationUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.core.entity.ApplicationBuildPipeline;
@@ -322,8 +322,8 @@ public class SparkApplicationActionServiceImpl
application.getAppName(),
application.getMainClass(),
appConf,
-
PropertiesUtils.extractSparkPropertiesAsJava(application.getAppProperties()),
- PropertiesUtils.extractSparkArgumentsAsJava(applicationArgs),
+
SparkConfigurationUtils.extractPropertiesAsJava(application.getAppProperties()),
+ SparkConfigurationUtils.extractArgumentsAsJava(applicationArgs),
application.getApplicationType(),
application.getHadoopUser(),
buildResult,
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
index 523d862f4..5420c8167 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.enums.FlinkDeployMode;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.ExceptionUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.InternalException;
@@ -80,7 +81,6 @@ import java.util.concurrent.TimeoutException;
import static
org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
import static
org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
-import static
org.apache.streampark.common.util.PropertiesUtils.extractDynamicPropertiesAsJava;
import static
org.apache.streampark.console.core.enums.CheckPointTypeEnum.CHECKPOINT;
@Slf4j
@@ -328,7 +328,7 @@ public class FlinkSavepointServiceImpl extends
ServiceImpl<FlinkSavepointMapper,
@VisibleForTesting
@Nullable
public String getSavepointFromDynamicProps(String dynamicProps) {
- return
extractDynamicPropertiesAsJava(dynamicProps).get(SAVEPOINT_DIRECTORY.key());
+ return
FlinkConfigurationUtils.extractDynamicPropertiesAsJava(dynamicProps).get(SAVEPOINT_DIRECTORY.key());
}
/**
@@ -388,7 +388,8 @@ public class FlinkSavepointServiceImpl extends
ServiceImpl<FlinkSavepointMapper,
* Try get the 'state.checkpoints.num-retained' from the dynamic
properties.
*/
private Optional<Integer> tryGetChkNumRetainedFromDynamicProps(String
dynamicProps) {
- String rawCfgValue =
extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key());
+ String rawCfgValue =
+
FlinkConfigurationUtils.extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key());
if (StringUtils.isBlank(rawCfgValue)) {
return Optional.empty();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java
index 5979c489f..4e4f09128 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.watcher;
import org.apache.streampark.common.enums.FlinkDeployMode;
-import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.FlinkConfigurationUtils;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -128,7 +128,7 @@ public class FlinkK8sWatcherWrapper {
FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
Properties properties = flinkEnv.getFlinkConfig();
- Map<String, String> dynamicProperties = PropertiesUtils
+ Map<String, String> dynamicProperties = FlinkConfigurationUtils
.extractDynamicPropertiesAsJava(app.getDynamicProperties());
String archiveDir =
dynamicProperties.get(JobManagerOptions.ARCHIVE_DIR.key());
if (archiveDir != null) {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 647aa1515..ca1327927 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -469,7 +469,7 @@ trait FlinkClientTrait extends Logger {
private[this] def extractProgramArgs(submitRequest: SubmitRequest):
JavaList[String] = {
val programArgs = new ArrayBuffer[String]()
- programArgs ++= PropertiesUtils.extractArguments(submitRequest.args)
+ programArgs ++=
FlinkConfigurationUtils.extractArguments(submitRequest.args)
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {