This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 7a511d5b1 [Improve] submit flink job default file.encoding parameter
improvement (#3838)
7a511d5b1 is described below
commit 7a511d5b161e1fcbe4ddbd8b154e4f1cb176178c
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 6 19:47:33 2024 +0800
[Improve] submit flink job default file.encoding parameter improvement
(#3838)
* [Improve] flink on yarn-per-job mode bug fixed #3761
* [Improve] FlinkClientTrait improvement
* [Improve] SubmitRequest minor improvement
* [Improve] submit flink job file.encoding parameter improvement
---
.../common/enums/FlinkDevelopmentMode.java | 1 +
.../flink/client/bean/SubmitRequest.scala | 2 +
.../flink/client/trait/FlinkClientTrait.scala | 55 ++++++++++------------
3 files changed, 28 insertions(+), 30 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkDevelopmentMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkDevelopmentMode.java
index e94881a6c..107d000af 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkDevelopmentMode.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkDevelopmentMode.java
@@ -34,6 +34,7 @@ public enum FlinkDevelopmentMode {
/** Py flink Mode */
PYFLINK("Python Flink", 3);
+
private final String name;
private final Integer mode;
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index a1ab7df30..b8be52440 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -113,6 +113,8 @@ case class SubmitRequest(
def hasProp(key: String): Boolean = properties.containsKey(key)
+ def getProp(key: String): Any = properties.get(key)
+
private[this] def getParameterMap(prefix: String = ""): Map[String, String] = {
if (this.appConf == null) {
return Map.empty[String, String]
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 4cebe8d4e..9ffafebc0 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -83,9 +83,6 @@ trait FlinkClientTrait extends Logger {
// prepare flink config
val flinkConfig = prepareConfig(submitRequest)
- // set JVMOptions..
- setJvmOptions(submitRequest, flinkConfig)
-
setConfig(submitRequest, flinkConfig)
Try(doSubmit(submitRequest, flinkConfig)) match {
@@ -135,7 +132,7 @@ trait FlinkClientTrait extends Logger {
}
}
- // set common parameter
+ // 1) set common parameter
flinkConfig
.safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
.safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
@@ -152,7 +149,7 @@ trait FlinkClientTrait extends Logger {
flinkConfig.safeSet(retainedOption,
flinkDefaultConfiguration.get(retainedOption))
}
- // set savepoint parameter
+ // 2) set savepoint parameter
if (StringUtils.isNotBlank(submitRequest.savePoint)) {
flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH,
submitRequest.savePoint)
flinkConfig.setBoolean(
@@ -166,30 +163,28 @@ trait FlinkClientTrait extends Logger {
}
}
- flinkConfig
- }
-
- private[this] def setJvmOptions(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration): Unit = {
+ // 4) set env.xx.opts parameter
if (MapUtils.isNotEmpty(submitRequest.properties)) {
- submitRequest.properties.foreach(
- x => {
- val k = x._1.trim
- val v = x._2.toString
- if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
- }
- })
+ // file.encoding...
+ if (submitRequest.hasProp(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+ val jvmOpt = submitRequest.getProp(CoreOptions.FLINK_JVM_OPTIONS.key()).toString
+ if (!jvmOpt.contains("-Dfile.encoding=")) {
+ // set default file.encoding
+ val opt = s"-Dfile.encoding=UTF-8 $jvmOpt"
+ submitRequest.properties.put(CoreOptions.FLINK_JVM_OPTIONS.key(), opt)
+ }
+ }
+
+ submitRequest.properties
+ .filter(_._1.startsWith("env."))
+ .foreach(
+ x => {
+ logInfo(s"env opts: ${x._1}: ${x._2}")
+ flinkConfig.setString(x._1, x._2.toString)
+ })
}
+
+ flinkConfig
}
def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
@@ -358,7 +353,7 @@ trait FlinkClientTrait extends Logger {
private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
if (submitRequest.hasProp(KEY_FLINK_PARALLELISM())) {
- Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
+ Integer.valueOf(submitRequest.getProp(KEY_FLINK_PARALLELISM()).toString)
} else {
getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
.getInteger(CoreOptions.DEFAULT_PARALLELISM,
CoreOptions.DEFAULT_PARALLELISM.defaultValue())
@@ -415,8 +410,8 @@ trait FlinkClientTrait extends Logger {
if (MapUtils.isNotEmpty(submitRequest.properties)) {
submitRequest.properties.foreach {
key =>
- if (!key._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
- logInfo(s"submit application dynamicProperties: ${key._1} :${key._2}")
+ if (!key._1.startsWith("env.")) {
+ logInfo(s"application dynamicProperties: ${key._1} :${key._2}")
array += s"-D${key._1}=${key._2}"
}
}