This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch yarn-app in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit c9ab619e3358a7f8e718a9d8e68b01c53347575b Author: benjobs <[email protected]> AuthorDate: Wed Nov 8 01:40:38 2023 +0800 [Bug] yarn application bug fixed --- .../core/service/impl/AppBuildPipeServiceImpl.java | 123 +++++++++++++-------- .../core/service/impl/ApplicationServiceImpl.java | 1 + 2 files changed, 75 insertions(+), 49 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index f6fb7e641..11295412f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -19,8 +19,6 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.conf.ConfigConst; import org.apache.streampark.common.conf.Workspace; -import org.apache.streampark.common.enums.ApplicationType; -import org.apache.streampark.common.enums.DevelopmentMode; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.FileUtils; @@ -52,6 +50,7 @@ import org.apache.streampark.console.core.service.MessageService; import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher; import org.apache.streampark.flink.packer.docker.DockerConf; +import org.apache.streampark.flink.packer.maven.MavenTool; import org.apache.streampark.flink.packer.pipeline.BuildPipeline; import org.apache.streampark.flink.packer.pipeline.BuildResult; import org.apache.streampark.flink.packer.pipeline.DockerBuildSnapshot; @@ -87,10 +86,12 @@ import org.springframework.transaction.annotation.Transactional; import javax.annotation.Nonnull; import java.io.File; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -180,44 +181,7 @@ public class AppBuildPipeServiceImpl applicationService.checkEnv(app); // 2) some preparatory work - String appUploads = app.getWorkspace().APP_UPLOADS(); - - if (app.isCustomCodeJob()) { - // customCode upload jar to appHome... - FsOperator fsOperator = app.getFsOperator(); - if (app.isCICDJob()) { - String appHome = app.getAppHome(); - fsOperator.mkCleanDirs(appHome); - fsOperator.upload(app.getDistHome(), appHome); - } else { - File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); - // upload jar copy to appHome - String uploadJar = appUploads.concat("/").concat(app.getJar()); - checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads); - if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) { - fsOperator.mkdirs(app.getAppLib()); - fsOperator.copy(uploadJar, app.getAppLib(), false, true); - } - } - } - - if (app.isFlinkSqlJob() || app.isUploadJob()) { - if (!app.getDependencyObject().getJar().isEmpty()) { - String localUploads = Workspace.local().APP_UPLOADS(); - // copy jar to local upload dir - for (String jar : app.getDependencyObject().getJar()) { - File localJar = new File(WebUtils.getAppTempDir(), jar); - File uploadJar = new File(localUploads, jar); - if (!localJar.exists() && !uploadJar.exists()) { - throw new ApiAlertException("Missing file: " + jar + ", please upload again"); - } - if (localJar.exists()) { - checkOrElseUploadJar( - FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploads); - } - } - } - } + prepareJars(app); } @Override @@ -333,11 +297,6 @@ public class AppBuildPipeServiceImpl case YARN_APPLICATION: String yarnProvidedPath = app.getAppLib(); String localWorkspace = app.getLocalAppHome().concat("/lib"); - if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOM_CODE) - && app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) { - yarnProvidedPath = app.getAppHome(); - localWorkspace = app.getLocalAppHome(); - } FlinkYarnApplicationBuildRequest yarnAppRequest = new FlinkYarnApplicationBuildRequest( app.getJobName(), @@ -408,6 +367,72 @@ public class AppBuildPipeServiceImpl } } + private void prepareJars(Application app) { + File localUploadDIR = new File(Workspace.local().APP_UPLOADS()); + if (!localUploadDIR.exists()) { + localUploadDIR.mkdirs(); + } + + // 1. copy jar to local upload dir + if (app.isFlinkSqlJob() || app.isUploadJob()) { + if (!app.getDependencyObject().getJar().isEmpty()) { + for (String jar : app.getDependencyObject().getJar()) { + File localJar = new File(WebUtils.getAppTempDir(), jar); + File uploadJar = new File(localUploadDIR, jar); + if (!localJar.exists() && !uploadJar.exists()) { + throw new ApiAlertException("Missing file: " + jar + ", please upload again"); + } + if (localJar.exists()) { + checkOrElseUploadJar( + FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploadDIR); + } + } + } + } + + if (app.isCustomCodeJob()) { + // customCode upload jar to appHome... + FsOperator fsOperator = app.getFsOperator(); + if (app.isUploadJob()) { + + // 1). upload jar to local upload. + File uploadJar = new File(localUploadDIR, app.getJar()); + + checkOrElseUploadJar( + FsOperator.lfs(), + new File(WebUtils.getAppTempDir(), app.getJar()), + uploadJar.getAbsolutePath(), + localUploadDIR); + + if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { + List<File> jars = new ArrayList<>(0); + jars.add(uploadJar); + + // 2). jar dependency to local upload + if (!app.getDependencyObject().getJar().isEmpty()) { + for (String jar : app.getDependencyObject().getJar()) { + jars.add(new File(localUploadDIR, jar)); + } + } + + // 3. pom dependency to local upload + if (!app.getDependencyInfo().mavenArts().isEmpty()) { + Set<File> dependJars = + MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()); + jars.addAll(dependJars); + } + fsOperator.mkdirs(app.getAppLib()); + jars.forEach( + jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib(), false, true)); + } + } else { + String appHome = app.getAppHome(); + fsOperator.mkCleanDirs(appHome); + fsOperator.upload(app.getDistHome(), appHome); + } + } + } + /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) { switch (app.getDevelopmentMode()) { @@ -416,7 +441,7 @@ public class AppBuildPipeServiceImpl case STREAMPARK_FLINK: return String.format("%s/%s", app.getAppLib(), app.getModule().concat(".jar")); case APACHE_FLINK: - return String.format("%s/%s", WebUtils.getAppTempDir(), app.getJar()); + return String.format("%s/%s", Workspace.local().APP_UPLOADS(), app.getJar()); default: throw new IllegalArgumentException( "[StreamPark] unsupported ApplicationType of custom code: " @@ -487,13 +512,13 @@ public class AppBuildPipeServiceImpl } private void checkOrElseUploadJar( - FsOperator fsOperator, File localJar, String targetJar, String targetDir) { + FsOperator fsOperator, File localJar, String targetJar, File targetDir) { if (!fsOperator.exists(targetJar)) { - fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true); + fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath(), false, true); } else { // The file exists to check whether it is consistent, and if it is inconsistent, re-upload it if (!FileUtils.equals(localJar, new File(targetJar))) { - fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true); + fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath(), false, true); } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index e23c12564..99f21fd59 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -819,6 +819,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli newApp.setJarCheckSum(oldApp.getJarCheckSum()); newApp.setTags(oldApp.getTags()); newApp.setTeamId(oldApp.getTeamId()); + newApp.setDependency(oldApp.getDependency()); boolean saved = save(newApp); if (saved) {
