This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch issue-2869
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/issue-2869 by this push:
new c980bdd35 minor improvement
c980bdd35 is described below
commit c980bdd3571f885a071c6967653e23293de5bb99
Author: benjobs <[email protected]>
AuthorDate: Mon Aug 7 09:07:34 2023 +0800
minor improvement
---
.../flink/client/trait/FlinkClientTrait.scala | 34 +++++++++-------------
1 file changed, 14 insertions(+), 20 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 60fecedf3..5c65da5ba 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -132,17 +132,6 @@ trait FlinkClientTrait extends Logger {
}
// set JVMOptions..
- setJvmOptions(submitRequest, flinkConfig)
-
- setConfig(submitRequest, flinkConfig)
-
- doSubmit(submitRequest, flinkConfig)
-
- }
-
- private[this] def setJvmOptions(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration): Unit = {
if (MapUtils.isNotEmpty(submitRequest.properties)) {
submitRequest.properties.foreach(
x =>
@@ -151,6 +140,11 @@ trait FlinkClientTrait extends Logger {
case _ =>
})
}
+
+ setConfig(submitRequest, flinkConfig)
+
+ doSubmit(submitRequest, flinkConfig)
+
}
def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
@@ -459,21 +453,21 @@ trait FlinkClientTrait extends Logger {
}
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
- programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml +=
PARAM_KEY_APP_NAME += DeflaterUtils
- .zipString(submitRequest.effectiveAppName) +=
PARAM_KEY_FLINK_PARALLELISM += getParallelism(
- submitRequest).toString
+
+ programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml
+ programArgs += PARAM_KEY_APP_NAME +=
DeflaterUtils.zipString(submitRequest.effectiveAppName)
+ programArgs += PARAM_KEY_FLINK_PARALLELISM +=
getParallelism(submitRequest).toString
+
submitRequest.developmentMode match {
case DevelopmentMode.FLINK_SQL =>
- programArgs += PARAM_KEY_FLINK_SQL
- programArgs += submitRequest.flinkSQL
+ programArgs += PARAM_KEY_FLINK_SQL += submitRequest.flinkSQL
if (submitRequest.appConf != null) {
- programArgs += PARAM_KEY_APP_CONF
- programArgs += submitRequest.appConf
+ programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
}
case _ if
Try(!submitRequest.appConf.startsWith("json:")).getOrElse(true) =>
- programArgs += PARAM_KEY_APP_CONF
- programArgs += submitRequest.appConf
+ programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
}
+
}
programArgs.toList.asJava
}