This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch issue-2869
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/issue-2869 by this push:
     new c980bdd35 minor improvement
c980bdd35 is described below

commit c980bdd3571f885a071c6967653e23293de5bb99
Author: benjobs <[email protected]>
AuthorDate: Mon Aug 7 09:07:34 2023 +0800

    minor improvement
---
 .../flink/client/trait/FlinkClientTrait.scala      | 34 +++++++++-------------
 1 file changed, 14 insertions(+), 20 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 60fecedf3..5c65da5ba 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -132,17 +132,6 @@ trait FlinkClientTrait extends Logger {
     }
 
     // set JVMOptions..
-    setJvmOptions(submitRequest, flinkConfig)
-
-    setConfig(submitRequest, flinkConfig)
-
-    doSubmit(submitRequest, flinkConfig)
-
-  }
-
-  private[this] def setJvmOptions(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration): Unit = {
     if (MapUtils.isNotEmpty(submitRequest.properties)) {
       submitRequest.properties.foreach(
         x =>
@@ -151,6 +140,11 @@ trait FlinkClientTrait extends Logger {
             case _ =>
           })
     }
+
+    setConfig(submitRequest, flinkConfig)
+
+    doSubmit(submitRequest, flinkConfig)
+
   }
 
   def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
@@ -459,21 +453,21 @@ trait FlinkClientTrait extends Logger {
     }
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
-      programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml += PARAM_KEY_APP_NAME += DeflaterUtils
-        .zipString(submitRequest.effectiveAppName) += PARAM_KEY_FLINK_PARALLELISM += getParallelism(
-        submitRequest).toString
+
+      programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml
+      programArgs += PARAM_KEY_APP_NAME += DeflaterUtils.zipString(submitRequest.effectiveAppName)
+      programArgs += PARAM_KEY_FLINK_PARALLELISM += getParallelism(submitRequest).toString
+
       submitRequest.developmentMode match {
         case DevelopmentMode.FLINK_SQL =>
-          programArgs += PARAM_KEY_FLINK_SQL
-          programArgs += submitRequest.flinkSQL
+          programArgs += PARAM_KEY_FLINK_SQL += submitRequest.flinkSQL
           if (submitRequest.appConf != null) {
-            programArgs += PARAM_KEY_APP_CONF
-            programArgs += submitRequest.appConf
+            programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
           }
         case _ if Try(!submitRequest.appConf.startsWith("json:")).getOrElse(true) =>
-          programArgs += PARAM_KEY_APP_CONF
-          programArgs += submitRequest.appConf
+          programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
       }
+
     }
     programArgs.toList.asJava
   }

Reply via email to