This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch yarn-app
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit c9ab619e3358a7f8e718a9d8e68b01c53347575b
Author: benjobs <[email protected]>
AuthorDate: Wed Nov 8 01:40:38 2023 +0800

    [Bug] yarn application bug fixed
---
 .../core/service/impl/AppBuildPipeServiceImpl.java | 123 +++++++++++++--------
 .../core/service/impl/ApplicationServiceImpl.java  |   1 +
 2 files changed, 75 insertions(+), 49 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index f6fb7e641..11295412f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -19,8 +19,6 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.DevelopmentMode;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
@@ -52,6 +50,7 @@ import 
org.apache.streampark.console.core.service.MessageService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.packer.docker.DockerConf;
+import org.apache.streampark.flink.packer.maven.MavenTool;
 import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
 import org.apache.streampark.flink.packer.pipeline.DockerBuildSnapshot;
@@ -87,10 +86,12 @@ import 
org.springframework.transaction.annotation.Transactional;
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -180,44 +181,7 @@ public class AppBuildPipeServiceImpl
             applicationService.checkEnv(app);
 
             // 2) some preparatory work
-            String appUploads = app.getWorkspace().APP_UPLOADS();
-
-            if (app.isCustomCodeJob()) {
-              // customCode upload jar to appHome...
-              FsOperator fsOperator = app.getFsOperator();
-              if (app.isCICDJob()) {
-                String appHome = app.getAppHome();
-                fsOperator.mkCleanDirs(appHome);
-                fsOperator.upload(app.getDistHome(), appHome);
-              } else {
-                File localJar = new File(WebUtils.getAppTempDir(), 
app.getJar());
-                // upload jar copy to appHome
-                String uploadJar = appUploads.concat("/").concat(app.getJar());
-                checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, 
appUploads);
-                if (app.getApplicationType() == 
ApplicationType.STREAMPARK_FLINK) {
-                  fsOperator.mkdirs(app.getAppLib());
-                  fsOperator.copy(uploadJar, app.getAppLib(), false, true);
-                }
-              }
-            }
-
-            if (app.isFlinkSqlJob() || app.isUploadJob()) {
-              if (!app.getDependencyObject().getJar().isEmpty()) {
-                String localUploads = Workspace.local().APP_UPLOADS();
-                // copy jar to local upload dir
-                for (String jar : app.getDependencyObject().getJar()) {
-                  File localJar = new File(WebUtils.getAppTempDir(), jar);
-                  File uploadJar = new File(localUploads, jar);
-                  if (!localJar.exists() && !uploadJar.exists()) {
-                    throw new ApiAlertException("Missing file: " + jar + ", 
please upload again");
-                  }
-                  if (localJar.exists()) {
-                    checkOrElseUploadJar(
-                        FsOperator.lfs(), localJar, 
uploadJar.getAbsolutePath(), localUploads);
-                  }
-                }
-              }
-            }
+            prepareJars(app);
           }
 
           @Override
@@ -333,11 +297,6 @@ public class AppBuildPipeServiceImpl
       case YARN_APPLICATION:
         String yarnProvidedPath = app.getAppLib();
         String localWorkspace = app.getLocalAppHome().concat("/lib");
-        if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOM_CODE)
-            && app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) {
-          yarnProvidedPath = app.getAppHome();
-          localWorkspace = app.getLocalAppHome();
-        }
         FlinkYarnApplicationBuildRequest yarnAppRequest =
             new FlinkYarnApplicationBuildRequest(
                 app.getJobName(),
@@ -408,6 +367,72 @@ public class AppBuildPipeServiceImpl
     }
   }
 
+  private void prepareJars(Application app) {
+    File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
+    if (!localUploadDIR.exists()) {
+      localUploadDIR.mkdirs();
+    }
+
+    // 1. copy jar to local upload dir
+    if (app.isFlinkSqlJob() || app.isUploadJob()) {
+      if (!app.getDependencyObject().getJar().isEmpty()) {
+        for (String jar : app.getDependencyObject().getJar()) {
+          File localJar = new File(WebUtils.getAppTempDir(), jar);
+          File uploadJar = new File(localUploadDIR, jar);
+          if (!localJar.exists() && !uploadJar.exists()) {
+            throw new ApiAlertException("Missing file: " + jar + ", please 
upload again");
+          }
+          if (localJar.exists()) {
+            checkOrElseUploadJar(
+                FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), 
localUploadDIR);
+          }
+        }
+      }
+    }
+
+    if (app.isCustomCodeJob()) {
+      // customCode upload jar to appHome...
+      FsOperator fsOperator = app.getFsOperator();
+      if (app.isUploadJob()) {
+
+        // 1). upload jar to local upload.
+        File uploadJar = new File(localUploadDIR, app.getJar());
+
+        checkOrElseUploadJar(
+            FsOperator.lfs(),
+            new File(WebUtils.getAppTempDir(), app.getJar()),
+            uploadJar.getAbsolutePath(),
+            localUploadDIR);
+
+        if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+          List<File> jars = new ArrayList<>(0);
+          jars.add(uploadJar);
+
+          // 2). jar dependency to local upload
+          if (!app.getDependencyObject().getJar().isEmpty()) {
+            for (String jar : app.getDependencyObject().getJar()) {
+              jars.add(new File(localUploadDIR, jar));
+            }
+          }
+
+          // 3. pom dependency to local upload
+          if (!app.getDependencyInfo().mavenArts().isEmpty()) {
+            Set<File> dependJars =
+                
MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts());
+            jars.addAll(dependJars);
+          }
+          fsOperator.mkdirs(app.getAppLib());
+          jars.forEach(
+              jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib(), 
false, true));
+        }
+      } else {
+        String appHome = app.getAppHome();
+        fsOperator.mkCleanDirs(appHome);
+        fsOperator.upload(app.getDistHome(), appHome);
+      }
+    }
+  }
+
   /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
   private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) {
     switch (app.getDevelopmentMode()) {
@@ -416,7 +441,7 @@ public class AppBuildPipeServiceImpl
           case STREAMPARK_FLINK:
             return String.format("%s/%s", app.getAppLib(), 
app.getModule().concat(".jar"));
           case APACHE_FLINK:
-            return String.format("%s/%s", WebUtils.getAppTempDir(), 
app.getJar());
+            return String.format("%s/%s", Workspace.local().APP_UPLOADS(), 
app.getJar());
           default:
             throw new IllegalArgumentException(
                 "[StreamPark] unsupported ApplicationType of custom code: "
@@ -487,13 +512,13 @@ public class AppBuildPipeServiceImpl
   }
 
   private void checkOrElseUploadJar(
-      FsOperator fsOperator, File localJar, String targetJar, String 
targetDir) {
+      FsOperator fsOperator, File localJar, String targetJar, File targetDir) {
     if (!fsOperator.exists(targetJar)) {
-      fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true);
+      fsOperator.upload(localJar.getAbsolutePath(), 
targetDir.getAbsolutePath(), false, true);
     } else {
       // The file exists to check whether it is consistent, and if it is 
inconsistent, re-upload it
       if (!FileUtils.equals(localJar, new File(targetJar))) {
-        fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true);
+        fsOperator.upload(localJar.getAbsolutePath(), 
targetDir.getAbsolutePath(), false, true);
       }
     }
   }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index e23c12564..99f21fd59 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -819,6 +819,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     newApp.setJarCheckSum(oldApp.getJarCheckSum());
     newApp.setTags(oldApp.getTags());
     newApp.setTeamId(oldApp.getTeamId());
+    newApp.setDependency(oldApp.getDependency());
 
     boolean saved = save(newApp);
     if (saved) {

Reply via email to