This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new c9a5499eb [Improve] submit flink job error message improvement (#3841)
c9a5499eb is described below
commit c9a5499ebadd234900c728851d67e497efc9873d
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 7 12:32:03 2024 +0800
[Improve] submit flink job error message improvement (#3841)
* [Improve] submit flink job error message improvement
* [Improve] shell script minor improvement
---
.../{streampark-console-config => }/config.yaml | 0
.../logback-spring.xml | 0
helm/streampark/templates/configmap.yaml | 4 +-
.../src/main/assembly/bin/streampark.sh | 6 +-
.../console/base/config/SpringProperties.java | 13 ++--
.../flink/client/bean/SubmitRequest.scala | 4 +
.../impl/KubernetesNativeApplicationClient.scala | 40 ++++------
.../streampark/flink/client/impl/LocalClient.scala | 35 +++------
.../flink/client/impl/YarnApplicationClient.scala | 47 ++++++------
.../flink/client/impl/YarnPerJobClient.scala | 72 ++++++++---------
.../flink/client/impl/YarnSessionClient.scala | 51 +++++--------
.../flink/client/trait/FlinkClientTrait.scala | 89 +++++++++++++---------
12 files changed, 168 insertions(+), 193 deletions(-)
diff --git a/helm/streampark/conf/streampark-console-config/config.yaml
b/helm/streampark/conf/config.yaml
similarity index 100%
rename from helm/streampark/conf/streampark-console-config/config.yaml
rename to helm/streampark/conf/config.yaml
diff --git a/helm/streampark/conf/streampark-console-config/logback-spring.xml
b/helm/streampark/conf/logback-spring.xml
similarity index 100%
rename from helm/streampark/conf/streampark-console-config/logback-spring.xml
rename to helm/streampark/conf/logback-spring.xml
diff --git a/helm/streampark/templates/configmap.yaml
b/helm/streampark/templates/configmap.yaml
index 0ec13b8a9..8932eb680 100644
--- a/helm/streampark/templates/configmap.yaml
+++ b/helm/streampark/templates/configmap.yaml
@@ -25,14 +25,14 @@ metadata:
data:
config.yaml: |+
{{- if .Values.streamParkDefaultConfiguration.append }}
- {{- $.Files.Get "conf/streampark-console-config/config.yaml" | nindent 4
-}}
+ {{- $.Files.Get "conf/config.yaml" | nindent 4 -}}
{{- end }}
{{- if index (.Values.streamParkDefaultConfiguration) "config.yaml" }}
{{- index (.Values.streamParkDefaultConfiguration) "config.yaml" | nindent
4 -}}
{{- end }}
logback-spring.xml: |+
{{- if .Values.streamParkDefaultConfiguration.append }}
- {{- $.Files.Get "conf/streampark-console-config/logback-spring.xml" |
nindent 4 -}}
+ {{- $.Files.Get "conf/logback-spring.xml" | nindent 4 -}}
{{- end }}
{{- if index (.Values.streamParkDefaultConfiguration) "logback-spring.xml" }}
{{- index (.Values.streamParkDefaultConfiguration) "logback-spring.xml" |
nindent 4 -}}
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index 5e0e550ce..dbb0f2153 100755
---
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -361,8 +361,8 @@ start() {
# shellcheck disable=SC2006
local workspace=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml
"streampark.workspace.local" "$CONFIG"`
if [[ ! -d $workspace ]]; then
- echo_r "ERROR: streampark.workspace.local: \"$workspace\" is invalid
path, Please reconfigure in $CONFIG"
- echo_r "NOTE: \"streampark.workspace.local\" Do not set under
APP_HOME($APP_HOME). Set it to a secure directory outside of APP_HOME. "
+ echo_r "ERROR: streampark.workspace.local: \"$workspace\" is an invalid
path, please reconfigure in $CONFIG"
+ echo_r "NOTE: \"streampark.workspace.local\" should not be set under
the APP_HOME ($APP_HOME) directory. Set it to a secure directory outside of
APP_HOME."
exit 1;
fi
@@ -372,7 +372,7 @@ start() {
fi
if [[ "${HADOOP_HOME}"x == ""x ]]; then
- echo_y "WARN: HADOOP_HOME is undefined on your system env,please check it."
+ echo_y "WARN: HADOOP_HOME is undefined in your system environment."
else
echo_w "Using HADOOP_HOME: ${HADOOP_HOME}"
fi
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
index 82c3e877d..855330dd9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
@@ -40,9 +40,9 @@ public class SpringProperties {
File oldConfig = getOldConfig();
if (oldConfig != null) {
log.warn(
- "in the \"conf\" directory, found the \"application.yml\" file. The
\"application.yml\" file is deprecated. "
- + "For compatibility, this \"application.yml\" file will be used
preferentially. The latest configuration file is \"config.yaml\". "
- + "It is recommended to use \"config.yaml\". Note:
\"application.yml\" will be completely deprecated in version 2.2.0. ");
+ "In the \"conf\" directory, the \"application.yml\" file was found.
Please be aware that the \"application.yml\" file is now deprecated. "
+ + "For compatibility reasons, this \"application.yml\" file will
still be used as the preference. However, the latest configuration file is now
\"config.yaml\", "
+ + "and it is recommended to use \"config.yaml\" instead. \nNOTE:
the \"application.yml\" file will be completely deprecated in version 2.2.0.");
SystemPropertyUtils.set("spring.config.location",
oldConfig.getAbsolutePath());
return new Properties();
} else {
@@ -77,8 +77,9 @@ public class SpringProperties {
springConfig.put("spring.datasource.driver-class-name",
"com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e1) {
throw new ExceptionInInitializerError(
- "datasource.dialect is mysql, \"com.mysql.cj.jdbc.Driver\" and
\"com.mysql.jdbc.Driver\" classes not found, Please ensure that the MySQL
Connector/J can be found under $streampark/lib,\n"
- + "Notice: The MySQL Connector/J is incompatible with the
Apache 2.0 license, You need to download and put it into $streampark/lib");
+ "The datasource.dialect is MySQL, but the classes
\"com.mysql.cj.jdbc.Driver\" and \"com.mysql.jdbc.Driver\" were not found. "
+ + "Please ensure that the MySQL Connector/J is located
under $streampark/lib.\n"
+ + "Note: The MySQL Connector/J is not compatible with the
Apache 2.0 license. You need to download it and place it into
$streampark/lib.");
}
}
break;
@@ -147,7 +148,7 @@ public class SpringProperties {
if (StringUtils.isBlank(appHome)) {
throw new ExceptionInInitializerError(
String.format(
- "[StreamPark] The system initialization check failed. If started
local for development and debugging,"
+ "[StreamPark] The system initialization check has failed. If you
started locally for development and debugging,"
+ " please ensure the -D%s parameter is clearly specified,"
+ " more detail:
https://streampark.apache.org/docs/user-guide/deployment",
ConfigConst.KEY_APP_HOME()));
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index dd24a6c14..a74ba5119 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -98,6 +98,10 @@ case class SubmitRequest(
}
}
+ def hasProp(key: String): Boolean = properties.containsKey(key)
+
+ def getProp(key: String): Any = properties.get(key)
+
private[this] def getParameterMap(prefix: String = ""): Map[String, String]
= {
if (this.appConf == null) {
return Map.empty[String, String]
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index 7dcc250be..f8606590a 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -59,33 +59,23 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
buildResult.flinkImageTag)
// retrieve k8s cluster and submit flink job on application mode
- var clusterDescriptor: KubernetesClusterDescriptor = null
- var clusterClient: ClusterClient[String] = null
+ val (descriptor, clusterSpecification) =
getK8sClusterDescriptorAndSpecification(flinkConfig)
+ val clusterDescriptor = descriptor
+ val applicationConfig =
ApplicationConfiguration.fromConfiguration(flinkConfig)
+ val clusterClient = clusterDescriptor
+ .deployApplicationCluster(clusterSpecification, applicationConfig)
+ .getClusterClient
- try {
- val (descriptor, clusterSpecification) =
getK8sClusterDescriptorAndSpecification(flinkConfig)
- clusterDescriptor = descriptor
- val applicationConfig =
ApplicationConfiguration.fromConfiguration(flinkConfig)
- clusterClient = clusterDescriptor
- .deployApplicationCluster(clusterSpecification, applicationConfig)
- .getClusterClient
+ val clusterId = clusterClient.getClusterId
+ val result = SubmitResponse(
+ clusterId,
+ flinkConfig.toMap,
+ submitRequest.jobId,
+ clusterClient.getWebInterfaceURL)
+ logInfo(s"[flink-submit] flink job has been submitted.
${flinkConfIdentifierInfo(flinkConfig)}")
- val clusterId = clusterClient.getClusterId
- val result = SubmitResponse(
- clusterId,
- flinkConfig.toMap,
- submitRequest.jobId,
- clusterClient.getWebInterfaceURL)
- logInfo(
- s"[flink-submit] flink job has been submitted.
${flinkConfIdentifierInfo(flinkConfig)}")
- result
- } catch {
- case e: Exception =>
- logError(s"submit flink job fail in ${submitRequest.executionMode}
mode")
- throw e
- } finally {
- Utils.close(clusterDescriptor, clusterClient)
- }
+ closeSubmit(submitRequest, clusterDescriptor, clusterClient)
+ result
}
override def doCancel(cancelRequest: CancelRequest, flinkConf:
Configuration): CancelResponse = {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
index 380832fab..0658ea0ad 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
@@ -17,13 +17,11 @@
package org.apache.streampark.flink.client.impl
-import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.flink.client.deployment.executors.RemoteExecutor
-import org.apache.flink.client.program.{ClusterClient, MiniClusterClient,
PackagedProgram}
-import org.apache.flink.client.program.MiniClusterClient.MiniClusterId
+import org.apache.flink.client.program.MiniClusterClient
import org.apache.flink.configuration._
import org.apache.flink.runtime.minicluster.{MiniCluster,
MiniClusterConfiguration}
@@ -43,28 +41,15 @@ object LocalClient extends FlinkClientTrait {
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
- var packageProgram: PackagedProgram = null
- var client: ClusterClient[MiniClusterId] = null
- try {
- // build JobGraph
- val packageProgramJobGraph =
- super.getJobGraph(flinkConfig, submitRequest,
submitRequest.userJarFile)
- packageProgram = packageProgramJobGraph._1
- val jobGraph = packageProgramJobGraph._2
- client = createLocalCluster(flinkConfig)
- val jobId = client.submitJob(jobGraph).get().toString
- SubmitResponse(jobId, flinkConfig.toMap, jobId,
client.getWebInterfaceURL)
- } catch {
- case e: Exception =>
- logError(s"submit flink job fail in ${submitRequest.executionMode}
mode")
- e.printStackTrace()
- throw e
- } finally {
- if (submitRequest.safePackageProgram) {
- Utils.close(packageProgram)
- }
- Utils.close(client)
- }
+ // build JobGraph
+ val programJobGraph = super.getJobGraph(flinkConfig, submitRequest,
submitRequest.userJarFile)
+ val packageProgram = programJobGraph._1
+ val jobGraph = programJobGraph._2
+ val client = createLocalCluster(flinkConfig)
+ val jobId = client.submitJob(jobGraph).get().toString
+ val resp = SubmitResponse(jobId, flinkConfig.toMap, jobId,
client.getWebInterfaceURL)
+ closeSubmit(submitRequest, packageProgram, client)
+ resp
}
override def doTriggerSavepoint(
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 8601c70a3..a3939b007 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -103,34 +103,31 @@ object YarnApplicationClient extends YarnClientTrait {
val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
val clientFactory =
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
+
+ val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
+ logInfo(s"""
+
|------------------------<<specification>>-------------------------
+ |$clusterSpecification
+
|------------------------------------------------------------------
+ |""".stripMargin)
+
+ val applicationConfiguration =
ApplicationConfiguration.fromConfiguration(flinkConfig)
val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig)
- var clusterClient: ClusterClient[ApplicationId] = null
- try {
- val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
- logInfo(s"""
-
|------------------------<<specification>>-------------------------
- |$clusterSpecification
-
|------------------------------------------------------------------
- |""".stripMargin)
-
- val applicationConfiguration =
ApplicationConfiguration.fromConfiguration(flinkConfig)
- var applicationId: ApplicationId = null
- var jobManagerUrl: String = null
- clusterClient = clusterDescriptor
- .deployApplicationCluster(clusterSpecification,
applicationConfiguration)
- .getClusterClient
- applicationId = clusterClient.getClusterId
- jobManagerUrl = clusterClient.getWebInterfaceURL
- logInfo(s"""
-
|-------------------------<<applicationId>>------------------------
- |Flink Job Started: applicationId: $applicationId
-
|__________________________________________________________________
- |""".stripMargin)
+ val clusterClient = clusterDescriptor
+ .deployApplicationCluster(clusterSpecification,
applicationConfiguration)
+ .getClusterClient
+ val applicationId = clusterClient.getClusterId
+ val jobManagerUrl = clusterClient.getWebInterfaceURL
+ logInfo(s"""
+
|-------------------------<<applicationId>>------------------------
+ |Flink Job Started: applicationId: $applicationId
+
|__________________________________________________________________
+ |""".stripMargin)
+ val resp =
SubmitResponse(applicationId.toString, flinkConfig.toMap,
jobManagerUrl = jobManagerUrl)
- } finally {
- Utils.close(clusterDescriptor, clusterClient)
- }
+ closeSubmit(submitRequest, clusterDescriptor, clusterClient)
+ resp
}
})
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 71499e290..469cccbf7 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -17,7 +17,6 @@
package org.apache.streampark.flink.client.impl
-import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.util.FlinkUtils
@@ -63,8 +62,6 @@ object YarnPerJobClient extends YarnClientTrait {
val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
val clientFactory =
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
- var packagedProgram: PackagedProgram = null
- var clusterClient: ClusterClient[ApplicationId] = null
val clusterDescriptor = {
val clusterDescriptor =
@@ -75,49 +72,44 @@ object YarnPerJobClient extends YarnClientTrait {
clusterDescriptor
}
- try {
- clusterClient = {
- val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
- logInfo(s"""
-
|------------------------<<specification>>-------------------------
- |$clusterSpecification
-
|------------------------------------------------------------------
- |""".stripMargin)
-
- val packageProgramJobGraph =
- super.getJobGraph(flinkConfig, submitRequest,
submitRequest.userJarFile)
- packagedProgram = packageProgramJobGraph._1
- val jobGraph = packageProgramJobGraph._2
-
- logInfo(s"""
-
|-------------------------<<applicationId>>------------------------
- |jobGraph getJobID: ${jobGraph.getJobID.toString}
-
|__________________________________________________________________
- |""".stripMargin)
- deployInternal(
- clusterDescriptor,
- clusterSpecification,
- submitRequest.effectiveAppName,
- classOf[YarnJobClusterEntrypoint].getName,
- jobGraph,
- true).getClusterClient
-
- }
- val applicationId = clusterClient.getClusterId
- val jobManagerUrl = clusterClient.getWebInterfaceURL
+ var packagedProgram: PackagedProgram = null
+ val clusterClient = {
+ val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
+ logInfo(s"""
+
|------------------------<<specification>>-------------------------
+ |$clusterSpecification
+
|------------------------------------------------------------------
+ |""".stripMargin)
+
+ val programJobGraph = super.getJobGraph(flinkConfig, submitRequest,
submitRequest.userJarFile)
+ packagedProgram = programJobGraph._1
+ val jobGraph = programJobGraph._2
+
logInfo(s"""
|-------------------------<<applicationId>>------------------------
- |Flink Job Started: applicationId: $applicationId
+ |jobGraph getJobID: ${jobGraph.getJobID.toString}
|__________________________________________________________________
|""".stripMargin)
+ deployInternal(
+ clusterDescriptor,
+ clusterSpecification,
+ submitRequest.effectiveAppName,
+ classOf[YarnJobClusterEntrypoint].getName,
+ jobGraph,
+ true).getClusterClient
+ }
+ val applicationId = clusterClient.getClusterId
+ val jobManagerUrl = clusterClient.getWebInterfaceURL
+ logInfo(s"""
+
|-------------------------<<applicationId>>------------------------
+ |Flink Job Started: applicationId: $applicationId
+
|__________________________________________________________________
+ |""".stripMargin)
+ val resp =
SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl
= jobManagerUrl)
- } finally {
- if (submitRequest.safePackageProgram) {
- Utils.close(packagedProgram)
- }
- Utils.close(clusterClient, clusterDescriptor)
- }
+ closeSubmit(submitRequest, packagedProgram, clusterClient,
clusterDescriptor)
+ resp
}
override def doCancel(cancelRequest: CancelRequest, flinkConf:
Configuration): CancelResponse = {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index e901a30c1..97370a268 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -30,7 +30,6 @@ import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.flink.yarn.configuration.{YarnConfigOptions,
YarnDeploymentTarget}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.{ApplicationId,
FinalApplicationStatus}
-import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.ConverterUtils
import java.util
@@ -102,42 +101,28 @@ object YarnSessionClient extends YarnClientTrait {
|""".stripMargin)
}
+ @throws[Exception]
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
- var clusterDescriptor: YarnClusterDescriptor = null
- var packageProgram: PackagedProgram = null
- var client: ClusterClient[ApplicationId] = null
- try {
- val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
- clusterDescriptor = yarnClusterDescriptor._2
- val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
- val packageProgramJobGraph =
- super.getJobGraph(flinkConfig, submitRequest,
submitRequest.userJarFile)
- packageProgram = packageProgramJobGraph._1
- val jobGraph = packageProgramJobGraph._2
-
- client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
- val jobId = client.submitJob(jobGraph).get().toString
- val jobManagerUrl = client.getWebInterfaceURL
+ val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
+ val clusterDescriptor = yarnClusterDescriptor._2
+ val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
+ val programJobGraph = super.getJobGraph(flinkConfig, submitRequest,
submitRequest.userJarFile)
+ val packageProgram = programJobGraph._1
+ val jobGraph = programJobGraph._2
+ val client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
+ val jobId = client.submitJob(jobGraph).get().toString
+ val jobManagerUrl = client.getWebInterfaceURL
- logInfo(s"""
-
|-------------------------<<applicationId>>------------------------
- |Flink Job Started: jobId: $jobId , applicationId:
${yarnClusterId.toString}
-
|__________________________________________________________________
- |""".stripMargin)
- SubmitResponse(yarnClusterId.toString, flinkConfig.toMap, jobId,
jobManagerUrl)
- } catch {
- case e: Exception =>
- logError(s"submit flink job fail in ${submitRequest.executionMode}
mode")
- e.printStackTrace()
- throw e
- } finally {
- if (submitRequest.safePackageProgram) {
- Utils.close(packageProgram)
- }
- Utils.close(client, clusterDescriptor)
- }
+ logInfo(s"""
+
|-------------------------<<applicationId>>------------------------
+ |Flink Job Started: jobId: $jobId , applicationId:
${yarnClusterId.toString}
+
|__________________________________________________________________
+ |""".stripMargin)
+ val resp = SubmitResponse(yarnClusterId.toString, flinkConfig.toMap,
jobId, jobManagerUrl)
+ closeSubmit(submitRequest, packageProgram, client, clusterDescriptor)
+ resp
}
override def doCancel(
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index f1b171310..c9bc9b59d 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -78,7 +78,25 @@ trait FlinkClientTrait extends Logger {
|-------------------------------------------------------------------------------------------
|""".stripMargin)
+ val flinkConfig = prepareConfig(submitRequest)
+
+ setConfig(submitRequest, flinkConfig)
+
+ Try(doSubmit(submitRequest, flinkConfig)) match {
+ case Success(resp) => resp
+ case Failure(e) =>
+ logError(
+ s"flink job ${submitRequest.appName} start failed, " +
+ s"executionMode: ${submitRequest.executionMode.getName}, " +
+ s"detail: ${Utils.stringifyException(e)}")
+ throw e
+ }
+ }
+
+ private[this] def prepareConfig(submitRequest: SubmitRequest): Configuration
= {
+
val (commandLine, flinkConfig) =
getCommandLineAndFlinkConfig(submitRequest)
+
if (submitRequest.userJarFile != null) {
val uri =
PackagedProgramUtils.resolveURI(submitRequest.userJarFile.getAbsolutePath)
val programOptions = ProgramOptions.create(commandLine)
@@ -88,7 +106,7 @@ trait FlinkClientTrait extends Logger {
executionParameters.applyToConfiguration(flinkConfig)
}
- // set common parameter
+ // 1) set common parameter
flinkConfig
.safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
.safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
@@ -97,9 +115,7 @@ trait FlinkClientTrait extends Logger {
.safeSet(ApplicationConfiguration.APPLICATION_ARGS,
extractProgramArgs(submitRequest))
.safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
submitRequest.jobId)
- if (
-
!submitRequest.properties.containsKey(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())
- ) {
+ if
(!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
submitRequest.flinkVersion.flinkHome)
// state.checkpoints.num-retained
@@ -107,44 +123,36 @@ trait FlinkClientTrait extends Logger {
flinkConfig.safeSet(retainedOption,
flinkDefaultConfiguration.get(retainedOption))
}
- // set savepoint parameter
- if (submitRequest.savePoint != null) {
+ // 2) set savepoint parameter
+ if (StringUtils.isNotBlank(submitRequest.savePoint)) {
flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH,
submitRequest.savePoint)
flinkConfig.setBoolean(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
submitRequest.allowNonRestoredState)
}
- // set JVMOptions..
- setJvmOptions(submitRequest, flinkConfig)
-
- setConfig(submitRequest, flinkConfig)
-
- doSubmit(submitRequest, flinkConfig)
-
- }
-
- private[this] def setJvmOptions(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration): Unit = {
+ // 3) set env.xx.opts parameter
if (MapUtils.isNotEmpty(submitRequest.properties)) {
- submitRequest.properties.foreach(
- x => {
- val k = x._1.trim
- val v = x._2.toString
- if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
- } else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
- flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
- }
- })
+ // file.encoding...
+ if (submitRequest.hasProp(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+ val jvmOpt =
submitRequest.getProp(CoreOptions.FLINK_JVM_OPTIONS.key()).toString
+ if (!jvmOpt.contains("-Dfile.encoding=")) {
+ // set default file.encoding
+ val opt = s"-Dfile.encoding=UTF-8 $jvmOpt"
+ submitRequest.properties.put(CoreOptions.FLINK_JVM_OPTIONS.key(),
opt)
+ }
+ }
+
+ submitRequest.properties
+ .filter(_._1.startsWith("env."))
+ .foreach(
+ x => {
+ logInfo(s"env opts: ${x._1}: ${x._2}")
+ flinkConfig.setString(x._1, x._2.toString)
+ })
}
+
+ flinkConfig
}
def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
@@ -524,4 +532,17 @@ trait FlinkClientTrait extends Logger {
clientWrapper.triggerSavepoint(jobID, savepointPath).get()
}
+ def closeSubmit(submitRequest: SubmitRequest, close: AutoCloseable*): Unit =
{
+ close.foreach(
+ x => {
+ if (x.isInstanceOf[PackagedProgram]) {
+ if (submitRequest.safePackageProgram) {
+ Utils.close(x)
+ }
+ } else {
+ Utils.close(x)
+ }
+ })
+ }
+
}