This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new c038ef69c [Fix] Fix pyflink yarn application mode maven dependency
bug (#3322)
c038ef69c is described below
commit c038ef69c37fca9104f06dc54ab7368e2dc9d936
Author: ChengJie1053 <[email protected]>
AuthorDate: Tue Nov 7 17:07:40 2023 +0800
[Fix] Fix pyflink yarn application mode maven dependency bug (#3322)
* [Improve] Console core module code optimization
* [Fix] Fix pyflink yarn application mode maven dependency bug
---
.../streampark/flink/client/impl/YarnApplicationClient.scala | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 3470e8e67..f4fe45761 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.client.impl
import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
+import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.FlinkDevelopmentMode
import org.apache.streampark.common.fs.FsOperator
import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils}
@@ -101,6 +101,12 @@ object YarnApplicationClient extends YarnClientTrait {
if (!FsOperator.hdfs.exists(pyVenv)) {
throw new RuntimeException(s"$pyVenv File does not exist")
}
+
+ val localLib: String =
s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+ if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib))
{
+ flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+ }
+
// yarn.ship-files
val shipFiles = new util.ArrayList[String]()
shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath)