This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch issue-2869
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/issue-2869 by this push:
new 365d291a5 minor improvement
365d291a5 is described below
commit 365d291a5b1b739982dadccc0fd46c40b747adc6
Author: benjobs <[email protected]>
AuthorDate: Sun Aug 6 22:24:25 2023 +0800
minor improvement
---
.../flink/client/trait/FlinkClientTrait.scala | 38 +++++++++-------------
1 file changed, 16 insertions(+), 22 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index e563f1621..60fecedf3 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -58,6 +58,14 @@ trait FlinkClientTrait extends Logger {
private[client] lazy val PARAM_KEY_APP_NAME = KEY_APP_NAME(PARAM_PREFIX)
private[client] lazy val PARAM_KEY_FLINK_PARALLELISM =
KEY_FLINK_PARALLELISM(PARAM_PREFIX)
+ private[this] lazy val javaEnvOpts = List(
+ CoreOptions.FLINK_JVM_OPTIONS,
+ CoreOptions.FLINK_JM_JVM_OPTIONS,
+ CoreOptions.FLINK_HS_JVM_OPTIONS,
+ CoreOptions.FLINK_TM_JVM_OPTIONS,
+ CoreOptions.FLINK_CLI_JVM_OPTIONS
+ )
+
@throws[Exception]
def submit(submitRequest: SubmitRequest): SubmitResponse = {
logInfo(
@@ -137,21 +145,11 @@ trait FlinkClientTrait extends Logger {
flinkConfig: Configuration): Unit = {
if (MapUtils.isNotEmpty(submitRequest.properties)) {
submitRequest.properties.foreach(
- x => {
- val k = x._1.trim
- val v = x._2.toString
- if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
- }
- })
+ x =>
+ javaEnvOpts.find(_.key == x._1.trim) match {
+ case Some(p) => flinkConfig.set(p, x._2.toString)
+ case _ =>
+ })
}
}
@@ -310,7 +308,6 @@ trait FlinkClientTrait extends Logger {
submitRequest.appOption
.filter(
x => {
- // 验证参数是否合法...
val verify = commandLineOptions.hasOption(x._1)
if (!verify) logWarn(s"param:${x._1} is error,skip it.")
verify
@@ -462,12 +459,9 @@ trait FlinkClientTrait extends Logger {
}
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
- programArgs += PARAM_KEY_FLINK_CONF
- programArgs += submitRequest.flinkYaml
- programArgs += PARAM_KEY_APP_NAME
- programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
- programArgs += PARAM_KEY_FLINK_PARALLELISM
- programArgs += getParallelism(submitRequest).toString
+ programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml +=
PARAM_KEY_APP_NAME += DeflaterUtils
+ .zipString(submitRequest.effectiveAppName) +=
PARAM_KEY_FLINK_PARALLELISM += getParallelism(
+ submitRequest).toString
submitRequest.developmentMode match {
case DevelopmentMode.FLINK_SQL =>
programArgs += PARAM_KEY_FLINK_SQL