This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 591098314 [Fix] Fix pyflink yarn remote mode bug (#3339)
591098314 is described below

commit 591098314868df48946e58d8395c57e322662e86
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Nov 13 15:26:15 2023 +0800

    [Fix] Fix pyflink yarn remote mode bug (#3339)
    
    * [Fix] Fix pyflink yarn remote mode bug
---
 .../apache/streampark/flink/client/bean/SubmitRequest.scala   |  7 ++++---
 .../streampark/flink/client/trait/FlinkClientTrait.scala      | 11 ++++++-----
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 2c1a72e51..87ec66d21 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -70,12 +70,13 @@ case class SubmitRequest(
   lazy val effectiveAppName: String =
     if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else 
this.appName
 
-  lazy val classPaths: List[URL] = {
+  lazy val libs: List[URL] = {
     val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
-    val lib = Try(new 
File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
-    flinkVersion.flinkLibs ++ lib
+    Try(new 
File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
   }
 
+  lazy val classPaths: List[URL] = flinkVersion.flinkLibs ++ libs
+
   lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
 
   lazy val allowNonRestoredState: Boolean = Try(
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 020301b73..a31250d4c 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -42,9 +42,6 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, 
SavepointConfigOptions}
 import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
-import java.io.File
-import java.net.URL
-import java.util
 import java.util.{Collections, List => JavaList, Map => JavaMap}
 
 import scala.collection.convert.ImplicitConversions._
@@ -257,7 +254,6 @@ trait FlinkClientTrait extends Logger {
       flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
 
     val pkgBuilder = PackagedProgram.newBuilder
-      .setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
       .setEntryPointClassName(
         
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
       )
@@ -281,8 +277,13 @@ trait FlinkClientTrait extends Logger {
           .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, 
Constant.PYTHON_EXECUTABLE)
           // python.executable
           .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+        if (submitRequest.libs.nonEmpty) {
+          pkgBuilder.setUserClassPaths(submitRequest.libs)
+        }
       case _ =>
-        pkgBuilder.setJarFile(submitRequest.userJarFile)
+        pkgBuilder
+          .setUserClassPaths(submitRequest.classPaths)
+          .setJarFile(submitRequest.userJarFile)
     }
 
     val packageProgram = pkgBuilder.build()

Reply via email to