This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch issue-2869
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/issue-2869 by this push:
     new 365d291a5 minor improvement
365d291a5 is described below

commit 365d291a5b1b739982dadccc0fd46c40b747adc6
Author: benjobs <[email protected]>
AuthorDate: Sun Aug 6 22:24:25 2023 +0800

    minor improvement
---
 .../flink/client/trait/FlinkClientTrait.scala      | 38 +++++++++-------------
 1 file changed, 16 insertions(+), 22 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index e563f1621..60fecedf3 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -58,6 +58,14 @@ trait FlinkClientTrait extends Logger {
   private[client] lazy val PARAM_KEY_APP_NAME = KEY_APP_NAME(PARAM_PREFIX)
   private[client] lazy val PARAM_KEY_FLINK_PARALLELISM = 
KEY_FLINK_PARALLELISM(PARAM_PREFIX)
 
+  private[this] lazy val javaEnvOpts = List(
+    CoreOptions.FLINK_JVM_OPTIONS,
+    CoreOptions.FLINK_JM_JVM_OPTIONS,
+    CoreOptions.FLINK_HS_JVM_OPTIONS,
+    CoreOptions.FLINK_TM_JVM_OPTIONS,
+    CoreOptions.FLINK_CLI_JVM_OPTIONS
+  )
+
   @throws[Exception]
   def submit(submitRequest: SubmitRequest): SubmitResponse = {
     logInfo(
@@ -137,21 +145,11 @@ trait FlinkClientTrait extends Logger {
       flinkConfig: Configuration): Unit = {
     if (MapUtils.isNotEmpty(submitRequest.properties)) {
       submitRequest.properties.foreach(
-        x => {
-          val k = x._1.trim
-          val v = x._2.toString
-          if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
-          }
-        })
+        x =>
+          javaEnvOpts.find(_.key == x._1.trim) match {
+            case Some(p) => flinkConfig.set(p, x._2.toString)
+            case _ =>
+          })
     }
   }
 
@@ -310,7 +308,6 @@ trait FlinkClientTrait extends Logger {
       submitRequest.appOption
         .filter(
           x => {
-            // 验证参数是否合法...
             val verify = commandLineOptions.hasOption(x._1)
             if (!verify) logWarn(s"param:${x._1} is error,skip it.")
             verify
@@ -462,12 +459,9 @@ trait FlinkClientTrait extends Logger {
     }
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
-      programArgs += PARAM_KEY_FLINK_CONF
-      programArgs += submitRequest.flinkYaml
-      programArgs += PARAM_KEY_APP_NAME
-      programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
-      programArgs += PARAM_KEY_FLINK_PARALLELISM
-      programArgs += getParallelism(submitRequest).toString
+      programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml += 
PARAM_KEY_APP_NAME += DeflaterUtils
+        .zipString(submitRequest.effectiveAppName) += 
PARAM_KEY_FLINK_PARALLELISM += getParallelism(
+        submitRequest).toString
       submitRequest.developmentMode match {
         case DevelopmentMode.FLINK_SQL =>
           programArgs += PARAM_KEY_FLINK_SQL

Reply via email to