This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new c9a5499eb [Improve] submit flink job error message improvement (#3841)
c9a5499eb is described below

commit c9a5499ebadd234900c728851d67e497efc9873d
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 7 12:32:03 2024 +0800

    [Improve] submit flink job error message improvement (#3841)
    
    * [Improve] submit flink job error message improvement
    * [Improve] shell script minor improvement
---
 .../{streampark-console-config => }/config.yaml    |  0
 .../logback-spring.xml                             |  0
 helm/streampark/templates/configmap.yaml           |  4 +-
 .../src/main/assembly/bin/streampark.sh            |  6 +-
 .../console/base/config/SpringProperties.java      | 13 ++--
 .../flink/client/bean/SubmitRequest.scala          |  4 +
 .../impl/KubernetesNativeApplicationClient.scala   | 40 ++++------
 .../streampark/flink/client/impl/LocalClient.scala | 35 +++------
 .../flink/client/impl/YarnApplicationClient.scala  | 47 ++++++------
 .../flink/client/impl/YarnPerJobClient.scala       | 72 ++++++++---------
 .../flink/client/impl/YarnSessionClient.scala      | 51 +++++--------
 .../flink/client/trait/FlinkClientTrait.scala      | 89 +++++++++++++---------
 12 files changed, 168 insertions(+), 193 deletions(-)

diff --git a/helm/streampark/conf/streampark-console-config/config.yaml 
b/helm/streampark/conf/config.yaml
similarity index 100%
rename from helm/streampark/conf/streampark-console-config/config.yaml
rename to helm/streampark/conf/config.yaml
diff --git a/helm/streampark/conf/streampark-console-config/logback-spring.xml 
b/helm/streampark/conf/logback-spring.xml
similarity index 100%
rename from helm/streampark/conf/streampark-console-config/logback-spring.xml
rename to helm/streampark/conf/logback-spring.xml
diff --git a/helm/streampark/templates/configmap.yaml 
b/helm/streampark/templates/configmap.yaml
index 0ec13b8a9..8932eb680 100644
--- a/helm/streampark/templates/configmap.yaml
+++ b/helm/streampark/templates/configmap.yaml
@@ -25,14 +25,14 @@ metadata:
 data:
   config.yaml: |+
 {{- if .Values.streamParkDefaultConfiguration.append }}
-    {{- $.Files.Get "conf/streampark-console-config/config.yaml"  | nindent 4 
-}}
+    {{- $.Files.Get "conf/config.yaml"  | nindent 4 -}}
 {{- end }}
 {{- if index (.Values.streamParkDefaultConfiguration) "config.yaml" }}
     {{- index (.Values.streamParkDefaultConfiguration) "config.yaml" | nindent 
4 -}}
 {{- end }}
   logback-spring.xml: |+
 {{- if .Values.streamParkDefaultConfiguration.append }}
-    {{- $.Files.Get "conf/streampark-console-config/logback-spring.xml"  | 
nindent 4 -}}
+    {{- $.Files.Get "conf/logback-spring.xml"  | nindent 4 -}}
 {{- end }}
 {{- if index (.Values.streamParkDefaultConfiguration) "logback-spring.xml" }}
     {{- index (.Values.streamParkDefaultConfiguration) "logback-spring.xml" | 
nindent 4 -}}
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index 5e0e550ce..dbb0f2153 100755
--- 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -361,8 +361,8 @@ start() {
    # shellcheck disable=SC2006
    local workspace=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml 
"streampark.workspace.local" "$CONFIG"`
    if [[ ! -d $workspace ]]; then
-     echo_r "ERROR: streampark.workspace.local: \"$workspace\" is invalid 
path, Please reconfigure in $CONFIG"
-     echo_r "NOTE: \"streampark.workspace.local\" Do not set under 
APP_HOME($APP_HOME). Set it to a secure directory outside of APP_HOME.  "
+     echo_r "ERROR: streampark.workspace.local: \"$workspace\" is an invalid 
path, please reconfigure in $CONFIG"
+     echo_r "NOTE: \"streampark.workspace.local\" should not be set under 
APP_HOME($APP_HOME) directory. Set it to a secure directory outside of 
APP_HOME."
      exit 1;
    fi
 
@@ -372,7 +372,7 @@ start() {
   fi
 
   if [[ "${HADOOP_HOME}"x == ""x ]]; then
-    echo_y "WARN: HADOOP_HOME is undefined on your system env,please check it."
+    echo_y "WARN: HADOOP_HOME is undefined on your system env."
   else
     echo_w "Using HADOOP_HOME:   ${HADOOP_HOME}"
   fi
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
index 82c3e877d..855330dd9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
@@ -40,9 +40,9 @@ public class SpringProperties {
     File oldConfig = getOldConfig();
     if (oldConfig != null) {
       log.warn(
-          "in the \"conf\" directory, found the \"application.yml\" file. The 
\"application.yml\" file is deprecated. "
-              + "For compatibility, this \"application.yml\" file will be used 
preferentially. The latest configuration file is \"config.yaml\". "
-              + "It is recommended to use \"config.yaml\". Note: 
\"application.yml\" will be completely deprecated in version 2.2.0. ");
+          "In the \"conf\" directory, found the \"application.yml\" file. 
Please be aware that the \"application.yml\" file is now deprecated. "
+              + "For compatibility reasons, this \"application.yml\" file will 
still be used as the preference. However, the latest configuration file is now 
\"config.yaml\", "
+              + "and it is recommended to use \"config.yaml\" instead. \nNOTE: 
the \"application.yml\" file will be completely deprecated in version 2.2.0.");
       SystemPropertyUtils.set("spring.config.location", 
oldConfig.getAbsolutePath());
       return new Properties();
     } else {
@@ -77,8 +77,9 @@ public class SpringProperties {
             springConfig.put("spring.datasource.driver-class-name", 
"com.mysql.jdbc.Driver");
           } catch (ClassNotFoundException e1) {
             throw new ExceptionInInitializerError(
-                "datasource.dialect is mysql, \"com.mysql.cj.jdbc.Driver\" and 
\"com.mysql.jdbc.Driver\" classes not found, Please ensure that the MySQL 
Connector/J can be found under $streampark/lib,\n"
-                    + "Notice: The MySQL Connector/J is incompatible with the 
Apache 2.0 license, You need to download and put it into $streampark/lib");
+                "The datasource.dialect is MySQL, but the classes 
\"com.mysql.cj.jdbc.Driver\" and \"com.mysql.jdbc.Driver\" not found. "
+                    + "Please ensure that the MySQL Connector/J is located 
under $streampark/lib.\n"
+                    + "Note: The MySQL Connector/J is not compatible with the 
Apache 2.0 license. You need to download it and place it into 
$streampark/lib.");
           }
         }
         break;
@@ -147,7 +148,7 @@ public class SpringProperties {
     if (StringUtils.isBlank(appHome)) {
       throw new ExceptionInInitializerError(
           String.format(
-              "[StreamPark] The system initialization check failed. If started 
local for development and debugging,"
+              "[StreamPark] The system initialization check has failed. If you 
started locally for development and debugging,"
                   + " please ensure the -D%s parameter is clearly specified,"
                   + " more detail: 
https://streampark.apache.org/docs/user-guide/deployment",
               ConfigConst.KEY_APP_HOME()));
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index dd24a6c14..a74ba5119 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -98,6 +98,10 @@ case class SubmitRequest(
     }
   }
 
+  def hasProp(key: String): Boolean = properties.containsKey(key)
+
+  def getProp(key: String): Any = properties.get(key)
+
   private[this] def getParameterMap(prefix: String = ""): Map[String, String] 
= {
     if (this.appConf == null) {
       return Map.empty[String, String]
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index 7dcc250be..f8606590a 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -59,33 +59,23 @@ object KubernetesNativeApplicationClient extends 
KubernetesNativeClientTrait {
     flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
buildResult.flinkImageTag)
 
     // retrieve k8s cluster and submit flink job on application mode
-    var clusterDescriptor: KubernetesClusterDescriptor = null
-    var clusterClient: ClusterClient[String] = null
+    val (descriptor, clusterSpecification) = 
getK8sClusterDescriptorAndSpecification(flinkConfig)
+    val clusterDescriptor = descriptor
+    val applicationConfig = 
ApplicationConfiguration.fromConfiguration(flinkConfig)
+    val clusterClient = clusterDescriptor
+      .deployApplicationCluster(clusterSpecification, applicationConfig)
+      .getClusterClient
 
-    try {
-      val (descriptor, clusterSpecification) = 
getK8sClusterDescriptorAndSpecification(flinkConfig)
-      clusterDescriptor = descriptor
-      val applicationConfig = 
ApplicationConfiguration.fromConfiguration(flinkConfig)
-      clusterClient = clusterDescriptor
-        .deployApplicationCluster(clusterSpecification, applicationConfig)
-        .getClusterClient
+    val clusterId = clusterClient.getClusterId
+    val result = SubmitResponse(
+      clusterId,
+      flinkConfig.toMap,
+      submitRequest.jobId,
+      clusterClient.getWebInterfaceURL)
+    logInfo(s"[flink-submit] flink job has been submitted. 
${flinkConfIdentifierInfo(flinkConfig)}")
 
-      val clusterId = clusterClient.getClusterId
-      val result = SubmitResponse(
-        clusterId,
-        flinkConfig.toMap,
-        submitRequest.jobId,
-        clusterClient.getWebInterfaceURL)
-      logInfo(
-        s"[flink-submit] flink job has been submitted. 
${flinkConfIdentifierInfo(flinkConfig)}")
-      result
-    } catch {
-      case e: Exception =>
-        logError(s"submit flink job fail in ${submitRequest.executionMode} 
mode")
-        throw e
-    } finally {
-      Utils.close(clusterDescriptor, clusterClient)
-    }
+    closeSubmit(submitRequest, clusterDescriptor, clusterClient)
+    result
   }
 
   override def doCancel(cancelRequest: CancelRequest, flinkConf: 
Configuration): CancelResponse = {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
index 380832fab..0658ea0ad 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
@@ -17,13 +17,11 @@
 
 package org.apache.streampark.flink.client.impl
 
-import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
 import org.apache.streampark.flink.client.bean._
 
 import org.apache.flink.client.deployment.executors.RemoteExecutor
-import org.apache.flink.client.program.{ClusterClient, MiniClusterClient, 
PackagedProgram}
-import org.apache.flink.client.program.MiniClusterClient.MiniClusterId
+import org.apache.flink.client.program.MiniClusterClient
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.minicluster.{MiniCluster, 
MiniClusterConfiguration}
 
@@ -43,28 +41,15 @@ object LocalClient extends FlinkClientTrait {
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
-    var packageProgram: PackagedProgram = null
-    var client: ClusterClient[MiniClusterId] = null
-    try {
-      // build JobGraph
-      val packageProgramJobGraph =
-        super.getJobGraph(flinkConfig, submitRequest, 
submitRequest.userJarFile)
-      packageProgram = packageProgramJobGraph._1
-      val jobGraph = packageProgramJobGraph._2
-      client = createLocalCluster(flinkConfig)
-      val jobId = client.submitJob(jobGraph).get().toString
-      SubmitResponse(jobId, flinkConfig.toMap, jobId, 
client.getWebInterfaceURL)
-    } catch {
-      case e: Exception =>
-        logError(s"submit flink job fail in ${submitRequest.executionMode} 
mode")
-        e.printStackTrace()
-        throw e
-    } finally {
-      if (submitRequest.safePackageProgram) {
-        Utils.close(packageProgram)
-      }
-      Utils.close(client)
-    }
+    // build JobGraph
+    val programJobGraph = super.getJobGraph(flinkConfig, submitRequest, 
submitRequest.userJarFile)
+    val packageProgram = programJobGraph._1
+    val jobGraph = programJobGraph._2
+    val client = createLocalCluster(flinkConfig)
+    val jobId = client.submitJob(jobGraph).get().toString
+    val resp = SubmitResponse(jobId, flinkConfig.toMap, jobId, 
client.getWebInterfaceURL)
+    closeSubmit(submitRequest, packageProgram, client)
+    resp
   }
 
   override def doTriggerSavepoint(
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 8601c70a3..a3939b007 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -103,34 +103,31 @@ object YarnApplicationClient extends YarnClientTrait {
         val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
         val clientFactory =
           
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
+
+        val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
+        logInfo(s"""
+                   
|------------------------<<specification>>-------------------------
+                   |$clusterSpecification
+                   
|------------------------------------------------------------------
+                   |""".stripMargin)
+
+        val applicationConfiguration = 
ApplicationConfiguration.fromConfiguration(flinkConfig)
         val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
-        var clusterClient: ClusterClient[ApplicationId] = null
-        try {
-          val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
-          logInfo(s"""
-                     
|------------------------<<specification>>-------------------------
-                     |$clusterSpecification
-                     
|------------------------------------------------------------------
-                     |""".stripMargin)
-
-          val applicationConfiguration = 
ApplicationConfiguration.fromConfiguration(flinkConfig)
-          var applicationId: ApplicationId = null
-          var jobManagerUrl: String = null
-          clusterClient = clusterDescriptor
-            .deployApplicationCluster(clusterSpecification, 
applicationConfiguration)
-            .getClusterClient
-          applicationId = clusterClient.getClusterId
-          jobManagerUrl = clusterClient.getWebInterfaceURL
-          logInfo(s"""
-                     
|-------------------------<<applicationId>>------------------------
-                     |Flink Job Started: applicationId: $applicationId
-                     
|__________________________________________________________________
-                     |""".stripMargin)
+        val clusterClient = clusterDescriptor
+          .deployApplicationCluster(clusterSpecification, 
applicationConfiguration)
+          .getClusterClient
+        val applicationId = clusterClient.getClusterId
+        val jobManagerUrl = clusterClient.getWebInterfaceURL
+        logInfo(s"""
+                   
|-------------------------<<applicationId>>------------------------
+                   |Flink Job Started: applicationId: $applicationId
+                   
|__________________________________________________________________
+                   |""".stripMargin)
 
+        val resp =
           SubmitResponse(applicationId.toString, flinkConfig.toMap, 
jobManagerUrl = jobManagerUrl)
-        } finally {
-          Utils.close(clusterDescriptor, clusterClient)
-        }
+        closeSubmit(submitRequest, clusterDescriptor, clusterClient)
+        resp
       }
     })
   }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 71499e290..469cccbf7 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -17,7 +17,6 @@
 
 package org.apache.streampark.flink.client.impl
 
-import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.util.FlinkUtils
@@ -63,8 +62,6 @@ object YarnPerJobClient extends YarnClientTrait {
     val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
     val clientFactory =
       
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-    var packagedProgram: PackagedProgram = null
-    var clusterClient: ClusterClient[ApplicationId] = null
 
     val clusterDescriptor = {
       val clusterDescriptor =
@@ -75,49 +72,44 @@ object YarnPerJobClient extends YarnClientTrait {
       clusterDescriptor
     }
 
-    try {
-      clusterClient = {
-        val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
-        logInfo(s"""
-                   
|------------------------<<specification>>-------------------------
-                   |$clusterSpecification
-                   
|------------------------------------------------------------------
-                   |""".stripMargin)
-
-        val packageProgramJobGraph =
-          super.getJobGraph(flinkConfig, submitRequest, 
submitRequest.userJarFile)
-        packagedProgram = packageProgramJobGraph._1
-        val jobGraph = packageProgramJobGraph._2
-
-        logInfo(s"""
-                   
|-------------------------<<applicationId>>------------------------
-                   |jobGraph getJobID: ${jobGraph.getJobID.toString}
-                   
|__________________________________________________________________
-                   |""".stripMargin)
-        deployInternal(
-          clusterDescriptor,
-          clusterSpecification,
-          submitRequest.effectiveAppName,
-          classOf[YarnJobClusterEntrypoint].getName,
-          jobGraph,
-          true).getClusterClient
-
-      }
-      val applicationId = clusterClient.getClusterId
-      val jobManagerUrl = clusterClient.getWebInterfaceURL
+    var packagedProgram: PackagedProgram = null
+    val clusterClient = {
+      val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
+      logInfo(s"""
+                 
|------------------------<<specification>>-------------------------
+                 |$clusterSpecification
+                 
|------------------------------------------------------------------
+                 |""".stripMargin)
+
+      val programJobGraph = super.getJobGraph(flinkConfig, submitRequest, 
submitRequest.userJarFile)
+      packagedProgram = programJobGraph._1
+      val jobGraph = programJobGraph._2
+
       logInfo(s"""
                  
|-------------------------<<applicationId>>------------------------
-                 |Flink Job Started: applicationId: $applicationId
+                 |jobGraph getJobID: ${jobGraph.getJobID.toString}
                  
|__________________________________________________________________
                  |""".stripMargin)
+      deployInternal(
+        clusterDescriptor,
+        clusterSpecification,
+        submitRequest.effectiveAppName,
+        classOf[YarnJobClusterEntrypoint].getName,
+        jobGraph,
+        true).getClusterClient
+    }
+    val applicationId = clusterClient.getClusterId
+    val jobManagerUrl = clusterClient.getWebInterfaceURL
+    logInfo(s"""
+               
|-------------------------<<applicationId>>------------------------
+               |Flink Job Started: applicationId: $applicationId
+               
|__________________________________________________________________
+               |""".stripMargin)
 
+    val resp =
       SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl 
= jobManagerUrl)
-    } finally {
-      if (submitRequest.safePackageProgram) {
-        Utils.close(packagedProgram)
-      }
-      Utils.close(clusterClient, clusterDescriptor)
-    }
+    closeSubmit(submitRequest, packagedProgram, clusterClient, 
clusterDescriptor)
+    resp
   }
 
   override def doCancel(cancelRequest: CancelRequest, flinkConf: 
Configuration): CancelResponse = {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index e901a30c1..97370a268 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -30,7 +30,6 @@ import org.apache.flink.yarn.YarnClusterDescriptor
 import org.apache.flink.yarn.configuration.{YarnConfigOptions, 
YarnDeploymentTarget}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.{ApplicationId, 
FinalApplicationStatus}
-import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.ConverterUtils
 
 import java.util
@@ -102,42 +101,28 @@ object YarnSessionClient extends YarnClientTrait {
                |""".stripMargin)
   }
 
+  @throws[Exception]
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
-    var clusterDescriptor: YarnClusterDescriptor = null
-    var packageProgram: PackagedProgram = null
-    var client: ClusterClient[ApplicationId] = null
-    try {
-      val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
-      clusterDescriptor = yarnClusterDescriptor._2
-      val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
-      val packageProgramJobGraph =
-        super.getJobGraph(flinkConfig, submitRequest, 
submitRequest.userJarFile)
-      packageProgram = packageProgramJobGraph._1
-      val jobGraph = packageProgramJobGraph._2
-
-      client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
-      val jobId = client.submitJob(jobGraph).get().toString
-      val jobManagerUrl = client.getWebInterfaceURL
+    val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
+    val clusterDescriptor = yarnClusterDescriptor._2
+    val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
+    val programJobGraph = super.getJobGraph(flinkConfig, submitRequest, 
submitRequest.userJarFile)
+    val packageProgram = programJobGraph._1
+    val jobGraph = programJobGraph._2
+    val client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
+    val jobId = client.submitJob(jobGraph).get().toString
+    val jobManagerUrl = client.getWebInterfaceURL
 
-      logInfo(s"""
-                 
|-------------------------<<applicationId>>------------------------
-                 |Flink Job Started: jobId: $jobId , applicationId: 
${yarnClusterId.toString}
-                 
|__________________________________________________________________
-                 |""".stripMargin)
-      SubmitResponse(yarnClusterId.toString, flinkConfig.toMap, jobId, 
jobManagerUrl)
-    } catch {
-      case e: Exception =>
-        logError(s"submit flink job fail in ${submitRequest.executionMode} 
mode")
-        e.printStackTrace()
-        throw e
-    } finally {
-      if (submitRequest.safePackageProgram) {
-        Utils.close(packageProgram)
-      }
-      Utils.close(client, clusterDescriptor)
-    }
+    logInfo(s"""
+               
|-------------------------<<applicationId>>------------------------
+               |Flink Job Started: jobId: $jobId , applicationId: 
${yarnClusterId.toString}
+               
|__________________________________________________________________
+               |""".stripMargin)
+    val resp = SubmitResponse(yarnClusterId.toString, flinkConfig.toMap, 
jobId, jobManagerUrl)
+    closeSubmit(submitRequest, packageProgram, client, clusterDescriptor)
+    resp
   }
 
   override def doCancel(
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index f1b171310..c9bc9b59d 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -78,7 +78,25 @@ trait FlinkClientTrait extends Logger {
          
|-------------------------------------------------------------------------------------------
          |""".stripMargin)
 
+    val flinkConfig = prepareConfig(submitRequest)
+
+    setConfig(submitRequest, flinkConfig)
+
+    Try(doSubmit(submitRequest, flinkConfig)) match {
+      case Success(resp) => resp
+      case Failure(e) =>
+        logError(
+          s"flink job ${submitRequest.appName} start failed, " +
+            s"executionMode: ${submitRequest.executionMode.getName}, " +
+            s"detail: ${Utils.stringifyException(e)}")
+        throw e
+    }
+  }
+
+  private[this] def prepareConfig(submitRequest: SubmitRequest): Configuration 
= {
+
     val (commandLine, flinkConfig) = 
getCommandLineAndFlinkConfig(submitRequest)
+
     if (submitRequest.userJarFile != null) {
       val uri = 
PackagedProgramUtils.resolveURI(submitRequest.userJarFile.getAbsolutePath)
       val programOptions = ProgramOptions.create(commandLine)
@@ -88,7 +106,7 @@ trait FlinkClientTrait extends Logger {
       executionParameters.applyToConfiguration(flinkConfig)
     }
 
-    // set common parameter
+    // 1) set common parameter
     flinkConfig
       .safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
       .safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
@@ -97,9 +115,7 @@ trait FlinkClientTrait extends Logger {
       .safeSet(ApplicationConfiguration.APPLICATION_ARGS, 
extractProgramArgs(submitRequest))
       .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
submitRequest.jobId)
 
-    if (
-      
!submitRequest.properties.containsKey(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())
-    ) {
+    if 
(!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
       val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
         submitRequest.flinkVersion.flinkHome)
       // state.checkpoints.num-retained
@@ -107,44 +123,36 @@ trait FlinkClientTrait extends Logger {
       flinkConfig.safeSet(retainedOption, 
flinkDefaultConfiguration.get(retainedOption))
     }
 
-    // set savepoint parameter
-    if (submitRequest.savePoint != null) {
+    // 2) set savepoint parameter
+    if (StringUtils.isNotBlank(submitRequest.savePoint)) {
       flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, 
submitRequest.savePoint)
       flinkConfig.setBoolean(
         SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
         submitRequest.allowNonRestoredState)
     }
 
-    // set JVMOptions..
-    setJvmOptions(submitRequest, flinkConfig)
-
-    setConfig(submitRequest, flinkConfig)
-
-    doSubmit(submitRequest, flinkConfig)
-
-  }
-
-  private[this] def setJvmOptions(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration): Unit = {
+    // 4) set env.xx.opts parameter
     if (MapUtils.isNotEmpty(submitRequest.properties)) {
-      submitRequest.properties.foreach(
-        x => {
-          val k = x._1.trim
-          val v = x._2.toString
-          if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
-          } else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
-            flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
-          }
-        })
+      // file.encoding...
+      if (submitRequest.hasProp(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+        val jvmOpt = 
submitRequest.getProp(CoreOptions.FLINK_JVM_OPTIONS.key()).toString
+        if (!jvmOpt.contains("-Dfile.encoding=")) {
+          // set default file.encoding
+          val opt = s"-Dfile.encoding=UTF-8 $jvmOpt"
+          submitRequest.properties.put(CoreOptions.FLINK_JVM_OPTIONS.key(), 
opt)
+        }
+      }
+
+      submitRequest.properties
+        .filter(_._1.startsWith("env."))
+        .foreach(
+          x => {
+            logInfo(s"env opts:  ${x._1}: ${x._2}")
+            flinkConfig.setString(x._1, x._2.toString)
+          })
     }
+
+    flinkConfig
   }
 
   def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
@@ -524,4 +532,17 @@ trait FlinkClientTrait extends Logger {
     clientWrapper.triggerSavepoint(jobID, savepointPath).get()
   }
 
+  def closeSubmit(submitRequest: SubmitRequest, close: AutoCloseable*): Unit = 
{
+    close.foreach(
+      x => {
+        if (x.isInstanceOf[PackagedProgram]) {
+          if (submitRequest.safePackageProgram) {
+            Utils.close(x)
+          }
+        } else {
+          Utils.close(x)
+        }
+      })
+  }
+
 }

Reply via email to