This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch flink-new-conf in repository https://gitbox.apache.org/repos/asf/streampark.git
commit f6bd4953691cc65fc64f79e57f18a0b0d4092c28 Author: benjobs <[email protected]> AuthorDate: Sun Jun 22 02:55:03 2025 +0800 [Improve] Compatibility improvement for parsing Flink config file --- .../common/util/FlinkConfigurationUtils.scala | 203 ++++++++++++++++ .../streampark/common/util/PropertiesUtils.scala | 260 ++------------------- .../common/util/SparkConfigurationUtils.scala | 68 ++++++ .../common/util/PropertiesUtilsTestCase.scala | 4 +- .../console/core/entity/FlinkCluster.java | 6 +- .../streampark/console/core/entity/FlinkEnv.java | 23 +- .../streampark/console/core/entity/SparkEnv.java | 39 ---- .../console/core/runner/QuickStartRunner.java | 5 +- .../impl/FlinkApplicationActionServiceImpl.java | 4 +- .../impl/SparkApplicationActionServiceImpl.java | 6 +- .../service/impl/FlinkSavepointServiceImpl.java | 7 +- .../core/watcher/FlinkK8sWatcherWrapper.java | 4 +- .../flink/client/trait/FlinkClientTrait.scala | 2 +- 13 files changed, 324 insertions(+), 307 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala new file mode 100644 index 000000000..f43c0a0b8 --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.util + +import org.apache.streampark.common.util.Implicits._ + +import org.apache.commons.lang3.StringUtils + +import javax.annotation.Nonnull + +import java.io.File +import java.util.Scanner +import java.util.concurrent.atomic.AtomicInteger +import java.util.regex.Pattern + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +object FlinkConfigurationUtils extends Logger { + + private[this] val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)") + + private[this] val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']" + + private[this] val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP) + + /** + * @param file + * @return + */ + def loadFlinkConf(file: File): JavaMap[String, String] = { + AssertUtils.required( + file != null && file.exists() && file.isFile, + "[StreamPark] loadFlinkConfYaml: file must not be null") + loadFlinkConf(org.apache.commons.io.FileUtils.readFileToString(file)) + } + + def loadFlinkConf(yaml: String): JavaMap[String, String] = { + AssertUtils.required(yaml != null && yaml.nonEmpty, "[StreamPark] loadFlinkConfYaml: yaml must not be null") + PropertiesUtils.fromYamlText(yaml) + } + + def loadLegacyFlinkConf(file: File): JavaMap[String, String] = { + AssertUtils.required( + file != null && file.exists() && file.isFile, + "[StreamPark] loadFlinkConfYaml: file must not be null") + loadLegacyFlinkConf(org.apache.commons.io.FileUtils.readFileToString(file)) + } + + def loadLegacyFlinkConf(yaml: String): JavaMap[String, String] = { + AssertUtils.required(yaml != null && yaml.nonEmpty, "[StreamPark] loadFlinkConfYaml: yaml must not be null") + val flinkConf = new JavaHashMap[String, String]() + val scanner: Scanner = new Scanner(yaml) + val lineNo: AtomicInteger = new AtomicInteger(0) + while (scanner.hasNextLine) { + val line = scanner.nextLine() + lineNo.incrementAndGet() + // 1. check for comments + // [FLINK-27299] flink parsing parameter bug fixed. + val comments = line.split("^#|\\s+#", 2) + val conf = comments(0).trim + // 2. get key and value + if (conf.nonEmpty) { + val kv = conf.split(": ", 2) + // skip line with no valid key-value pair + if (kv.length == 2) { + val key = kv(0).trim + val value = kv(1).trim + // sanity check + if (key.nonEmpty && value.nonEmpty) { + flinkConf += key -> value + } else { + logWarn(s"Error after splitting key and value in configuration ${lineNo.get()}: $line") + } + } else { + logWarn(s"Error while trying to split key and value in configuration. $lineNo : $line") + } + } + } + flinkConf + } + + /** extract flink configuration from application.properties */ + @Nonnull def extractDynamicProperties(properties: String): Map[String, String] = { + if (StringUtils.isEmpty(properties)) Map.empty[String, String] + else { + val map = mutable.Map[String, String]() + val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "") + simple.split("\\s?-D") match { + case d if Utils.isNotEmpty(d) => + d.foreach(x => { + if (x.nonEmpty) { + val p = PROPERTY_PATTERN.matcher(x.trim) + if (p.matches) { + map += p.group(1).trim -> p.group(2).trim + } + } + }) + case _ => + } + val matcher = MULTI_PROPERTY_PATTERN.matcher(properties) + while (matcher.find()) { + val opts = matcher.group() + val index = opts.indexOf("=") + val key = opts.substring(2, index).trim + val value = + opts.substring(index + 1).trim.replaceAll("(^[\"|']|[\"|']$)", "") + map += key -> value + } + map.toMap + } + } + + @Nonnull def extractArguments(args: String): List[String] = { + val programArgs = new ArrayBuffer[String]() + if (StringUtils.isNotEmpty(args)) { + return extractArguments(args.split("\\s+")) + } + programArgs.toList + } + + def extractArguments(array: Array[String]): List[String] = { + val programArgs = new ArrayBuffer[String]() + val iter = array.iterator + while (iter.hasNext) { + val v = iter.next() + val p = v.take(1) + p match { + case "'" | "\"" => + var value = v + if (!v.endsWith(p)) { + while (!value.endsWith(p) && iter.hasNext) { + value += s" ${iter.next()}" + } + } + programArgs += value.substring(1, value.length - 1) + case _ => + val regexp = "(.*)='(.*)'$" + if (v.matches(regexp)) { + programArgs += v.replaceAll(regexp, "$1=$2") + } else { + val regexp = "(.*)=\"(.*)\"$" + if (v.matches(regexp)) { + programArgs += v.replaceAll(regexp, "$1=$2") + } else { + programArgs += v + } + } + } + } + programArgs.toList + } + + def extractMultipleArguments(array: Array[String]): Map[String, Map[String, String]] = { + val iter = array.iterator + val map = mutable.Map[String, mutable.Map[String, String]]() + while (iter.hasNext) { + val v = iter.next() + v.take(2) match { + case "--" => + val kv = iter.next() + val regexp = "(.*)=(.*)" + if (kv.matches(regexp)) { + val values = kv.split("=") + val k1 = values(0).trim + val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "") + val k = v.drop(2) + map.get(k) match { + case Some(m) => m += k1 -> v1 + case _ => map += k -> mutable.Map(k1 -> v1) + } + } + case _ => + } + } + map.map(x => x._1 -> x._2.toMap).toMap + } + + @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] = + new JavaHashMap[String, String](extractDynamicProperties(properties)) + + @Nonnull def extractMultipleArgumentsAsJava(args: Array[String]): JavaMap[String, JavaMap[String, String]] = { + val map = + extractMultipleArguments(args).map(c => c._1 -> new JavaHashMap[String, String](c._2)) + new JavaHashMap[String, JavaMap[String, String]](map) + } + +} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index f7f68ae45..32843a94f 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -23,30 +23,13 @@ import com.typesafe.config.ConfigFactory import org.apache.commons.lang3.StringUtils import org.yaml.snakeyaml.Yaml -import javax.annotation.Nonnull - import java.io._ import java.util.{Properties, Scanner} -import java.util.concurrent.atomic.AtomicInteger -import java.util.regex.Pattern import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} object PropertiesUtils extends Logger { - private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)") - - private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN = Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$") - - // scalastyle:off - private[this] lazy val SPARK_ARGUMENT_REGEXP = "\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?" - // scalastyle:on - - private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']" - - private[this] lazy val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP) - def readFile(filename: String): String = { val file = new File(filename) require(file.exists(), s"[StreamPark] readFile: file $file does not exist") @@ -60,41 +43,10 @@ object PropertiesUtils extends Logger { buffer.toString() } - private[this] def eachYamlItem( - k: String, - v: Any, - prefix: String = "", - proper: MutableMap[String, String] = MutableMap[String, String]()): Map[String, String] = { - v match { - case map: JavaLinkedMap[_, _] => - map - .flatMap(x => { - prefix match { - case "" => eachYamlItem(x._1.toString, x._2, k, proper) - case other => - eachYamlItem(x._1.toString, x._2, s"$other.$k", proper) - } - }) - .toMap - case text => - if (text != null) { - val value = text.toString.trim - prefix match { - case "" => proper += k -> value - case other => proper += s"$other.$k" -> value - } - } - proper.toMap - } - } - def fromYamlText(text: String): Map[String, String] = { try { - new Yaml() - .load(text) - .asInstanceOf[java.util.Map[String, Map[String, Any]]] - .flatMap(x => eachYamlItem(x._1, x._2)) - .toMap + val map = new Yaml().load[JavaMap[String, Object]](text) + flatten(map) } catch { case e: IOException => throw new IllegalArgumentException(s"Failed when loading conf error:", e) @@ -149,15 +101,12 @@ object PropertiesUtils extends Logger { /** Load Yaml present in the given file. */ def fromYamlFile(inputStream: InputStream): Map[String, String] = { - require( + AssertUtils.required( inputStream != null, s"[StreamPark] fromYamlFile: Properties inputStream must not be null") try { - new Yaml() - .load(inputStream) - .asInstanceOf[java.util.Map[String, Map[String, Any]]] - .flatMap(x => eachYamlItem(x._1, x._2)) - .toMap + val map = new Yaml().load[JavaMap[String, Object]](inputStream) + flatten(map) } catch { case e: IOException => throw new IllegalArgumentException(s"Failed when loading yaml from inputStream", e) @@ -236,195 +185,16 @@ object PropertiesUtils extends Logger { def fromPropertiesFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaHashMap[String, String](fromPropertiesFile(inputStream)) - /** - * @param file - * @return - */ - def loadFlinkConfYaml(file: File): JavaMap[String, String] = { - require( - file != null && file.exists() && file.isFile, - "[StreamPark] loadFlinkConfYaml: file must not be null") - loadFlinkConfYaml(org.apache.commons.io.FileUtils.readFileToString(file)) - } - - def loadFlinkConfYaml(yaml: String): JavaMap[String, String] = { - require(yaml != null && yaml.nonEmpty, "[StreamPark] loadFlinkConfYaml: yaml must not be null") - val flinkConf = new JavaHashMap[String, String]() - val scanner: Scanner = new Scanner(yaml) - val lineNo: AtomicInteger = new AtomicInteger(0) - while (scanner.hasNextLine) { - val line = scanner.nextLine() - lineNo.incrementAndGet() - // 1. check for comments - // [FLINK-27299] flink parsing parameter bug fixed. - val comments = line.split("^#|\\s+#", 2) - val conf = comments(0).trim - // 2. get key and value - if (conf.nonEmpty) { - val kv = conf.split(": ", 2) - // skip line with no valid key-value pair - if (kv.length == 2) { - val key = kv(0).trim - val value = kv(1).trim - // sanity check - if (key.nonEmpty && value.nonEmpty) { - flinkConf += key -> value - } else { - logWarn(s"Error after splitting key and value in configuration ${lineNo.get()}: $line") - } - } else { - logWarn(s"Error while trying to split key and value in configuration. $lineNo : $line") - } - } - } - flinkConf - } - - /** extract flink configuration from application.properties */ - @Nonnull def extractDynamicProperties(properties: String): Map[String, String] = { - if (StringUtils.isEmpty(properties)) Map.empty[String, String] - else { - val map = mutable.Map[String, String]() - val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "") - simple.split("\\s?-D") match { - case d if Utils.isNotEmpty(d) => - d.foreach(x => { - if (x.nonEmpty) { - val p = PROPERTY_PATTERN.matcher(x.trim) - if (p.matches) { - map += p.group(1).trim -> p.group(2).trim - } - } - }) - case _ => - } - val matcher = MULTI_PROPERTY_PATTERN.matcher(properties) - while (matcher.find()) { - val opts = matcher.group() - val index = opts.indexOf("=") - val key = opts.substring(2, index).trim - val value = - opts.substring(index + 1).trim.replaceAll("(^[\"|']|[\"|']$)", "") - map += key -> value - } - map.toMap - } - } - - @Nonnull def extractArguments(args: String): List[String] = { - val programArgs = new ArrayBuffer[String]() - if (StringUtils.isNotEmpty(args)) { - return extractArguments(args.split("\\s+")) - } - programArgs.toList - } - - def extractArguments(array: Array[String]): List[String] = { - val programArgs = new ArrayBuffer[String]() - val iter = array.iterator - while (iter.hasNext) { - val v = iter.next() - val p = v.take(1) - p match { - case "'" | "\"" => - var value = v - if (!v.endsWith(p)) { - while (!value.endsWith(p) && iter.hasNext) { - value += s" ${iter.next()}" - } - } - programArgs += value.substring(1, value.length - 1) - case _ => - val regexp = "(.*)='(.*)'$" - if (v.matches(regexp)) { - programArgs += v.replaceAll(regexp, "$1=$2") - } else { - val regexp = "(.*)=\"(.*)\"$" - if (v.matches(regexp)) { - programArgs += v.replaceAll(regexp, "$1=$2") - } else { - programArgs += v - } - } - } - } - programArgs.toList - } - - def extractMultipleArguments(array: Array[String]): Map[String, Map[String, String]] = { - val iter = array.iterator - val map = mutable.Map[String, mutable.Map[String, String]]() - while (iter.hasNext) { - val v = iter.next() - v.take(2) match { - case "--" => - val kv = iter.next() - val regexp = "(.*)=(.*)" - if (kv.matches(regexp)) { - val values = kv.split("=") - val k1 = values(0).trim - val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "") - val k = v.drop(2) - map.get(k) match { - case Some(m) => m += k1 -> v1 - case _ => map += k -> mutable.Map(k1 -> v1) - } - } - case _ => - } - } - map.map(x => x._1 -> x._2.toMap).toMap - } - - @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] = - new JavaHashMap[String, String](extractDynamicProperties(properties)) - - @Nonnull def extractMultipleArgumentsAsJava( - args: Array[String]): JavaMap[String, JavaMap[String, String]] = { - val map = - extractMultipleArguments(args).map(c => c._1 -> new JavaHashMap[String, String](c._2)) - new JavaHashMap[String, JavaMap[String, String]](map) - } - - /** extract spark configuration from sparkApplication.appProperties */ - @Nonnull def extractSparkPropertiesAsJava(properties: String): JavaMap[String, String] = - new JavaHashMap[String, String](extractSparkProperties(properties)) - - @Nonnull def extractSparkProperties(properties: String): Map[String, String] = { - if (StringUtils.isEmpty(properties)) Map.empty[String, String] - else { - val map = mutable.Map[String, String]() - properties.split("(\\s)*(--conf|-c)(\\s)+") match { - case d if Utils.isNotEmpty(d) => - d.foreach(x => { - if (x.nonEmpty) { - val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x) - if (p.matches) { - map += p.group(1).trim -> p.group(2).trim - } - } - }) - case _ => - } - map.toMap - } + private[this] def flatten(map: JavaMap[String, Object], prefix: String = ""): Map[String, String] = { + map.asScala.flatMap { + case (k, v: JavaMap[String, Object] @unchecked) => flatten(v, s"$prefix$k.") + case (k, v: String) => + if (StringUtils.isBlank(v)) Map.empty[String, String] else Map(s"$prefix$k" -> v) + case (k, v: JavaCollection[_]) => + if (v.isEmpty) Map.empty[String, String] else Map(s"$prefix$k" -> v.toString) + case (k, v) => + if (v == null) Map.empty[String, String] else Map(s"$prefix$k" -> v.toString) + }.toMap } - /** extract spark configuration from sparkApplication.appArgs */ - @Nonnull def extractSparkArgumentsAsJava(arguments: String): JavaList[String] = { - val list = new JavaArrayList[String]() - if (StringUtils.isEmpty(arguments)) list - else { - arguments.split(SPARK_ARGUMENT_REGEXP) match { - case d if Utils.isNotEmpty(d) => - d.foreach(x => { - if (x.nonEmpty) { - list.add(x) - } - }) - case _ => - } - list - } - } } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkConfigurationUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkConfigurationUtils.scala new file mode 100644 index 000000000..281914e8b --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkConfigurationUtils.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.util + +import org.apache.streampark.common.util.Implicits._ + +import com.google.common.collect.Lists +import org.apache.commons.lang3.StringUtils + +import javax.annotation.Nonnull + +import java.util.regex.Pattern + +import scala.collection.mutable + +object SparkConfigurationUtils extends Logger { + + private[this] val SPARK_PROPERTY_COMPLEX_PATTERN = Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$") + + // scalastyle:off + private[this] val SPARK_ARGUMENT_REGEXP = "\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?" + // scalastyle:on + + /** extract spark configuration from sparkApplication.appProperties */ + @Nonnull def extractPropertiesAsJava(properties: String): JavaMap[String, String] = + new JavaHashMap[String, String](extractProperties(properties)) + + @Nonnull def extractProperties(properties: String): Map[String, String] = { + if (StringUtils.isEmpty(properties)) { + Map.empty[String, String] + } else { + val map = mutable.Map[String, String]() + properties.split("(\\s)*(--conf|-c)(\\s)+").filter(Utils.isNotEmpty(_)) + .foreach(x => + if (x.nonEmpty) { + val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x) + if (p.matches) { + map += p.group(1).trim -> p.group(2).trim + } + }) + map.toMap + } + } + + /** extract spark configuration from sparkApplication.appArgs */ + @Nonnull def extractArgumentsAsJava(arguments: String): JavaList[String] = { + if (StringUtils.isEmpty(arguments)) { + Lists.newArrayList() + } else { + arguments.split(SPARK_ARGUMENT_REGEXP).filter(_.nonEmpty).toList.asJava + } + } +} diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala index 20aadc6c1..40f9a5afc 100644 --- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala +++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala @@ -39,7 +39,7 @@ class PropertiesUtilsTestCase { "--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 " + "--sink-conf sink.label-prefix=label" + "--table-conf replication_num=1" - val programArgs = PropertiesUtils.extractArguments(args) + val programArgs = FlinkConfigurationUtils.extractArguments(args) Assertions.assertTrue(programArgs.contains("username=root")) } @@ -56,7 +56,7 @@ class PropertiesUtilsTestCase { | |""".stripMargin - val map = PropertiesUtils.extractDynamicProperties(dynamicProperties) + val map = FlinkConfigurationUtils.extractDynamicProperties(dynamicProperties) Assertions.assertEquals(map("env.java.opts1"), "-Dfile.encoding=UTF-8") Assertions.assertEquals(map("env.java.opts2"), "-Dfile.enc\\\"oding=UTF-8") Assertions.assertEquals(map("env.java.opts3"), " -Dfile.encoding=UTF-8") diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java index 800a6e2b2..b50747ce0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java @@ -22,8 +22,8 @@ import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.FlinkDeployMode; import org.apache.streampark.common.enums.FlinkK8sRestExposedType; import org.apache.streampark.common.enums.ResolveOrder; +import org.apache.streampark.common.util.FlinkConfigurationUtils; import org.apache.streampark.common.util.HttpClientUtils; -import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.console.base.util.JacksonUtils; import org.apache.streampark.console.core.util.YarnQueueLabelExpression; @@ -180,8 +180,8 @@ public class FlinkCluster implements Serializable { @JsonIgnore public Map<String, Object> getProperties() { Map<String, Object> propertyMap = new HashMap<>(); - Map<String, String> dynamicPropertyMap = PropertiesUtils - .extractDynamicPropertiesAsJava(this.getDynamicProperties()); + Map<String, String> dynamicPropertyMap = + FlinkConfigurationUtils.extractDynamicPropertiesAsJava(this.getDynamicProperties()); propertyMap.putAll(this.getOptionMap()); propertyMap.putAll(dynamicPropertyMap); ResolveOrder resolveOrder = ResolveOrder.of(this.getResolveOrder()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java index b700bdf67..7b13727a9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java @@ -19,7 +19,7 @@ package org.apache.streampark.console.core.entity; import org.apache.streampark.common.conf.FlinkVersion; import org.apache.streampark.common.util.DeflaterUtils; -import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.streampark.common.util.FlinkConfigurationUtils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApiDetailException; @@ -76,9 +76,8 @@ public class FlinkEnv implements Serializable { private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString(); public void doSetFlinkConf() throws ApiDetailException { - File yaml; - float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle())); + Float ver = getVersionNumber(); if (ver < 1.19f) { yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml")); if (!yaml.exists()) { @@ -119,7 +118,10 @@ public class FlinkEnv implements Serializable { public Map<String, String> convertFlinkYamlAsMap() { String flinkYamlString = DeflaterUtils.unzipString(flinkConf); - return PropertiesUtils.loadFlinkConfYaml(flinkYamlString); + if (isLegacyFlinkConf()) { + return FlinkConfigurationUtils.loadLegacyFlinkConf(flinkYamlString); + } + return FlinkConfigurationUtils.loadFlinkConf(flinkYamlString); } @JsonIgnore @@ -166,8 +168,19 @@ public class FlinkEnv implements Serializable { public Properties getFlinkConfig() { String flinkYamlString = DeflaterUtils.unzipString(flinkConf); Properties flinkConfig = new Properties(); - Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(flinkYamlString); + Map<String, String> config = FlinkConfigurationUtils.loadLegacyFlinkConf(flinkYamlString); flinkConfig.putAll(config); return flinkConfig; } + + private Float getVersionNumber() { + if (StringUtils.isNotBlank(this.version)) { + return Float.parseFloat(getVersionOfFirst() + "." + getVersionOfMiddle()); + } + throw new RuntimeException("Flink version is null"); + } + + public boolean isLegacyFlinkConf() { + return getVersionNumber() < 1.19f; + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java index e5c8bfd7c..dff43e0e8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkEnv.java @@ -19,11 +19,9 @@ package org.apache.streampark.console.core.entity; import org.apache.streampark.common.conf.SparkVersion; import org.apache.streampark.common.util.DeflaterUtils; -import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.console.base.exception.ApiDetailException; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; @@ -36,8 +34,6 @@ import java.io.File; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.Date; -import java.util.HashMap; -import java.util.Map; @Getter @Setter @@ -91,14 +87,6 @@ public class SparkEnv implements Serializable { this.setScalaVersion(this.getSparkVersion().scalaVersion()); } - public Map<String, String> convertSparkYamlAsMap() { - if (sparkConf == null) { - return new HashMap<>(); - } - String sparkYamlString = DeflaterUtils.unzipString(sparkConf); - return PropertiesUtils.loadFlinkConfYaml(sparkYamlString); - } - @JsonIgnore public SparkVersion getSparkVersion() { if (this.sparkVersion == null) { @@ -113,31 +101,4 @@ public class SparkEnv implements Serializable { } } - public String getLargeVersion() { - if (StringUtils.isNotBlank(this.version)) { - return this.version.substring(0, this.version.lastIndexOf(".")); - } - return null; - } - - public String getVersionOfFirst() { - if (StringUtils.isNotBlank(this.version)) { - return this.version.split("\\.")[0]; - } - return null; - } - - public String getVersionOfMiddle() { - if (StringUtils.isNotBlank(this.version)) { - return this.version.split("\\.")[1]; - } - return null; - } - - public String getVersionOfLast() { - if (StringUtils.isNotBlank(this.version)) { - return this.version.split("\\.")[2]; - } - return null; - } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java index 554cbc4e4..0b3aab314 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java @@ -19,7 +19,7 @@ package org.apache.streampark.console.core.runner; import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.FlinkDeployMode; -import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.streampark.common.util.FlinkConfigurationUtils; import org.apache.streampark.console.core.entity.FlinkApplication; import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.entity.FlinkEnv; @@ -64,7 +64,8 @@ public class QuickStartRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { - Map<String, Map<String, String>> map = PropertiesUtils.extractMultipleArgumentsAsJava(args.getSourceArgs()); + Map<String, Map<String, String>> map = + FlinkConfigurationUtils.extractMultipleArgumentsAsJava(args.getSourceArgs()); Map<String, String> quickstart = map.get("quickstart"); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java index 4605deb99..df86e4215 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java @@ -31,8 +31,8 @@ import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.ExceptionUtils; +import org.apache.streampark.common.util.FlinkConfigurationUtils; import org.apache.streampark.common.util.HadoopUtils; -import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApplicationException; import org.apache.streampark.console.base.util.Tuple2; @@ -805,7 +805,7 @@ public class FlinkApplicationActionServiceImpl } Map<String, String> dynamicProperties = - PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties); + FlinkConfigurationUtils.extractDynamicPropertiesAsJava(runtimeProperties); properties.putAll(dynamicProperties); ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder()); if (resolveOrder != null) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java index 18ce26cbe..8f4038090 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java @@ -28,7 +28,7 @@ import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.ExceptionUtils; import org.apache.streampark.common.util.HadoopUtils; -import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.streampark.common.util.SparkConfigurationUtils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApplicationException; import org.apache.streampark.console.core.entity.ApplicationBuildPipeline; @@ -322,8 +322,8 @@ public class SparkApplicationActionServiceImpl application.getAppName(), application.getMainClass(), appConf, - PropertiesUtils.extractSparkPropertiesAsJava(application.getAppProperties()), - PropertiesUtils.extractSparkArgumentsAsJava(applicationArgs), + SparkConfigurationUtils.extractPropertiesAsJava(application.getAppProperties()), + SparkConfigurationUtils.extractArgumentsAsJava(applicationArgs), application.getApplicationType(), application.getHadoopUser(), buildResult, diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java index 523d862f4..5420c8167 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.streampark.common.enums.FlinkDeployMode; import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.ExceptionUtils; +import org.apache.streampark.common.util.FlinkConfigurationUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.InternalException; @@ -80,7 +81,6 @@ import java.util.concurrent.TimeoutException; import static org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS; import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY; -import static org.apache.streampark.common.util.PropertiesUtils.extractDynamicPropertiesAsJava; import static org.apache.streampark.console.core.enums.CheckPointTypeEnum.CHECKPOINT; @Slf4j @@ -328,7 +328,7 @@ public class FlinkSavepointServiceImpl extends ServiceImpl<FlinkSavepointMapper, @VisibleForTesting @Nullable public String getSavepointFromDynamicProps(String dynamicProps) { - return extractDynamicPropertiesAsJava(dynamicProps).get(SAVEPOINT_DIRECTORY.key()); + return FlinkConfigurationUtils.extractDynamicPropertiesAsJava(dynamicProps).get(SAVEPOINT_DIRECTORY.key()); } /** @@ -388,7 +388,8 @@ public class FlinkSavepointServiceImpl extends ServiceImpl<FlinkSavepointMapper, * Try get the 'state.checkpoints.num-retained' from the dynamic properties. */ private Optional<Integer> tryGetChkNumRetainedFromDynamicProps(String dynamicProps) { - String rawCfgValue = extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key()); + String rawCfgValue = + FlinkConfigurationUtils.extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key()); if (StringUtils.isBlank(rawCfgValue)) { return Optional.empty(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java index 5979c489f..4e4f09128 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sWatcherWrapper.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.watcher; import org.apache.streampark.common.enums.FlinkDeployMode; -import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.streampark.common.util.FlinkConfigurationUtils; import org.apache.streampark.console.core.entity.FlinkApplication; import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.entity.FlinkEnv; @@ -128,7 +128,7 @@ public class FlinkK8sWatcherWrapper { FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId()); Properties properties = flinkEnv.getFlinkConfig(); - Map<String, String> dynamicProperties = PropertiesUtils + Map<String, String> dynamicProperties = FlinkConfigurationUtils .extractDynamicPropertiesAsJava(app.getDynamicProperties()); String archiveDir = dynamicProperties.get(JobManagerOptions.ARCHIVE_DIR.key()); if (archiveDir != null) { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 647aa1515..ca1327927 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -469,7 +469,7 @@ trait FlinkClientTrait extends Logger { private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = { val programArgs = new ArrayBuffer[String]() - programArgs ++= PropertiesUtils.extractArguments(submitRequest.args) + programArgs ++= FlinkConfigurationUtils.extractArguments(submitRequest.args) if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
