This is an automated email from the ASF dual-hosted git repository.
kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 46e537cd9 issue-2869 bug fixed. (#2924)
46e537cd9 is described below
commit 46e537cd9176c23efb3e177a8c7bbcaf742e9ac5
Author: benjobs <[email protected]>
AuthorDate: Thu Aug 10 22:12:20 2023 +0800
issue-2869 bug fixed. (#2924)
---
.../common/util/PropertiesUtilsTestCase.scala | 64 ++++++-----
.../flink/client/trait/FlinkClientTrait.scala | 117 ++++++++++-----------
.../flink/client/test/ParameterTestCase.scala | 67 ------------
3 files changed, 90 insertions(+), 158 deletions(-)
diff --git
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index e04e0e3be..88f7b3575 100644
---
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -19,26 +19,34 @@ package org.apache.streampark.common.util
import org.apache.commons.lang3.StringUtils
import org.junit.jupiter.api.{Assertions, Test}
+import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
+import scala.language.postfixOps
class PropertiesUtilsTestCase {
@Test def testExtractProgramArgs(): Unit = {
- val argsStr = "--host localhost:8123\n\n\n" +
- "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
+ val argsStr = "--host localhost:8123\n" +
+ "--sql \"insert into table_a select * from table_b\"\n" +
"--c d\r\n" +
- "--x yyy"
+ "--including-tables \"BASE_CARD_ETPS|BASE_CHECKED_STAT\"\n"
val programArgs = new ArrayBuffer[String]()
if (StringUtils.isNotEmpty(argsStr)) {
- val multiLineChar = "\"\"\""
+ val multiChar = "\""
val array = argsStr.split("\\s+")
- if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
- array.foreach(programArgs +=)
+ if (!array.exists(_.startsWith(multiChar))) {
+ array.foreach(
+ x => {
+ if (x.trim.nonEmpty) {
+ programArgs += x
+ }
+ })
} else {
val argsArray = new ArrayBuffer[String]()
val tempBuffer = new ArrayBuffer[String]()
- def processElement(index: Int, multiLine: Boolean): Unit = {
+ @tailrec
+ def processElement(index: Int, multi: Boolean): Unit = {
if (index == array.length) {
if (tempBuffer.nonEmpty) {
@@ -48,43 +56,43 @@ class PropertiesUtilsTestCase {
}
val next = index + 1
- val elem = array(index)
+ val elem = array(index).trim
+ val until = if (elem.endsWith(multiChar)) 1 else 0
- if (elem.trim.nonEmpty) {
- if (!multiLine) {
- if (elem.startsWith(multiLineChar)) {
- tempBuffer += elem.drop(3)
- processElement(next, true)
- } else {
- argsArray += elem
- processElement(next, false)
- }
- } else {
- if (elem.endsWith(multiLineChar)) {
- tempBuffer += elem.dropRight(3)
+ if (elem.isEmpty) {
+ processElement(next, multi = false)
+ } else {
+ if (multi) {
+ if (elem.endsWith(multiChar)) {
+ tempBuffer += elem.dropRight(1)
argsArray += tempBuffer.mkString(" ")
tempBuffer.clear()
- processElement(next, false)
+ processElement(next, multi = false)
} else {
tempBuffer += elem
- processElement(next, multiLine)
+ processElement(next, multi)
+ }
+ } else {
+ if (elem.startsWith(multiChar)) {
+ tempBuffer += elem.drop(1).dropRight(until)
+ processElement(next, multi = true)
+ } else {
+ argsArray += elem.dropRight(until)
+ processElement(next, multi = false)
}
}
- } else {
- tempBuffer += elem
- processElement(next, false)
}
}
- processElement(0, false)
- argsArray.foreach(x => programArgs += x.trim)
+ processElement(0, multi = false)
+ argsArray.foreach(x => programArgs += x)
}
}
Assertions.assertEquals("localhost:8123", programArgs(1))
Assertions.assertEquals("insert into table_a select * from table_b",
programArgs(3))
Assertions.assertEquals("d", programArgs(5))
- Assertions.assertEquals("yyy", programArgs(7))
+ Assertions.assertEquals("BASE_CARD_ETPS|BASE_CHECKED_STAT", programArgs(7))
}
@Test def testDynamicProperties(): Unit = {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 8eec8118c..5c65da5ba 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -58,6 +58,14 @@ trait FlinkClientTrait extends Logger {
private[client] lazy val PARAM_KEY_APP_NAME = KEY_APP_NAME(PARAM_PREFIX)
private[client] lazy val PARAM_KEY_FLINK_PARALLELISM =
KEY_FLINK_PARALLELISM(PARAM_PREFIX)
+ private[this] lazy val javaEnvOpts = List(
+ CoreOptions.FLINK_JVM_OPTIONS,
+ CoreOptions.FLINK_JM_JVM_OPTIONS,
+ CoreOptions.FLINK_HS_JVM_OPTIONS,
+ CoreOptions.FLINK_TM_JVM_OPTIONS,
+ CoreOptions.FLINK_CLI_JVM_OPTIONS
+ )
+
@throws[Exception]
def submit(submitRequest: SubmitRequest): SubmitResponse = {
logInfo(
@@ -124,7 +132,14 @@ trait FlinkClientTrait extends Logger {
}
// set JVMOptions..
- setJvmOptions(submitRequest, flinkConfig)
+ if (MapUtils.isNotEmpty(submitRequest.properties)) {
+ submitRequest.properties.foreach(
+ x =>
+ javaEnvOpts.find(_.key == x._1.trim) match {
+ case Some(p) => flinkConfig.set(p, x._2.toString)
+ case _ =>
+ })
+ }
setConfig(submitRequest, flinkConfig)
@@ -132,29 +147,6 @@ trait FlinkClientTrait extends Logger {
}
- private[this] def setJvmOptions(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration): Unit = {
- if (MapUtils.isNotEmpty(submitRequest.properties)) {
- submitRequest.properties.foreach(
- x => {
- val k = x._1.trim
- val v = x._2.toString
- if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
- }
- })
- }
- }
-
def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
@throws[Exception]
@@ -310,7 +302,6 @@ trait FlinkClientTrait extends Logger {
submitRequest.appOption
.filter(
x => {
- // 验证参数是否合法...
val verify = commandLineOptions.hasOption(x._1)
if (!verify) logWarn(s"param:${x._1} is error,skip it.")
verify
@@ -405,78 +396,78 @@ trait FlinkClientTrait extends Logger {
}
private[this] def extractProgramArgs(submitRequest: SubmitRequest):
JavaList[String] = {
-
val programArgs = new ArrayBuffer[String]()
+ val args = submitRequest.args
- if (StringUtils.isNotEmpty(submitRequest.args)) {
- val multiLineChar = "\"\"\""
- val array = submitRequest.args.split("\\s+")
- if (!array.exists(_.startsWith(multiLineChar))) {
+ if (StringUtils.isNotEmpty(args)) {
+ val multiChar = "\""
+ val array = args.split("\\s+")
+ if (!array.exists(_.startsWith(multiChar))) {
array.foreach(programArgs +=)
} else {
val argsArray = new ArrayBuffer[String]()
val tempBuffer = new ArrayBuffer[String]()
- @tailrec def processElement(index: Int, multiLine: Boolean): Unit = {
+ @tailrec
+ def processElement(index: Int, multi: Boolean): Unit = {
+
if (index == array.length) {
if (tempBuffer.nonEmpty) {
argsArray += tempBuffer.mkString(" ")
}
return
}
+
val next = index + 1
- val elem = array(index)
+ val elem = array(index).trim
- if (elem.trim.nonEmpty) {
- if (!multiLine) {
- if (elem.startsWith(multiLineChar)) {
- tempBuffer += elem.drop(3)
- processElement(next, multiLine = true)
- } else {
- argsArray += elem
- processElement(next, multiLine = false)
- }
- } else {
- if (elem.endsWith(multiLineChar)) {
- tempBuffer += elem.dropRight(3)
+ if (elem.isEmpty) {
+ processElement(next, multi = false)
+ } else {
+ if (multi) {
+ if (elem.endsWith(multiChar)) {
+ tempBuffer += elem.dropRight(1)
argsArray += tempBuffer.mkString(" ")
tempBuffer.clear()
- processElement(next, multiLine = false)
+ processElement(next, multi = false)
} else {
tempBuffer += elem
- processElement(next, multiLine)
+ processElement(next, multi)
+ }
+ } else {
+ val until = if (elem.endsWith(multiChar)) 1 else 0
+ if (elem.startsWith(multiChar)) {
+ tempBuffer += elem.drop(1).dropRight(until)
+ processElement(next, multi = true)
+ } else {
+ argsArray += elem.dropRight(until)
+ processElement(next, multi = false)
}
}
- } else {
- tempBuffer += elem
- processElement(next, multiLine = false)
}
}
- processElement(0, multiLine = false)
- argsArray.foreach(x => programArgs += x.trim)
+ processElement(0, multi = false)
+ argsArray.foreach(x => programArgs += x)
}
}
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
- programArgs += PARAM_KEY_FLINK_CONF
- programArgs += submitRequest.flinkYaml
- programArgs += PARAM_KEY_APP_NAME
- programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
- programArgs += PARAM_KEY_FLINK_PARALLELISM
- programArgs += getParallelism(submitRequest).toString
+
+ programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml
+ programArgs += PARAM_KEY_APP_NAME +=
DeflaterUtils.zipString(submitRequest.effectiveAppName)
+ programArgs += PARAM_KEY_FLINK_PARALLELISM +=
getParallelism(submitRequest).toString
+
submitRequest.developmentMode match {
case DevelopmentMode.FLINK_SQL =>
- programArgs += PARAM_KEY_FLINK_SQL
- programArgs += submitRequest.flinkSQL
+ programArgs += PARAM_KEY_FLINK_SQL += submitRequest.flinkSQL
if (submitRequest.appConf != null) {
- programArgs += PARAM_KEY_APP_CONF
- programArgs += submitRequest.appConf
+ programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
}
case _ if
Try(!submitRequest.appConf.startsWith("json:")).getOrElse(true) =>
- programArgs += PARAM_KEY_APP_CONF
- programArgs += submitRequest.appConf
+ programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
}
+
}
programArgs.toList.asJava
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
index 32929d2d2..4827d824d 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
@@ -16,12 +16,9 @@
*/
package org.apache.streampark.flink.client.test
-import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.java.utils.ParameterTool
import org.junit.jupiter.api.{Assertions, Test}
-import scala.collection.mutable.ArrayBuffer
-
class ParameterTestCase {
@Test def readArgs(): Unit = {
@@ -42,68 +39,4 @@ class ParameterTestCase {
Assertions.assertEquals("5",
param.get("flink.deployment.option.parallelism"))
}
- @Test def testExtractProgramArgs(): Unit = {
- val argsStr = "--host localhost:8123\n\n\n" +
- "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
- "--c d\r\n" +
- "--x yyy"
- val programArgs = new ArrayBuffer[String]()
- if (StringUtils.isNotEmpty(argsStr)) {
- val multiLineChar = "\"\"\""
- val array = argsStr.split("\\s+")
- if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
- array.foreach(programArgs +=)
- } else {
- val argsArray = new ArrayBuffer[String]()
- val tempBuffer = new ArrayBuffer[String]()
-
- def processElement(index: Int, multiLine: Boolean): Unit = {
-
- if (index == array.length) {
- if (tempBuffer.nonEmpty) {
- argsArray += tempBuffer.mkString(" ")
- }
- return
- }
-
- val next = index + 1
- val elem = array(index)
-
- if (elem.trim.nonEmpty) {
- if (!multiLine) {
- if (elem.startsWith(multiLineChar)) {
- tempBuffer += elem.drop(3)
- processElement(next, true)
- } else {
- argsArray += elem
- processElement(next, false)
- }
- } else {
- if (elem.endsWith(multiLineChar)) {
- tempBuffer += elem.dropRight(3)
- argsArray += tempBuffer.mkString(" ")
- tempBuffer.clear()
- processElement(next, false)
- } else {
- tempBuffer += elem
- processElement(next, multiLine)
- }
- }
- } else {
- tempBuffer += elem
- processElement(next, false)
- }
- }
-
- processElement(0, false)
- argsArray.foreach(x => programArgs += x.trim)
- }
- }
-
- Assertions.assertEquals("localhost:8123", programArgs(1))
- Assertions.assertEquals("insert into table_a select * from table_b",
programArgs(3))
- Assertions.assertEquals("d", programArgs(5))
- Assertions.assertEquals("yyy", programArgs(7))
- }
-
}