This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 83fb8c6a8 [Bug] dynamicProperties env.java.opts not support bug fixed (#2019)
83fb8c6a8 is described below

commit 83fb8c6a84331e97187ce9489bd3abc8364b8c7c
Author: benjobs <[email protected]>
AuthorDate: Tue Nov 15 10:05:03 2022 +0800

    [Bug] dynamicProperties env.java.opts not support bug fixed (#2019)
---
 .../streampark/flink/submit/FlinkSubmitter.scala   | 40 +++++++++++++++-------
 .../flink/submit/test/ParameterTestCase.scala      | 25 ++++++++++++++
 2 files changed, 53 insertions(+), 12 deletions(-)

diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/FlinkSubmitter.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/FlinkSubmitter.scala
index 0f32f5f48..7fc3c0720 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/FlinkSubmitter.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/FlinkSubmitter.scala
@@ -27,11 +27,16 @@ import java.util.regex.Pattern
 import java.util.{Map => JavaMap}
 import javax.annotation.Nonnull
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 object FlinkSubmitter extends Logger {
 
   private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
 
+  private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*\\\"(.*)\\\""
+
+  private[this] lazy val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP)
+
   private[this] val FLINK_SUBMIT_CLASS_NAME = "org.apache.streampark.flink.submit.FlinkSubmit"
 
   private[this] val SUBMIT_REQUEST_CLASS_NAME = "org.apache.streampark.flink.submit.bean.SubmitRequest"
@@ -92,22 +97,33 @@ object FlinkSubmitter extends Logger {
    * extract flink configuration from application.properties
    */
   @Nonnull def extractDynamicProperties(properties: String): Map[String, String] = {
-    if (StringUtils.isEmpty(properties)) {
-      Map.empty[String, String]
-    } else {
-      properties.split("\\s?-D") match {
-        case x if Utils.isEmpty(x) => Map.empty
-        case d =>
-          d.filter(_.nonEmpty)
-            .map(_.trim)
-            .map(PROPERTY_PATTERN.matcher(_))
-            .filter(_.matches)
-            .map(m => m.group(1) -> m.group(2).replace("\"", "").trim)
-            .toMap
+    if (StringUtils.isEmpty(properties)) Map.empty[String, String] else {
+      val map = mutable.Map[String, String]()
+      val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
+      simple.split("\\s?-D") match {
+        case d if Utils.notEmpty(d) => d.foreach(x => {
+          if (x.nonEmpty) {
+            val p = PROPERTY_PATTERN.matcher(x.trim)
+            if (p.matches) {
+              map += p.group(1).trim -> p.group(2).trim
+            }
+          }
+        })
+        case _ =>
       }
+      val matcher = MULTI_PROPERTY_PATTERN.matcher(properties)
+      while (matcher.find()) {
+        val opts = matcher.group()
+        val index = opts.indexOf("=")
+        val key = opts.substring(2, index).trim
+        val value = opts.substring(index + 1).trim.replaceAll("(^\"|\"$)", "")
+        map += key -> value
+      }
+      map.toMap
     }
   }
 
   @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] = new util.HashMap[String, String](extractDynamicProperties(properties).asJava)
 
+
 }
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
index f87fc9242..47fd188fb 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
@@ -18,6 +18,7 @@ package org.apache.streampark.flink.submit.test
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.streampark.flink.submit.FlinkSubmitter
 import org.junit.jupiter.api.{Assertions, Test}
 
 import scala.collection.mutable.ArrayBuffer
@@ -107,4 +108,28 @@ class ParameterTestCase {
     Assertions.assertEquals("d", programArgs(5))
     Assertions.assertEquals("yyy", programArgs(7))
   }
+
+  @Test def testDynamicProperties(): Unit = {
+    val dynamicProperties =
+      """
+        |-Denv.java.opts1="-Dfile.encoding=UTF-8"
+        |-Denv.java.opts2 = "-Dfile.enc\"oding=UTF-8"
+        |-Denv.java.opts3 = " -Dfile.encoding=UTF-8"
+        |-Dyarn.application.id=123
+        |-Dyarn.application.name="streampark job"
+        |-Dyarn.application.queue=flink
+        |-Ddiy.param.name=apache streampark
+        |
+        |""".stripMargin
+
+    val map = FlinkSubmitter.extractDynamicProperties(dynamicProperties)
+    Assertions.assertEquals(map("env.java.opts1"), "-Dfile.encoding=UTF-8")
+    Assertions.assertEquals(map("env.java.opts2"), "-Dfile.enc\\\"oding=UTF-8")
+    Assertions.assertEquals(map("env.java.opts3"), " -Dfile.encoding=UTF-8")
+    Assertions.assertEquals(map("yarn.application.id"), "123")
+    Assertions.assertEquals(map("yarn.application.name"), "streampark job")
+    Assertions.assertEquals(map("yarn.application.queue"), "flink")
+    Assertions.assertEquals(map("diy.param.name"), "apache streampark")
+  }
+
 }

Reply via email to