This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7a511d5b1 [Improve] submit flink job default file.encoding parameter 
improvement (#3838)
7a511d5b1 is described below

commit 7a511d5b161e1fcbe4ddbd8b154e4f1cb176178c
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 6 19:47:33 2024 +0800

    [Improve] submit flink job default file.encoding parameter improvement 
(#3838)
    
    * [Improve] flink on yarn-per-job mode bug fixed #3761
    * [Improve] FlinkClientTrait improvement
    * [Improve] SubmitRequest minor improvement
    * [Improve] submit flink job file.encoding parameter improvement
---
 .../common/enums/FlinkDevelopmentMode.java         |  1 +
 .../flink/client/bean/SubmitRequest.scala          |  2 +
 .../flink/client/trait/FlinkClientTrait.scala      | 55 ++++++++++------------
 3 files changed, 28 insertions(+), 30 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkDevelopmentMode.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkDevelopmentMode.java
index e94881a6c..107d000af 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkDevelopmentMode.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkDevelopmentMode.java
@@ -34,6 +34,7 @@ public enum FlinkDevelopmentMode {
 
     /** Py flink Mode */
     PYFLINK("Python Flink", 3);
+
     private final String name;
 
     private final Integer mode;
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index a1ab7df30..b8be52440 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -113,6 +113,8 @@ case class SubmitRequest(
 
   def hasProp(key: String): Boolean = properties.containsKey(key)
 
+  def getProp(key: String): Any = properties.get(key)
+
   private[this] def getParameterMap(prefix: String = ""): Map[String, String] 
= {
     if (this.appConf == null) {
       return Map.empty[String, String]
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 4cebe8d4e..9ffafebc0 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -83,9 +83,6 @@ trait FlinkClientTrait extends Logger {
     // prepare flink config
     val flinkConfig = prepareConfig(submitRequest)
 
-    // set JVMOptions..
-    setJvmOptions(submitRequest, flinkConfig)
-
     setConfig(submitRequest, flinkConfig)
 
     Try(doSubmit(submitRequest, flinkConfig)) match {
@@ -135,7 +132,7 @@ trait FlinkClientTrait extends Logger {
         }
     }
 
-    // set common parameter
+    // 1) set common parameter
     flinkConfig
       .safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
       .safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
@@ -152,7 +149,7 @@ trait FlinkClientTrait extends Logger {
       flinkConfig.safeSet(retainedOption, 
flinkDefaultConfiguration.get(retainedOption))
     }
 
-    // set savepoint parameter
+    // 2) set savepoint parameter
     if (StringUtils.isNotBlank(submitRequest.savePoint)) {
       flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, 
submitRequest.savePoint)
       flinkConfig.setBoolean(
@@ -166,30 +163,28 @@ trait FlinkClientTrait extends Logger {
       }
     }
 
-    flinkConfig
-  }
-
-  private[this] def setJvmOptions(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration): Unit = {
+    // 4) set env.xx.opts parameter
     if (MapUtils.isNotEmpty(submitRequest.properties)) {
-      submitRequest.properties.foreach(
-        x => {
-          val k = x._1.trim
-          val v = x._2.toString
-          if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
-          }
-        })
+      // file.encoding...
+      if (submitRequest.hasProp(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+        val jvmOpt = 
submitRequest.getProp(CoreOptions.FLINK_JVM_OPTIONS.key()).toString
+        if (!jvmOpt.contains("-Dfile.encoding=")) {
+          // set default file.encoding
+          val opt = s"-Dfile.encoding=UTF-8 $jvmOpt"
+          submitRequest.properties.put(CoreOptions.FLINK_JVM_OPTIONS.key(), 
opt)
+        }
+      }
+
+      submitRequest.properties
+        .filter(_._1.startsWith("env."))
+        .foreach(
+          x => {
+            logInfo(s"env opts:  ${x._1}: ${x._2}")
+            flinkConfig.setString(x._1, x._2.toString)
+          })
     }
+
+    flinkConfig
   }
 
   def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
@@ -358,7 +353,7 @@ trait FlinkClientTrait extends Logger {
 
   private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
     if (submitRequest.hasProp(KEY_FLINK_PARALLELISM())) {
-      
Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
+      Integer.valueOf(submitRequest.getProp(KEY_FLINK_PARALLELISM()).toString)
     } else {
       getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
         .getInteger(CoreOptions.DEFAULT_PARALLELISM, 
CoreOptions.DEFAULT_PARALLELISM.defaultValue())
@@ -415,8 +410,8 @@ trait FlinkClientTrait extends Logger {
       if (MapUtils.isNotEmpty(submitRequest.properties)) {
         submitRequest.properties.foreach {
           key =>
-            if (!key._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
-              logInfo(s"submit application dynamicProperties:  ${key._1} 
:${key._2}")
+            if (!key._1.startsWith("env.")) {
+              logInfo(s"application dynamicProperties:  ${key._1} :${key._2}")
               array += s"-D${key._1}=${key._2}"
             }
         }

Reply via email to