This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch yarn-app
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/yarn-app by this push:
new ba65306e9 [Improve] yarn application mode bug fixed.
ba65306e9 is described below
commit ba65306e9550982422044ccd72c5bcce7dbc2d6f
Author: benjobs <[email protected]>
AuthorDate: Wed Nov 8 23:28:55 2023 +0800
[Improve] yarn application mode bug fixed.
---
.../console/core/service/impl/ApplicationServiceImpl.java | 6 +++++-
.../flink/client/impl/YarnApplicationClient.scala | 14 ++++++--------
2 files changed, 11 insertions(+), 9 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 99f21fd59..f86cb2628 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1500,7 +1500,11 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
"%s/%s", application.getAppLib(),
application.getModule().concat(".jar"));
break;
case APACHE_FLINK:
- flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
+ if (application.getFsOperator().exists(application.getAppLib())) {
+ flinkUserJar = String.format("%s/%s", application.getAppLib(),
application.getJar());
+ } else {
+ flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
+ }
break;
default:
throw new IllegalArgumentException(
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 2b981b305..b84aae322 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -66,14 +66,12 @@ object YarnApplicationClient extends YarnClientTrait {
submitRequest.hdfsWorkspace.appJars,
submitRequest.hdfsWorkspace.appPlugins
)
- submitRequest.developmentMode match {
- case DevelopmentMode.FLINK_SQL =>
- array +=
s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}"
- val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
- if (HdfsUtils.exists(jobLib)) {
- array += jobLib
- }
- case _ =>
+ val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
+ if (HdfsUtils.exists(jobLib)) {
+ array += jobLib
+ }
+ if (submitRequest.developmentMode == DevelopmentMode.FLINK_SQL) {
+ array +=
s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}"
}
array.toList
}