This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 591098314 [Fix] Fix pyflink yarn remote mode bug (#3339)
591098314 is described below
commit 591098314868df48946e58d8395c57e322662e86
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Nov 13 15:26:15 2023 +0800
[Fix] Fix pyflink yarn remote mode bug (#3339)
* [Fix] Fix pyflink yarn remote mode bug
---
.../apache/streampark/flink/client/bean/SubmitRequest.scala | 7 ++++---
.../streampark/flink/client/trait/FlinkClientTrait.scala | 11 ++++++-----
2 files changed, 10 insertions(+), 8 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 2c1a72e51..87ec66d21 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -70,12 +70,13 @@ case class SubmitRequest(
lazy val effectiveAppName: String =
if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else
this.appName
- lazy val classPaths: List[URL] = {
+ lazy val libs: List[URL] = {
val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
- val lib = Try(new
File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
- flinkVersion.flinkLibs ++ lib
+ Try(new
File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
}
+ lazy val classPaths: List[URL] = flinkVersion.flinkLibs ++ libs
+
lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
lazy val allowNonRestoredState: Boolean = Try(
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 020301b73..a31250d4c 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -42,9 +42,6 @@ import org.apache.flink.runtime.jobgraph.{JobGraph,
SavepointConfigOptions}
import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
-import java.io.File
-import java.net.URL
-import java.util
import java.util.{Collections, List => JavaList, Map => JavaMap}
import scala.collection.convert.ImplicitConversions._
@@ -257,7 +254,6 @@ trait FlinkClientTrait extends Logger {
flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
val pkgBuilder = PackagedProgram.newBuilder
- .setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
.setEntryPointClassName(
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
)
@@ -281,8 +277,13 @@ trait FlinkClientTrait extends Logger {
.safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE,
Constant.PYTHON_EXECUTABLE)
// python.executable
.safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+ if (submitRequest.libs.nonEmpty) {
+ pkgBuilder.setUserClassPaths(submitRequest.libs)
+ }
case _ =>
- pkgBuilder.setJarFile(submitRequest.userJarFile)
+ pkgBuilder
+ .setUserClassPaths(submitRequest.classPaths)
+ .setJarFile(submitRequest.userJarFile)
}
val packageProgram = pkgBuilder.build()