This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch yarn-app
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/yarn-app by this push:
     new ba65306e9 [Improve] yarn application mode bug fixed.
ba65306e9 is described below

commit ba65306e9550982422044ccd72c5bcce7dbc2d6f
Author: benjobs <[email protected]>
AuthorDate: Wed Nov 8 23:28:55 2023 +0800

    [Improve] yarn application mode bug fixed.
---
 .../console/core/service/impl/ApplicationServiceImpl.java  |  6 +++++-
 .../flink/client/impl/YarnApplicationClient.scala          | 14 ++++++--------
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 99f21fd59..f86cb2628 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1500,7 +1500,11 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                     "%s/%s", application.getAppLib(), 
application.getModule().concat(".jar"));
             break;
           case APACHE_FLINK:
-            flinkUserJar = String.format("%s/%s", application.getAppHome(), 
application.getJar());
+            if (application.getFsOperator().exists(application.getAppLib())) {
+              flinkUserJar = String.format("%s/%s", application.getAppLib(), 
application.getJar());
+            } else {
+              flinkUserJar = String.format("%s/%s", application.getAppHome(), 
application.getJar());
+            }
             break;
           default:
             throw new IllegalArgumentException(
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 2b981b305..b84aae322 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -66,14 +66,12 @@ object YarnApplicationClient extends YarnClientTrait {
         submitRequest.hdfsWorkspace.appJars,
         submitRequest.hdfsWorkspace.appPlugins
       )
-      submitRequest.developmentMode match {
-        case DevelopmentMode.FLINK_SQL =>
-          array += 
s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}"
-          val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
-          if (HdfsUtils.exists(jobLib)) {
-            array += jobLib
-          }
-        case _ =>
+      val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
+      if (HdfsUtils.exists(jobLib)) {
+        array += jobLib
+      }
+      if (submitRequest.developmentMode == DevelopmentMode.FLINK_SQL) {
+        array += 
s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}"
       }
       array.toList
     }

Reply via email to