This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch classpaths
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 7997c5dd1a1a50c81b61fd2ac015519bde9b589f
Author: benjobs <[email protected]>
AuthorDate: Sat Nov 4 23:24:28 2023 +0800

    [Improve] submit flink job userclassPaths improvement
---
 .../flink/client/bean/SubmitRequest.scala          | 11 +++++++--
 .../flink/client/impl/YarnApplicationClient.scala  |  4 ----
 .../flink/client/trait/FlinkClientTrait.scala      | 27 +++++-----------------
 3 files changed, 15 insertions(+), 27 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 25e17120d..1fe9f98c8 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointResto
 import javax.annotation.Nullable
 
 import java.io.File
+import java.net.URL
 import java.util.{Map => JavaMap}
 
 import scala.collection.convert.ImplicitConversions._
@@ -55,7 +56,8 @@ case class SubmitRequest(
     @Nullable k8sSubmitParam: KubernetesSubmitParam,
     @Nullable extraParameter: JavaMap[String, Any]) {
 
-  lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
+  private[this] lazy val appProperties: Map[String, String] = getParameterMap(
+    KEY_FLINK_PROPERTY_PREFIX)
 
   lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX)
 
@@ -68,9 +70,14 @@ case class SubmitRequest(
   lazy val effectiveAppName: String =
     if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else this.appName
 
+  lazy val localLibs: List[URL] = {
+    val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
+    Try(new File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
+  }
+
   lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
 
-  lazy val allowNonRestoredState = Try(
+  lazy val allowNonRestoredState: Boolean = Try(
     properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean)
     .getOrElse(false)
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f7b38cdde..3470e8e67 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -101,10 +101,6 @@ object YarnApplicationClient extends YarnClientTrait {
       if (!FsOperator.hdfs.exists(pyVenv)) {
         throw new RuntimeException(s"$pyVenv File does not exist")
       }
-
-      // including $app/lib
-      includingPipelineJars(submitRequest, flinkConfig)
-
       // yarn.ship-files
       val shipFiles = new util.ArrayList[String]()
       shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 435d882c7..ffc6b47a8 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait`
 import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
+import org.apache.streampark.common.enums._
 import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util._
 import org.apache.streampark.flink.client.bean._
@@ -43,6 +43,7 @@ import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import java.io.File
+import java.net.URL
 import java.util
 import java.util.{Collections, List => JavaList, Map => JavaMap}
 
@@ -256,9 +257,10 @@ trait FlinkClientTrait extends Logger {
       flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
 
     val pkgBuilder = PackagedProgram.newBuilder
-      .setUserClassPaths(
-        Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
-      )
+
+    val classPaths = submitRequest.flinkVersion.flinkLibs ++ submitRequest.localLibs
+    pkgBuilder
+      .setUserClassPaths(Lists.newArrayList(classPaths: _*))
       .setEntryPointClassName(
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
       )
@@ -275,14 +277,6 @@ trait FlinkClientTrait extends Logger {
         if (!FsOperator.lfs.exists(pythonVenv)) {
           throw new RuntimeException(s"$pythonVenv File does not exist")
         }
-        // including $app/lib
-        val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-        if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-          val localLibUrl = new File(localLib).listFiles().map(_.toURI.toURL).toList
-          pkgBuilder.setUserClassPaths(
-            Lists.newArrayList(localLibUrl: _*)
-          )
-        }
         flinkConfig
           // python.archives
           .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
@@ -600,13 +594,4 @@ trait FlinkClientTrait extends Logger {
     clientWrapper.triggerSavepoint(jobID, savepointPath, savepointRequest.nativeFormat).get()
   }
 
-  private[client] def includingPipelineJars(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration) = {
-    val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-    if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-      flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
-    }
-  }
-
 }

Reply via email to