This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 50ab5b684 [Improve] flink on yarn-per-job mode bug fixed #3761 (#3834)
50ab5b684 is described below
commit 50ab5b6843db4f7c462f36f6d98ec68e2562d47a
Author: benjobs <[email protected]>
AuthorDate: Fri Jul 5 09:44:15 2024 +0800
[Improve] flink on yarn-per-job mode bug fixed #3761 (#3834)
* [Improve] flink on yarn-per-job mode bug fixed #3761
* [Improve] FlinkClientTrait improvement
* [Improve] SubmitRequest minor improvement
---
...ApplicationsFlink116OnYarnWithFlinkSQLTest.java | 5 +-
...ApplicationsFlink117OnYarnWithFlinkSQLTest.java | 5 +-
.../flink/client/bean/SubmitRequest.scala | 4 +-
.../flink/client/trait/FlinkClientTrait.scala | 208 +++++++++++----------
4 files changed, 117 insertions(+), 105 deletions(-)
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
index f768bde3d..ad75ff64c 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
@@ -213,9 +213,8 @@ public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("SUCCESS")));
}
- // This test cannot be executed due to a bug, and will be put online after
issue #3761 fixed
- // @Test
- // @Order(70)
+ @Test
+ @Order(70)
void testStartFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
index e3b904589..1f8940221 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
@@ -214,9 +214,8 @@ public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("SUCCESS")));
}
- // This test cannot be executed due to a bug, and will be put online after
issue #3761 fixed
- // @Test
- // @Order(70)
+ @Test
+ @Order(70)
void testStartFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 0b97e92f2..a1ab7df30 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.enums._
-import org.apache.streampark.common.util.{AssertUtils, DeflaterUtils,
HdfsUtils, PropertiesUtils}
+import org.apache.streampark.common.util._
import org.apache.streampark.flink.packer.pipeline.{BuildResult,
ShadedBuildResponse}
import org.apache.streampark.flink.util.FlinkUtils
import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
@@ -111,6 +111,8 @@ case class SubmitRequest(
}
}
+ def hasProp(key: String): Boolean = properties.containsKey(key)
+
private[this] def getParameterMap(prefix: String = ""): Map[String, String]
= {
if (this.appConf == null) {
return Map.empty[String, String]
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 7ff741163..4cebe8d4e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -23,7 +23,7 @@ import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums._
import org.apache.streampark.common.fs.FsOperator
import org.apache.streampark.common.util._
-import org.apache.streampark.flink.client.bean.{SubmitResponse, _}
+import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -39,7 +39,6 @@ import org.apache.flink.client.program.{ClusterClient, PackagedProgram, Packaged
import org.apache.flink.configuration._
import org.apache.flink.python.PythonOptions
import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
-import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
import java.io.File
@@ -81,10 +80,42 @@ trait FlinkClientTrait extends Logger {
|-------------------------------------------------------------------------------------------
|""".stripMargin)
+ // prepare flink config
+ val flinkConfig = prepareConfig(submitRequest)
+
+ // set JVMOptions..
+ setJvmOptions(submitRequest, flinkConfig)
+
+ setConfig(submitRequest, flinkConfig)
+
+ Try(doSubmit(submitRequest, flinkConfig)) match {
+ case Success(resp) => resp
+ case Failure(e) =>
+ logError(
+ s"flink job ${submitRequest.appName} start failed, " +
+ s"executionMode: ${submitRequest.executionMode.getName}, " +
+ s"detail: ${ExceptionUtils.stringifyException(e)}")
+ throw e
+ }
+ }
+
+ private[this] def prepareConfig(submitRequest: SubmitRequest): Configuration
= {
+
val (commandLine, flinkConfig) =
getCommandLineAndFlinkConfig(submitRequest)
submitRequest.developmentMode match {
case FlinkDevelopmentMode.PYFLINK =>
+ val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
+ AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv
File does not exist")
+
+ flinkConfig
+ // python.archives
+ .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+ // python.client.executable
+ .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE,
Constant.PYTHON_EXECUTABLE)
+ // python.executable
+ .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+
val flinkOptPath: String =
System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
if (StringUtils.isBlank(flinkOptPath)) {
logWarn(s"Get environment variable
${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
@@ -113,9 +144,7 @@ trait FlinkClientTrait extends Logger {
.safeSet(ApplicationConfiguration.APPLICATION_ARGS,
extractProgramArgs(submitRequest))
.safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
submitRequest.jobId)
- if (
-
!submitRequest.properties.containsKey(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())
- ) {
+ if
(!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
submitRequest.flinkVersion.flinkHome)
// state.checkpoints.num-retained
@@ -124,31 +153,20 @@ trait FlinkClientTrait extends Logger {
}
// set savepoint parameter
- if (submitRequest.savePoint != null) {
+ if (StringUtils.isNotBlank(submitRequest.savePoint)) {
flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH,
submitRequest.savePoint)
flinkConfig.setBoolean(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
submitRequest.allowNonRestoredState)
- val enableRestoreModeState = submitRequest.flinkVersion.checkVersion(
- FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode !=
null
- if (enableRestoreModeState) {
+ val enableRestoreMode =
+ submitRequest.restoreMode != null &&
submitRequest.flinkVersion.checkVersion(
+ FlinkRestoreMode.SINCE_FLINK_VERSION)
+ if (enableRestoreMode) {
flinkConfig.setString(FlinkRestoreMode.RESTORE_MODE,
submitRequest.restoreMode.getName);
}
}
- // set JVMOptions..
- setJvmOptions(submitRequest, flinkConfig)
-
- setConfig(submitRequest, flinkConfig)
-
- Try(doSubmit(submitRequest, flinkConfig)) match {
- case Success(resp) => resp
- case Failure(e) =>
- logError(
- s"flink job ${submitRequest.appName} start failed, executionMode:
${submitRequest.executionMode.getName}, detail: ${ExceptionUtils
- .stringifyException(e)}")
- throw e
- }
+ flinkConfig
}
private[this] def setJvmOptions(
@@ -262,45 +280,41 @@ trait FlinkClientTrait extends Logger {
submitRequest: SubmitRequest,
jarFile: File): (PackagedProgram, JobGraph) = {
- val pkgBuilder = PackagedProgram.newBuilder
- .setEntryPointClassName(
-
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
- )
- .setArguments(
- flinkConfig
- .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
- .orElse(Lists.newArrayList()): _*
- )
- .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-
- submitRequest.developmentMode match {
- case FlinkDevelopmentMode.PYFLINK =>
- val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
- AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv
File does not exist")
+ val packagedProgramBuilder = {
+ val builder = PackagedProgram.newBuilder
+ .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+ .setEntryPointClassName(
+
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+ )
+ .setArguments(
+ flinkConfig
+ .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+ .orElse(Lists.newArrayList()): _*
+ )
- flinkConfig
- // python.archives
- .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
- // python.client.executable
- .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE,
Constant.PYTHON_EXECUTABLE)
- // python.executable
- .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
- if (submitRequest.libs.nonEmpty) {
- pkgBuilder.setUserClassPaths(submitRequest.libs)
- }
- case _ =>
- pkgBuilder
- .setUserClassPaths(submitRequest.classPaths)
- .setJarFile(jarFile)
+ submitRequest.developmentMode match {
+ case FlinkDevelopmentMode.PYFLINK =>
+ if (submitRequest.libs.nonEmpty) {
+ // BUG: https://github.com/apache/incubator-streampark/issues/3761
+ //
builder.setUserClassPaths(Lists.newArrayList(submitRequest.libs: _*))
+ }
+ case _ =>
+ builder
+ .setJarFile(jarFile)
+ // BUG: https://github.com/apache/incubator-streampark/issues/3761
+ // .setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
+ }
+ builder
}
- val packageProgram = pkgBuilder.build()
+ val packageProgram = packagedProgramBuilder.build()
val jobGraph = PackagedProgramUtils.createJobGraph(
packageProgram,
flinkConfig,
getParallelism(submitRequest),
null,
false)
+
packageProgram -> jobGraph
}
@@ -337,16 +351,13 @@ trait FlinkClientTrait extends Logger {
private[this] def getCustomCommandLines(flinkHome: String):
JavaList[CustomCommandLine] = {
val flinkDefaultConfiguration: Configuration =
getFlinkDefaultConfiguration(flinkHome)
// 1. find the configuration directory
- val configurationDirectory = s"$flinkHome/conf"
+ val confDir = s"$flinkHome/conf"
// 2. load the custom command lines
- val customCommandLines =
- loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
- new CliFrontend(flinkDefaultConfiguration, customCommandLines)
- customCommandLines
+ loadCustomCommandLines(flinkDefaultConfiguration, confDir)
}
private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
- if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
+ if (submitRequest.hasProp(KEY_FLINK_PARALLELISM())) {
Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
} else {
getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
@@ -363,21 +374,22 @@ trait FlinkClientTrait extends Logger {
val cliArgs = {
val optionMap = new mutable.HashMap[String, Any]()
submitRequest.appOption
- .filter(
- x => {
- val verify = commandLineOptions.hasOption(x._1)
- if (!verify) logWarn(s"param:${x._1} is error,skip it.")
- verify
- })
- .foreach(
- x => {
- val opt = commandLineOptions.getOption(x._1.trim).getOpt
- Try(x._2.toBoolean).getOrElse(x._2) match {
- case b if b.isInstanceOf[Boolean] =>
- if (b.asInstanceOf[Boolean]) optionMap += s"-$opt" -> true
- case v => optionMap += s"-$opt" -> v
+ .foreach {
+ opt =>
+ val verify = commandLineOptions.hasOption(opt._1)
+ if (!verify) {
+ logWarn(s"param:${opt._1} is error,skip it.")
+ } else {
+ val option = commandLineOptions.getOption(opt._1.trim).getOpt
+ Try(opt._2.toBoolean).getOrElse(opt._2) match {
+ case b if b.isInstanceOf[Boolean] =>
+ if (b.asInstanceOf[Boolean]) {
+ optionMap += s"-$option" -> true
+ }
+ case v => optionMap += s"-$option" -> v
+ }
}
- })
+ }
// fromSavePoint
if (submitRequest.savePoint != null) {
@@ -391,9 +403,9 @@ trait FlinkClientTrait extends Logger {
val array = new ArrayBuffer[String]()
optionMap.foreach(
- x => {
- array += x._1
- x._2 match {
+ opt => {
+ array += opt._1
+ opt._2 match {
case v: String => array += v
case _ =>
}
@@ -401,12 +413,13 @@ trait FlinkClientTrait extends Logger {
// app properties
if (MapUtils.isNotEmpty(submitRequest.properties)) {
- submitRequest.properties.foreach(
- x => {
- if (!x._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
- array += s"-D${x._1}=${x._2}"
+ submitRequest.properties.foreach {
+ key =>
+ if (!key._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+ logInfo(s"submit application dynamicProperties: ${key._1}
:${key._2}")
+ array += s"-D${key._1}=${key._2}"
}
- })
+ }
}
array.toArray
}
@@ -480,21 +493,22 @@ trait FlinkClientTrait extends Logger {
}
// execution.runtime-mode
- val addRuntimeModeState =
- submitRequest.properties.nonEmpty &&
submitRequest.properties.containsKey(
- ExecutionOptions.RUNTIME_MODE.key())
- if (addRuntimeModeState) {
- programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
- programArgs +=
submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
+
Try(submitRequest.properties(ExecutionOptions.RUNTIME_MODE.key()).toString)
match {
+ case Success(runtimeMode) =>
+ programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+ programArgs += runtimeMode
+ case _ =>
}
- val addUserJarFileState =
- submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK &&
submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
- if (addUserJarFileState) {
- // python file
- programArgs.add("-py")
- programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+ if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
+ // TODO why executionMode is not yarn-application ???
+ if (submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION) {
+ // python file
+ programArgs.add("-py")
+ programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+ }
}
+
Lists.newArrayList(programArgs: _*)
}
@@ -507,9 +521,9 @@ trait FlinkClientTrait extends Logger {
val configuration = new Configuration()
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
flinkDefaultConfiguration.keySet.foreach(
- x => {
- flinkDefaultConfiguration.getString(x, null) match {
- case v if v != null => configuration.setString(x, v)
+ key => {
+ flinkDefaultConfiguration.getString(key, null) match {
+ case v if v != null => configuration.setString(key, v)
case _ =>
}
})
@@ -537,9 +551,7 @@ trait FlinkClientTrait extends Logger {
val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false)
val withDrain = Try(cancelRequest.withDrain).getOrElse(false)
- (
- Try(cancelRequest.withSavepoint).getOrElse(false),
- Try(cancelRequest.withDrain).getOrElse(false)) match {
+ (withSavepoint, withDrain) match {
case (false, false) =>
client.cancel(jobID).get()
null