This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 83fb8c6a8 [Bug] dynamicProperties env.java.opts not support bug fixed
(#2019)
83fb8c6a8 is described below
commit 83fb8c6a84331e97187ce9489bd3abc8364b8c7c
Author: benjobs <[email protected]>
AuthorDate: Tue Nov 15 10:05:03 2022 +0800
[Bug] dynamicProperties env.java.opts not support bug fixed (#2019)
---
.../streampark/flink/submit/FlinkSubmitter.scala | 40 +++++++++++++++-------
.../flink/submit/test/ParameterTestCase.scala | 25 ++++++++++++++
2 files changed, 53 insertions(+), 12 deletions(-)
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/FlinkSubmitter.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/FlinkSubmitter.scala
index 0f32f5f48..7fc3c0720 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/FlinkSubmitter.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/FlinkSubmitter.scala
@@ -27,11 +27,16 @@ import java.util.regex.Pattern
import java.util.{Map => JavaMap}
import javax.annotation.Nonnull
import scala.collection.JavaConverters._
+import scala.collection.mutable
object FlinkSubmitter extends Logger {
private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
+ private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*\\\"(.*)\\\""
+
+ private[this] lazy val MULTI_PROPERTY_PATTERN =
Pattern.compile(MULTI_PROPERTY_REGEXP)
+
private[this] val FLINK_SUBMIT_CLASS_NAME =
"org.apache.streampark.flink.submit.FlinkSubmit"
private[this] val SUBMIT_REQUEST_CLASS_NAME =
"org.apache.streampark.flink.submit.bean.SubmitRequest"
@@ -92,22 +97,33 @@ object FlinkSubmitter extends Logger {
* extract flink configuration from application.properties
*/
@Nonnull def extractDynamicProperties(properties: String): Map[String,
String] = {
- if (StringUtils.isEmpty(properties)) {
- Map.empty[String, String]
- } else {
- properties.split("\\s?-D") match {
- case x if Utils.isEmpty(x) => Map.empty
- case d =>
- d.filter(_.nonEmpty)
- .map(_.trim)
- .map(PROPERTY_PATTERN.matcher(_))
- .filter(_.matches)
- .map(m => m.group(1) -> m.group(2).replace("\"", "").trim)
- .toMap
+ if (StringUtils.isEmpty(properties)) Map.empty[String, String] else {
+ val map = mutable.Map[String, String]()
+ val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
+ simple.split("\\s?-D") match {
+ case d if Utils.notEmpty(d) => d.foreach(x => {
+ if (x.nonEmpty) {
+ val p = PROPERTY_PATTERN.matcher(x.trim)
+ if (p.matches) {
+ map += p.group(1).trim -> p.group(2).trim
+ }
+ }
+ })
+ case _ =>
}
+ val matcher = MULTI_PROPERTY_PATTERN.matcher(properties)
+ while (matcher.find()) {
+ val opts = matcher.group()
+ val index = opts.indexOf("=")
+ val key = opts.substring(2, index).trim
+ val value = opts.substring(index + 1).trim.replaceAll("(^\"|\"$)", "")
+ map += key -> value
+ }
+ map.toMap
}
}
@Nonnull def extractDynamicPropertiesAsJava(properties: String):
JavaMap[String, String] = new util.HashMap[String,
String](extractDynamicProperties(properties).asJava)
+
}
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
index f87fc9242..47fd188fb 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
@@ -18,6 +18,7 @@ package org.apache.streampark.flink.submit.test
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.streampark.flink.submit.FlinkSubmitter
import org.junit.jupiter.api.{Assertions, Test}
import scala.collection.mutable.ArrayBuffer
@@ -107,4 +108,28 @@ class ParameterTestCase {
Assertions.assertEquals("d", programArgs(5))
Assertions.assertEquals("yyy", programArgs(7))
}
+
+ @Test def testDynamicProperties(): Unit = {
+ val dynamicProperties =
+ """
+ |-Denv.java.opts1="-Dfile.encoding=UTF-8"
+ |-Denv.java.opts2 = "-Dfile.enc\"oding=UTF-8"
+ |-Denv.java.opts3 = " -Dfile.encoding=UTF-8"
+ |-Dyarn.application.id=123
+ |-Dyarn.application.name="streampark job"
+ |-Dyarn.application.queue=flink
+ |-Ddiy.param.name=apache streampark
+ |
+ |""".stripMargin
+
+ val map = FlinkSubmitter.extractDynamicProperties(dynamicProperties)
+ Assertions.assertEquals(map("env.java.opts1"), "-Dfile.encoding=UTF-8")
+ Assertions.assertEquals(map("env.java.opts2"), "-Dfile.enc\\\"oding=UTF-8")
+ Assertions.assertEquals(map("env.java.opts3"), " -Dfile.encoding=UTF-8")
+ Assertions.assertEquals(map("yarn.application.id"), "123")
+ Assertions.assertEquals(map("yarn.application.name"), "streampark job")
+ Assertions.assertEquals(map("yarn.application.queue"), "flink")
+ Assertions.assertEquals(map("diy.param.name"), "apache streampark")
+ }
+
}