This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch yarn-app
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/yarn-app by this push:
     new 3cc157edb [Improve] flink yarn-app mode improvement
3cc157edb is described below

commit 3cc157edb2e736e87cc17cc74d28413c17e938d0
Author: benjobs <[email protected]>
AuthorDate: Thu Nov 9 14:01:42 2023 +0800

    [Improve] flink yarn-app mode improvement
---
 .../core/service/impl/AppBuildPipeServiceImpl.java | 25 +++++++++++-----------
 .../flink/client/impl/YarnApplicationClient.scala  |  1 -
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 2b4cc9253..965f21e70 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -91,7 +91,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -390,29 +389,31 @@ public class AppBuildPipeServiceImpl
     if (app.isCustomCodeJob()) {
       // customCode upload jar to appHome...
       FsOperator fsOperator = app.getFsOperator();
+
       if (app.isUploadJob()) {
         // 1). upload jar to local upload.
         File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
         File uploadJar = new File(localUploadDIR, app.getJar());
         checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploadDIR);
+
         if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
           List<File> jars = new ArrayList<>(0);
+
+          // 1) user jar
           jars.add(uploadJar);
 
-          // 2). jar dependency to local upload
-          if (!app.getDependencyObject().getJar().isEmpty()) {
-            for (String jar : app.getDependencyObject().getJar()) {
-              jars.add(new File(localUploadDIR, jar));
-            }
-          }
+          // 2). jar dependency
+          app.getDependencyObject()
+              .getJar()
+              .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
 
-          // 3. pom dependency to local upload
+          // 3). pom dependency
           if (!app.getDependencyInfo().mavenArts().isEmpty()) {
-            Set<File> dependJars =
-                MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts());
-            jars.addAll(dependJars);
+            jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
           }
-          fsOperator.mkdirs(app.getAppLib());
+
+          fsOperator.mkCleanDirs(app.getAppLib());
+          // upload jars to uploadDIR
           jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib()));
         }
       } else {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index b84aae322..26af8f8ab 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -62,7 +62,6 @@ object YarnApplicationClient extends YarnClientTrait {
     val providedLibs = {
       val array = ListBuffer(
         submitRequest.hdfsWorkspace.flinkLib,
-        submitRequest.hdfsWorkspace.flinkPlugins,
         submitRequest.hdfsWorkspace.appJars,
         submitRequest.hdfsWorkspace.appPlugins
       )

Reply via email to