This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 50ab5b684 [Improve]  flink on yarn-per-job mode bug fixed #3761 (#3834)
50ab5b684 is described below

commit 50ab5b6843db4f7c462f36f6d98ec68e2562d47a
Author: benjobs <[email protected]>
AuthorDate: Fri Jul 5 09:44:15 2024 +0800

    [Improve]  flink on yarn-per-job mode bug fixed #3761 (#3834)
    
    * [Improve]  flink on yarn-per-job mode bug fixed #3761
    
    * [Improve] FlinkClientTrait improvement
    
    * [Improve] SubmitRequest minor improvement
---
 ...ApplicationsFlink116OnYarnWithFlinkSQLTest.java |   5 +-
 ...ApplicationsFlink117OnYarnWithFlinkSQLTest.java |   5 +-
 .../flink/client/bean/SubmitRequest.scala          |   4 +-
 .../flink/client/trait/FlinkClientTrait.scala      | 208 +++++++++++----------
 4 files changed, 117 insertions(+), 105 deletions(-)

diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
index f768bde3d..ad75ff64c 100644
--- 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
@@ -213,9 +213,8 @@ public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
                                 .anyMatch(it -> it.contains("SUCCESS")));
     }
 
-    // This test cannot be executed due to a bug, and will be put online after 
issue #3761 fixed
-    // @Test
-    // @Order(70)
+    @Test
+    @Order(70)
     void testStartFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
 
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
index e3b904589..1f8940221 100644
--- 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
@@ -214,9 +214,8 @@ public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
                                 .anyMatch(it -> it.contains("SUCCESS")));
     }
 
-    // This test cannot be executed due to a bug, and will be put online after 
issue #3761 fixed
-    // @Test
-    // @Order(70)
+    @Test
+    @Order(70)
     void testStartFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
 
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 0b97e92f2..a1ab7df30 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
 import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums._
-import org.apache.streampark.common.util.{AssertUtils, DeflaterUtils, 
HdfsUtils, PropertiesUtils}
+import org.apache.streampark.common.util._
 import org.apache.streampark.flink.packer.pipeline.{BuildResult, 
ShadedBuildResponse}
 import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
@@ -111,6 +111,8 @@ case class SubmitRequest(
     }
   }
 
+  def hasProp(key: String): Boolean = properties.containsKey(key)
+
   private[this] def getParameterMap(prefix: String = ""): Map[String, String] 
= {
     if (this.appConf == null) {
       return Map.empty[String, String]
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 7ff741163..4cebe8d4e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -23,7 +23,7 @@ import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums._
 import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util._
-import org.apache.streampark.flink.client.bean.{SubmitResponse, _}
+import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
 
@@ -39,7 +39,6 @@ import org.apache.flink.client.program.{ClusterClient, 
PackagedProgram, Packaged
 import org.apache.flink.configuration._
 import org.apache.flink.python.PythonOptions
 import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
-import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import java.io.File
@@ -81,10 +80,42 @@ trait FlinkClientTrait extends Logger {
          
|-------------------------------------------------------------------------------------------
          |""".stripMargin)
 
+    // prepare flink config
+    val flinkConfig = prepareConfig(submitRequest)
+
+    // set JVMOptions..
+    setJvmOptions(submitRequest, flinkConfig)
+
+    setConfig(submitRequest, flinkConfig)
+
+    Try(doSubmit(submitRequest, flinkConfig)) match {
+      case Success(resp) => resp
+      case Failure(e) =>
+        logError(
+          s"flink job ${submitRequest.appName} start failed, " +
+            s"executionMode: ${submitRequest.executionMode.getName}, " +
+            s"detail: ${ExceptionUtils.stringifyException(e)}")
+        throw e
+    }
+  }
+
+  private[this] def prepareConfig(submitRequest: SubmitRequest): Configuration 
= {
+
     val (commandLine, flinkConfig) = 
getCommandLineAndFlinkConfig(submitRequest)
 
     submitRequest.developmentMode match {
       case FlinkDevelopmentMode.PYFLINK =>
+        val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
+        AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv 
File does not exist")
+
+        flinkConfig
+          // python.archives
+          .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+          // python.client.executable
+          .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, 
Constant.PYTHON_EXECUTABLE)
+          // python.executable
+          .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+
         val flinkOptPath: String = 
System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
         if (StringUtils.isBlank(flinkOptPath)) {
           logWarn(s"Get environment variable 
${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
@@ -113,9 +144,7 @@ trait FlinkClientTrait extends Logger {
       .safeSet(ApplicationConfiguration.APPLICATION_ARGS, 
extractProgramArgs(submitRequest))
       .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
submitRequest.jobId)
 
-    if (
-      
!submitRequest.properties.containsKey(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())
-    ) {
+    if 
(!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
       val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
         submitRequest.flinkVersion.flinkHome)
       // state.checkpoints.num-retained
@@ -124,31 +153,20 @@ trait FlinkClientTrait extends Logger {
     }
 
     // set savepoint parameter
-    if (submitRequest.savePoint != null) {
+    if (StringUtils.isNotBlank(submitRequest.savePoint)) {
       flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, 
submitRequest.savePoint)
       flinkConfig.setBoolean(
         SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
         submitRequest.allowNonRestoredState)
-      val enableRestoreModeState = submitRequest.flinkVersion.checkVersion(
-        FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != 
null
-      if (enableRestoreModeState) {
+      val enableRestoreMode =
+        submitRequest.restoreMode != null && 
submitRequest.flinkVersion.checkVersion(
+          FlinkRestoreMode.SINCE_FLINK_VERSION)
+      if (enableRestoreMode) {
         flinkConfig.setString(FlinkRestoreMode.RESTORE_MODE, 
submitRequest.restoreMode.getName);
       }
     }
 
-    // set JVMOptions..
-    setJvmOptions(submitRequest, flinkConfig)
-
-    setConfig(submitRequest, flinkConfig)
-
-    Try(doSubmit(submitRequest, flinkConfig)) match {
-      case Success(resp) => resp
-      case Failure(e) =>
-        logError(
-          s"flink job ${submitRequest.appName} start failed, executionMode: 
${submitRequest.executionMode.getName}, detail: ${ExceptionUtils
-              .stringifyException(e)}")
-        throw e
-    }
+    flinkConfig
   }
 
   private[this] def setJvmOptions(
@@ -262,45 +280,41 @@ trait FlinkClientTrait extends Logger {
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {
 
-    val pkgBuilder = PackagedProgram.newBuilder
-      .setEntryPointClassName(
-        
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
-      )
-      .setArguments(
-        flinkConfig
-          .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-          .orElse(Lists.newArrayList()): _*
-      )
-      .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-
-    submitRequest.developmentMode match {
-      case FlinkDevelopmentMode.PYFLINK =>
-        val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
-        AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv 
File does not exist")
+    val packagedProgramBuilder = {
+      val builder = PackagedProgram.newBuilder
+        .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+        .setEntryPointClassName(
+          
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+        )
+        .setArguments(
+          flinkConfig
+            .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+            .orElse(Lists.newArrayList()): _*
+        )
 
-        flinkConfig
-          // python.archives
-          .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
-          // python.client.executable
-          .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, 
Constant.PYTHON_EXECUTABLE)
-          // python.executable
-          .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
-        if (submitRequest.libs.nonEmpty) {
-          pkgBuilder.setUserClassPaths(submitRequest.libs)
-        }
-      case _ =>
-        pkgBuilder
-          .setUserClassPaths(submitRequest.classPaths)
-          .setJarFile(jarFile)
+      submitRequest.developmentMode match {
+        case FlinkDevelopmentMode.PYFLINK =>
+          if (submitRequest.libs.nonEmpty) {
+            // BUG: https://github.com/apache/incubator-streampark/issues/3761
+            // 
builder.setUserClassPaths(Lists.newArrayList(submitRequest.libs: _*))
+          }
+        case _ =>
+          builder
+            .setJarFile(jarFile)
+        // BUG: https://github.com/apache/incubator-streampark/issues/3761
+        // .setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
+      }
+      builder
     }
 
-    val packageProgram = pkgBuilder.build()
+    val packageProgram = packagedProgramBuilder.build()
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
       flinkConfig,
       getParallelism(submitRequest),
       null,
       false)
+
     packageProgram -> jobGraph
   }
 
@@ -337,16 +351,13 @@ trait FlinkClientTrait extends Logger {
   private[this] def getCustomCommandLines(flinkHome: String): 
JavaList[CustomCommandLine] = {
     val flinkDefaultConfiguration: Configuration = 
getFlinkDefaultConfiguration(flinkHome)
     // 1. find the configuration directory
-    val configurationDirectory = s"$flinkHome/conf"
+    val confDir = s"$flinkHome/conf"
     // 2. load the custom command lines
-    val customCommandLines =
-      loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
-    new CliFrontend(flinkDefaultConfiguration, customCommandLines)
-    customCommandLines
+    loadCustomCommandLines(flinkDefaultConfiguration, confDir)
   }
 
   private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
-    if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
+    if (submitRequest.hasProp(KEY_FLINK_PARALLELISM())) {
       
Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
     } else {
       getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
@@ -363,21 +374,22 @@ trait FlinkClientTrait extends Logger {
     val cliArgs = {
       val optionMap = new mutable.HashMap[String, Any]()
       submitRequest.appOption
-        .filter(
-          x => {
-            val verify = commandLineOptions.hasOption(x._1)
-            if (!verify) logWarn(s"param:${x._1} is error,skip it.")
-            verify
-          })
-        .foreach(
-          x => {
-            val opt = commandLineOptions.getOption(x._1.trim).getOpt
-            Try(x._2.toBoolean).getOrElse(x._2) match {
-              case b if b.isInstanceOf[Boolean] =>
-                if (b.asInstanceOf[Boolean]) optionMap += s"-$opt" -> true
-              case v => optionMap += s"-$opt" -> v
+        .foreach {
+          opt =>
+            val verify = commandLineOptions.hasOption(opt._1)
+            if (!verify) {
+              logWarn(s"param:${opt._1} is error,skip it.")
+            } else {
+              val option = commandLineOptions.getOption(opt._1.trim).getOpt
+              Try(opt._2.toBoolean).getOrElse(opt._2) match {
+                case b if b.isInstanceOf[Boolean] =>
+                  if (b.asInstanceOf[Boolean]) {
+                    optionMap += s"-$option" -> true
+                  }
+                case v => optionMap += s"-$option" -> v
+              }
             }
-          })
+        }
 
       // fromSavePoint
       if (submitRequest.savePoint != null) {
@@ -391,9 +403,9 @@ trait FlinkClientTrait extends Logger {
 
       val array = new ArrayBuffer[String]()
       optionMap.foreach(
-        x => {
-          array += x._1
-          x._2 match {
+        opt => {
+          array += opt._1
+          opt._2 match {
             case v: String => array += v
             case _ =>
           }
@@ -401,12 +413,13 @@ trait FlinkClientTrait extends Logger {
 
       // app properties
       if (MapUtils.isNotEmpty(submitRequest.properties)) {
-        submitRequest.properties.foreach(
-          x => {
-            if (!x._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
-              array += s"-D${x._1}=${x._2}"
+        submitRequest.properties.foreach {
+          key =>
+            if (!key._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+              logInfo(s"submit application dynamicProperties:  ${key._1} 
:${key._2}")
+              array += s"-D${key._1}=${key._2}"
             }
-          })
+        }
       }
       array.toArray
     }
@@ -480,21 +493,22 @@ trait FlinkClientTrait extends Logger {
     }
 
     // execution.runtime-mode
-    val addRuntimeModeState =
-      submitRequest.properties.nonEmpty && 
submitRequest.properties.containsKey(
-        ExecutionOptions.RUNTIME_MODE.key())
-    if (addRuntimeModeState) {
-      programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
-      programArgs += 
submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
+    
Try(submitRequest.properties(ExecutionOptions.RUNTIME_MODE.key()).toString) 
match {
+      case Success(runtimeMode) =>
+        programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+        programArgs += runtimeMode
+      case _ =>
     }
 
-    val addUserJarFileState =
-      submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK && 
submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
-    if (addUserJarFileState) {
-      // python file
-      programArgs.add("-py")
-      programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+    if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
+      // TODO why executionMode is not yarn-application ???
+      if (submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION) {
+        // python file
+        programArgs.add("-py")
+        programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+      }
     }
+
     Lists.newArrayList(programArgs: _*)
   }
 
@@ -507,9 +521,9 @@ trait FlinkClientTrait extends Logger {
     val configuration = new Configuration()
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
     flinkDefaultConfiguration.keySet.foreach(
-      x => {
-        flinkDefaultConfiguration.getString(x, null) match {
-          case v if v != null => configuration.setString(x, v)
+      key => {
+        flinkDefaultConfiguration.getString(key, null) match {
+          case v if v != null => configuration.setString(key, v)
           case _ =>
         }
       })
@@ -537,9 +551,7 @@ trait FlinkClientTrait extends Logger {
     val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false)
     val withDrain = Try(cancelRequest.withDrain).getOrElse(false)
 
-    (
-      Try(cancelRequest.withSavepoint).getOrElse(false),
-      Try(cancelRequest.withDrain).getOrElse(false)) match {
+    (withSavepoint, withDrain) match {
       case (false, false) =>
         client.cancel(jobID).get()
         null

Reply via email to the commits mailing list.