This is an automated email from the ASF dual-hosted git repository.

kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 46e537cd9 issue-2869 bug fixed. (#2924)
46e537cd9 is described below

commit 46e537cd9176c23efb3e177a8c7bbcaf742e9ac5
Author: benjobs <[email protected]>
AuthorDate: Thu Aug 10 22:12:20 2023 +0800

    issue-2869 bug fixed. (#2924)
---
 .../common/util/PropertiesUtilsTestCase.scala      |  64 ++++++-----
 .../flink/client/trait/FlinkClientTrait.scala      | 117 ++++++++++-----------
 .../flink/client/test/ParameterTestCase.scala      |  67 ------------
 3 files changed, 90 insertions(+), 158 deletions(-)

diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index e04e0e3be..88f7b3575 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -19,26 +19,34 @@ package org.apache.streampark.common.util
 import org.apache.commons.lang3.StringUtils
 import org.junit.jupiter.api.{Assertions, Test}
 
+import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
+import scala.language.postfixOps
 
 class PropertiesUtilsTestCase {
 
   @Test def testExtractProgramArgs(): Unit = {
-    val argsStr = "--host localhost:8123\n\n\n" +
-      "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
+    val argsStr = "--host localhost:8123\n" +
+      "--sql \"insert into table_a select * from table_b\"\n" +
       "--c d\r\n" +
-      "--x yyy"
+      "--including-tables \"BASE_CARD_ETPS|BASE_CHECKED_STAT\"\n"
     val programArgs = new ArrayBuffer[String]()
     if (StringUtils.isNotEmpty(argsStr)) {
-      val multiLineChar = "\"\"\""
+      val multiChar = "\""
       val array = argsStr.split("\\s+")
-      if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
-        array.foreach(programArgs +=)
+      if (!array.exists(_.startsWith(multiChar))) {
+        array.foreach(
+          x => {
+            if (x.trim.nonEmpty) {
+              programArgs += x
+            }
+          })
       } else {
         val argsArray = new ArrayBuffer[String]()
         val tempBuffer = new ArrayBuffer[String]()
 
-        def processElement(index: Int, multiLine: Boolean): Unit = {
+        @tailrec
+        def processElement(index: Int, multi: Boolean): Unit = {
 
           if (index == array.length) {
             if (tempBuffer.nonEmpty) {
@@ -48,43 +56,43 @@ class PropertiesUtilsTestCase {
           }
 
           val next = index + 1
-          val elem = array(index)
+          val elem = array(index).trim
+          val until = if (elem.endsWith(multiChar)) 1 else 0
 
-          if (elem.trim.nonEmpty) {
-            if (!multiLine) {
-              if (elem.startsWith(multiLineChar)) {
-                tempBuffer += elem.drop(3)
-                processElement(next, true)
-              } else {
-                argsArray += elem
-                processElement(next, false)
-              }
-            } else {
-              if (elem.endsWith(multiLineChar)) {
-                tempBuffer += elem.dropRight(3)
+          if (elem.isEmpty) {
+            processElement(next, multi = false)
+          } else {
+            if (multi) {
+              if (elem.endsWith(multiChar)) {
+                tempBuffer += elem.dropRight(1)
                 argsArray += tempBuffer.mkString(" ")
                 tempBuffer.clear()
-                processElement(next, false)
+                processElement(next, multi = false)
               } else {
                 tempBuffer += elem
-                processElement(next, multiLine)
+                processElement(next, multi)
+              }
+            } else {
+              if (elem.startsWith(multiChar)) {
+                tempBuffer += elem.drop(1).dropRight(until)
+                processElement(next, multi = true)
+              } else {
+                argsArray += elem.dropRight(until)
+                processElement(next, multi = false)
               }
             }
-          } else {
-            tempBuffer += elem
-            processElement(next, false)
           }
         }
 
-        processElement(0, false)
-        argsArray.foreach(x => programArgs += x.trim)
+        processElement(0, multi = false)
+        argsArray.foreach(x => programArgs += x)
       }
     }
 
     Assertions.assertEquals("localhost:8123", programArgs(1))
     Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3))
     Assertions.assertEquals("d", programArgs(5))
-    Assertions.assertEquals("yyy", programArgs(7))
+    Assertions.assertEquals("BASE_CARD_ETPS|BASE_CHECKED_STAT", programArgs(7))
   }
 
   @Test def testDynamicProperties(): Unit = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 8eec8118c..5c65da5ba 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -58,6 +58,14 @@ trait FlinkClientTrait extends Logger {
   private[client] lazy val PARAM_KEY_APP_NAME = KEY_APP_NAME(PARAM_PREFIX)
   private[client] lazy val PARAM_KEY_FLINK_PARALLELISM = KEY_FLINK_PARALLELISM(PARAM_PREFIX)
 
+  private[this] lazy val javaEnvOpts = List(
+    CoreOptions.FLINK_JVM_OPTIONS,
+    CoreOptions.FLINK_JM_JVM_OPTIONS,
+    CoreOptions.FLINK_HS_JVM_OPTIONS,
+    CoreOptions.FLINK_TM_JVM_OPTIONS,
+    CoreOptions.FLINK_CLI_JVM_OPTIONS
+  )
+
   @throws[Exception]
   def submit(submitRequest: SubmitRequest): SubmitResponse = {
     logInfo(
@@ -124,7 +132,14 @@ trait FlinkClientTrait extends Logger {
     }
 
     // set JVMOptions..
-    setJvmOptions(submitRequest, flinkConfig)
+    if (MapUtils.isNotEmpty(submitRequest.properties)) {
+      submitRequest.properties.foreach(
+        x =>
+          javaEnvOpts.find(_.key == x._1.trim) match {
+            case Some(p) => flinkConfig.set(p, x._2.toString)
+            case _ =>
+          })
+    }
 
     setConfig(submitRequest, flinkConfig)
 
@@ -132,29 +147,6 @@ trait FlinkClientTrait extends Logger {
 
   }
 
-  private[this] def setJvmOptions(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration): Unit = {
-    if (MapUtils.isNotEmpty(submitRequest.properties)) {
-      submitRequest.properties.foreach(
-        x => {
-          val k = x._1.trim
-          val v = x._2.toString
-          if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
-          }
-        })
-    }
-  }
-
   def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
 
   @throws[Exception]
@@ -310,7 +302,6 @@ trait FlinkClientTrait extends Logger {
       submitRequest.appOption
         .filter(
           x => {
-            // 验证参数是否合法...
             val verify = commandLineOptions.hasOption(x._1)
             if (!verify) logWarn(s"param:${x._1} is error,skip it.")
             verify
@@ -405,78 +396,78 @@ trait FlinkClientTrait extends Logger {
   }
 
   private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
-
     val programArgs = new ArrayBuffer[String]()
+    val args = submitRequest.args
 
-    if (StringUtils.isNotEmpty(submitRequest.args)) {
-      val multiLineChar = "\"\"\""
-      val array = submitRequest.args.split("\\s+")
-      if (!array.exists(_.startsWith(multiLineChar))) {
+    if (StringUtils.isNotEmpty(args)) {
+      val multiChar = "\""
+      val array = args.split("\\s+")
+      if (!array.exists(_.startsWith(multiChar))) {
         array.foreach(programArgs +=)
       } else {
         val argsArray = new ArrayBuffer[String]()
         val tempBuffer = new ArrayBuffer[String]()
 
-        @tailrec def processElement(index: Int, multiLine: Boolean): Unit = {
+        @tailrec
+        def processElement(index: Int, multi: Boolean): Unit = {
+
           if (index == array.length) {
             if (tempBuffer.nonEmpty) {
               argsArray += tempBuffer.mkString(" ")
             }
             return
           }
+
           val next = index + 1
-          val elem = array(index)
+          val elem = array(index).trim
 
-          if (elem.trim.nonEmpty) {
-            if (!multiLine) {
-              if (elem.startsWith(multiLineChar)) {
-                tempBuffer += elem.drop(3)
-                processElement(next, multiLine = true)
-              } else {
-                argsArray += elem
-                processElement(next, multiLine = false)
-              }
-            } else {
-              if (elem.endsWith(multiLineChar)) {
-                tempBuffer += elem.dropRight(3)
+          if (elem.isEmpty) {
+            processElement(next, multi = false)
+          } else {
+            if (multi) {
+              if (elem.endsWith(multiChar)) {
+                tempBuffer += elem.dropRight(1)
                 argsArray += tempBuffer.mkString(" ")
                 tempBuffer.clear()
-                processElement(next, multiLine = false)
+                processElement(next, multi = false)
               } else {
                 tempBuffer += elem
-                processElement(next, multiLine)
+                processElement(next, multi)
+              }
+            } else {
+              val until = if (elem.endsWith(multiChar)) 1 else 0
+              if (elem.startsWith(multiChar)) {
+                tempBuffer += elem.drop(1).dropRight(until)
+                processElement(next, multi = true)
+              } else {
+                argsArray += elem.dropRight(until)
+                processElement(next, multi = false)
               }
             }
-          } else {
-            tempBuffer += elem
-            processElement(next, multiLine = false)
           }
         }
 
-        processElement(0, multiLine = false)
-        argsArray.foreach(x => programArgs += x.trim)
+        processElement(0, multi = false)
+        argsArray.foreach(x => programArgs += x)
       }
     }
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
-      programArgs += PARAM_KEY_FLINK_CONF
-      programArgs += submitRequest.flinkYaml
-      programArgs += PARAM_KEY_APP_NAME
-      programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
-      programArgs += PARAM_KEY_FLINK_PARALLELISM
-      programArgs += getParallelism(submitRequest).toString
+
+      programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml
+      programArgs += PARAM_KEY_APP_NAME += DeflaterUtils.zipString(submitRequest.effectiveAppName)
+      programArgs += PARAM_KEY_FLINK_PARALLELISM += getParallelism(submitRequest).toString
+
       submitRequest.developmentMode match {
         case DevelopmentMode.FLINK_SQL =>
-          programArgs += PARAM_KEY_FLINK_SQL
-          programArgs += submitRequest.flinkSQL
+          programArgs += PARAM_KEY_FLINK_SQL += submitRequest.flinkSQL
           if (submitRequest.appConf != null) {
-            programArgs += PARAM_KEY_APP_CONF
-            programArgs += submitRequest.appConf
+            programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
           }
         case _ if Try(!submitRequest.appConf.startsWith("json:")).getOrElse(true) =>
-          programArgs += PARAM_KEY_APP_CONF
-          programArgs += submitRequest.appConf
+          programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
       }
+
     }
     programArgs.toList.asJava
   }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
index 32929d2d2..4827d824d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
@@ -16,12 +16,9 @@
  */
 package org.apache.streampark.flink.client.test
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.java.utils.ParameterTool
 import org.junit.jupiter.api.{Assertions, Test}
 
-import scala.collection.mutable.ArrayBuffer
-
 class ParameterTestCase {
 
   @Test def readArgs(): Unit = {
@@ -42,68 +39,4 @@ class ParameterTestCase {
     Assertions.assertEquals("5", param.get("flink.deployment.option.parallelism"))
   }
 
-  @Test def testExtractProgramArgs(): Unit = {
-    val argsStr = "--host localhost:8123\n\n\n" +
-      "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
-      "--c d\r\n" +
-      "--x yyy"
-    val programArgs = new ArrayBuffer[String]()
-    if (StringUtils.isNotEmpty(argsStr)) {
-      val multiLineChar = "\"\"\""
-      val array = argsStr.split("\\s+")
-      if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
-        array.foreach(programArgs +=)
-      } else {
-        val argsArray = new ArrayBuffer[String]()
-        val tempBuffer = new ArrayBuffer[String]()
-
-        def processElement(index: Int, multiLine: Boolean): Unit = {
-
-          if (index == array.length) {
-            if (tempBuffer.nonEmpty) {
-              argsArray += tempBuffer.mkString(" ")
-            }
-            return
-          }
-
-          val next = index + 1
-          val elem = array(index)
-
-          if (elem.trim.nonEmpty) {
-            if (!multiLine) {
-              if (elem.startsWith(multiLineChar)) {
-                tempBuffer += elem.drop(3)
-                processElement(next, true)
-              } else {
-                argsArray += elem
-                processElement(next, false)
-              }
-            } else {
-              if (elem.endsWith(multiLineChar)) {
-                tempBuffer += elem.dropRight(3)
-                argsArray += tempBuffer.mkString(" ")
-                tempBuffer.clear()
-                processElement(next, false)
-              } else {
-                tempBuffer += elem
-                processElement(next, multiLine)
-              }
-            }
-          } else {
-            tempBuffer += elem
-            processElement(next, false)
-          }
-        }
-
-        processElement(0, false)
-        argsArray.foreach(x => programArgs += x.trim)
-      }
-    }
-
-    Assertions.assertEquals("localhost:8123", programArgs(1))
-    Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3))
-    Assertions.assertEquals("d", programArgs(5))
-    Assertions.assertEquals("yyy", programArgs(7))
-  }
-
 }

Reply via email to