This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new ad848d16c [Bug] submit flink job bug fixed (#3348)
ad848d16c is described below

commit ad848d16cae982c19d7fc7a9fd35f28262ca0d26
Author: benjobs <[email protected]>
AuthorDate: Sat Nov 18 16:39:43 2023 +0800

    [Bug] submit flink job bug fixed (#3348)
    
    * [Improve] submit flink job on yarn application|perjob mode bug fixed
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../apache/streampark/common/util/FileUtils.scala  |  11 ++
 .../console/core/entity/Application.java           |  11 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java | 158 +++++++++++----------
 .../core/service/impl/ApplicationServiceImpl.java  |   9 +-
 .../src/main/resources/application-mysql.yml       |   2 +-
 .../flink/client/trait/FlinkClientTrait.scala      |  11 +-
 .../streampark/flink/packer/maven/Artifact.scala   |  12 +-
 .../streampark/flink/packer/maven/MavenTool.scala  |  28 ++--
 .../impl/FlinkYarnApplicationBuildPipeline.scala   |  32 ++---
 9 files changed, 140 insertions(+), 134 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 708943098..77618fbd4 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -79,6 +79,17 @@ object FileUtils {
       s"[StreamPark] FileUtils.exists: file $path is not exist!")
   }
 
+  def mkdir(dir: File) = {
+    if (dir.exists && !dir.isDirectory) {
+      throw new IOException(s"File $dir exists and is not a directory. Unable 
to create directory.")
+    } else if (!dir.mkdirs) {
+      // Double-check that some other thread or process hasn't made
+      if (!dir.isDirectory) {
+        throw new IOException(s"Unable to create directory $dir")
+      }
+    }
+  }
+
   def getPathFromEnv(env: String): String = {
     val path = System.getenv(env)
     require(
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 3c0f2436e..1bbee1382 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -388,26 +388,21 @@ public class Application implements Serializable {
   public String getDistHome() {
     String path =
         String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(), 
projectId.toString(), getModule());
-    log.info("local distHome:{}", path);
+    log.info("local distHome: {}", path);
     return path;
   }
 
-  @JsonIgnore
-  public String getDistJar() {
-    return getDistHome() + "/" + getJar();
-  }
-
   @JsonIgnore
   public String getLocalAppHome() {
     String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(), 
id.toString());
-    log.info("local appHome:{}", path);
+    log.info("local appHome: {}", path);
     return path;
   }
 
   @JsonIgnore
   public String getRemoteAppHome() {
     String path = String.format("%s/%s", Workspace.remote().APP_WORKSPACE(), 
id.toString());
-    log.info("remote appHome:{}", path);
+    log.info("remote appHome: {}", path);
     return path;
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 1ee33cf01..59c0e108f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
@@ -373,9 +374,7 @@ public class AppBuildPipeServiceImpl
 
   private void prepareJars(Application app) throws IOException {
     File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
-    if (!localUploadDIR.exists()) {
-      localUploadDIR.mkdirs();
-    }
+    FileUtils.mkdir(localUploadDIR);
 
     FsOperator localFS = FsOperator.lfs();
     // 1. copy jar to local upload dir
@@ -398,91 +397,106 @@ public class AppBuildPipeServiceImpl
       // customCode upload jar to appHome...
       FsOperator fsOperator = app.getFsOperator();
       ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
-      switch (resourceFrom) {
-        case CICD:
-          String appLib = app.getAppLib();
-          fsOperator.mkCleanDirs(appLib);
-          fsOperator.upload(app.getDistJar(), appLib);
-          break;
-        case UPLOAD:
-          // 1). upload jar to local uploadDIR.
-          File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
-          File localUploadJar = new File(localUploadDIR, app.getJar());
-          checkOrElseUploadJar(localFS, localJar, localUploadJar, 
localUploadDIR);
-
-          // 2) copy jar to local $app_home/lib
-          File libJar = new File(app.getLocalAppLib(), app.getJar());
-          if (!localFS.exists(app.getLocalAppLib())
-              || !libJar.exists()
-              || !FileUtils.equals(localJar, libJar)) {
-            localFS.mkCleanDirs(app.getLocalAppLib());
-            localFS.upload(localUploadJar.getAbsolutePath(), 
app.getLocalAppLib());
+
+      File userJar;
+      if (resourceFrom == ResourceFrom.CICD) {
+        userJar = getAppDistJar(app);
+      } else if (resourceFrom == ResourceFrom.UPLOAD) {
+        userJar = new File(WebUtils.getAppTempDir(), app.getJar());
+      } else {
+        throw new IllegalArgumentException("ResourceFrom error: " + 
resourceFrom);
+      }
+      // 2) copy user jar to localUpload DIR
+      File localUploadJar = new File(localUploadDIR, userJar.getName());
+      checkOrElseUploadJar(localFS, userJar, localUploadJar, localUploadDIR);
+
+      // 3) for YARNApplication mode
+      if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+        // 1) upload user jar to hdfs workspace
+        if (!fsOperator.exists(app.getAppHome())) {
+          fsOperator.mkdirs(app.getAppHome());
+        }
+        String pipelineJar = 
app.getAppHome().concat("/").concat(userJar.getName());
+        if (!fsOperator.exists(pipelineJar)) {
+          fsOperator.upload(localUploadJar.getAbsolutePath(), 
app.getAppHome());
+        } else {
+          InputStream inputStream = 
Files.newInputStream(localUploadJar.toPath());
+          if 
(!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(pipelineJar))) {
+            fsOperator.upload(localUploadJar.getAbsolutePath(), 
app.getAppHome());
           }
+        }
 
-          // 3) for YARNApplication mode
-          if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
-            List<File> jars = new ArrayList<>(0);
-
-            // 1). user jar
-            jars.add(libJar);
-
-            // 2). jar dependency
-            app.getMavenDependency()
-                .getJar()
-                .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
-
-            // 3). pom dependency
-            if (!app.getMavenDependency().getPom().isEmpty()) {
-              Set<Artifact> artifacts =
-                  app.getMavenDependency().getPom().stream()
-                      .filter(x -> !new File(localUploadDIR, 
x.artifactName()).exists())
-                      .map(
-                          pom ->
-                              new Artifact(
-                                  pom.getGroupId(),
-                                  pom.getArtifactId(),
-                                  pom.getVersion(),
-                                  pom.getClassifier(),
-                                  pom.toExclusionString()))
-                      .collect(Collectors.toSet());
-              Set<File> mavenArts = 
MavenTool.resolveArtifactsAsJava(artifacts);
-              jars.addAll(mavenArts);
-            }
+        List<File> dependencyJars = new ArrayList<>(0);
+
+        // 2). jar dependency
+        app.getMavenDependency()
+            .getJar()
+            .forEach(jar -> dependencyJars.add(new File(localUploadDIR, jar)));
+
+        // 3). pom dependency
+        if (!app.getMavenDependency().getPom().isEmpty()) {
+          Set<Artifact> artifacts =
+              app.getMavenDependency().getPom().stream()
+                  .filter(x -> !new File(localUploadDIR, 
x.artifactName()).exists())
+                  .map(
+                      pom ->
+                          new Artifact(
+                              pom.getGroupId(),
+                              pom.getArtifactId(),
+                              pom.getVersion(),
+                              pom.getClassifier(),
+                              pom.toExclusionString()))
+                  .collect(Collectors.toSet());
+          Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
+          dependencyJars.addAll(mavenArts);
+        }
 
-            // 4). local uploadDIR to hdfs uploadsDIR
-            String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
-            for (File jarFile : jars) {
-              String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
-              if (!fsOperator.exists(hdfsUploadPath)) {
-                fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
-              } else {
-                InputStream inputStream = 
Files.newInputStream(jarFile.toPath());
-                if 
(!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
-                  fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
-                }
-              }
+        // 4). local uploadDIR to hdfs uploadsDIR
+        String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+        for (File jarFile : dependencyJars) {
+          String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+          if (!fsOperator.exists(hdfsUploadPath)) {
+            fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+          } else {
+            InputStream inputStream = Files.newInputStream(jarFile.toPath());
+            if 
(!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+              fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
             }
-            // 5). copy jars to $hdfs_app_home/lib
-            fsOperator.mkCleanDirs(app.getAppLib());
-            jars.forEach(
-                jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), 
app.getAppLib()));
           }
-          break;
-        default:
-          throw new IllegalArgumentException("ResourceFrom error: " + 
resourceFrom);
+        }
+        // 5). copy jars to $hdfs_app_home/lib
+        if (!fsOperator.exists(app.getAppLib())) {
+          fsOperator.mkdirs(app.getAppLib());
+        } else {
+          fsOperator.mkCleanDirs(app.getAppLib());
+        }
+        dependencyJars.forEach(
+            jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), 
app.getAppLib()));
       }
     }
   }
 
+  private File getAppDistJar(Application app) {
+    if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
+      return new File(app.getDistHome(), app.getModule().concat(".jar"));
+    }
+    if (app.getApplicationType() == ApplicationType.APACHE_FLINK) {
+      return new File(app.getDistHome(), app.getJar());
+    }
+    throw new IllegalArgumentException(
+        "[StreamPark] unsupported ApplicationType of custom code: " + 
app.getApplicationType());
+  }
+
   /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
   private String retrieveUserLocalJar(FlinkEnv flinkEnv, Application app) {
+    File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
     switch (app.getDevelopmentMode()) {
       case CUSTOM_CODE:
         switch (app.getApplicationType()) {
           case STREAMPARK_FLINK:
-            return String.format("%s/%s", app.getLocalAppLib(), 
app.getModule().concat(".jar"));
+            return String.format("%s/%s", localUploadDIR, 
app.getModule().concat(".jar"));
           case APACHE_FLINK:
-            return String.format("%s/%s", app.getLocalAppLib(), app.getJar());
+            return String.format("%s/%s", localUploadDIR, app.getJar());
           default:
             throw new IllegalArgumentException(
                 "[StreamPark] unsupported ApplicationType of custom code: "
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 99da52420..0d1d45785 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1487,15 +1487,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           case STREAMPARK_FLINK:
             flinkUserJar =
                 String.format(
-                    "%s/%s", application.getAppLib(), 
application.getModule().concat(".jar"));
+                    "%s/%s", application.getAppHome(), 
application.getModule().concat(".jar"));
             break;
           case APACHE_FLINK:
-            if (application.getFsOperator().exists(application.getAppLib())) {
-              flinkUserJar = String.format("%s/%s", application.getAppLib(), 
application.getJar());
-            } else {
-              // compatible with historical version
-              flinkUserJar = String.format("%s/%s", application.getAppHome(), 
application.getJar());
-            }
+            flinkUserJar = String.format("%s/%s", application.getAppHome(), 
application.getJar());
             break;
           default:
             throw new IllegalArgumentException(
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
 
b/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
index e8d0b760a..f5b82362c 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
@@ -20,4 +20,4 @@ spring:
     username: root
     password: streampark
     driver-class-name: com.mysql.cj.jdbc.Driver
-    url: 
jdbc:mysql://localhost:3306/streampark?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
+    url: 
jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 6a2e8c95c..8e3ae8cba 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -233,7 +233,7 @@ trait FlinkClientTrait extends Logger {
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {
 
-    val pgkBuilder = PackagedProgram.newBuilder
+    val packageProgram = PackagedProgram.newBuilder
       .setJarFile(jarFile)
       .setEntryPointClassName(
         
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
@@ -242,14 +242,7 @@ trait FlinkClientTrait extends Logger {
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
           .orElse(Lists.newArrayList()): _*)
-    // userClassPath...
-    submitRequest.executionMode match {
-      case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB =>
-        pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs)
-      case _ =>
-    }
-
-    val packageProgram = pgkBuilder.build()
+      .build()
 
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
index 1e8245228..b6fd04bc6 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
@@ -30,13 +30,11 @@ case class Artifact(
     extensions: JavaSet[String] = Collections.emptySet()) {
 
   def filter(artifact: AetherArtifact): Boolean = {
-    artifact.getGroupId match {
-      case g if g == groupId =>
-        artifact.getArtifactId match {
-          case "*" => true
-          case a => a == artifactId
-        }
-      case _ => false
+    (artifact.getGroupId, artifact.getArtifactId) match {
+      case ("*", "*") => true
+      case (g, "*") => g == this.groupId
+      case ("*", a) => a == this.artifactId
+      case (g, a) => g == this.groupId && a == this.artifactId
     }
   }
 }
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index c16e1440e..451c75dda 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -29,7 +29,7 @@ import 
org.apache.maven.repository.internal.MavenRepositorySystemUtils
 import org.codehaus.plexus.logging.{Logger => PlexusLog}
 import org.codehaus.plexus.logging.console.ConsoleLogger
 import org.eclipse.aether.{RepositorySystem, RepositorySystemSession}
-import org.eclipse.aether.artifact.DefaultArtifact
+import org.eclipse.aether.artifact.{Artifact, DefaultArtifact}
 import org.eclipse.aether.connector.basic.BasicRepositoryConnectorFactory
 import org.eclipse.aether.repository.{LocalRepository, RemoteRepository}
 import org.eclipse.aether.resolution.{ArtifactDescriptorRequest, 
ArtifactRequest}
@@ -205,23 +205,29 @@ object MavenTool extends Logger {
 
     val (repoSystem, session) = getMavenEndpoint()
 
-    val artifacts = mavenArtifacts.map(
-      e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", 
e.version))
-
     val exclusions = mavenArtifacts
       .flatMap(_.extensions.map(_.split(":")))
       .map(a => Artifact(a.head, a.last, null)) ++ excludeArtifact
 
     val remoteRepos = getRemoteRepos()
 
+    val exclusionAll = exclusions.exists(e => e.groupId == "*" && e.artifactId 
== "*")
+
+    val artifacts = mavenArtifacts.map(
+      e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", 
e.version))
+
     // read relevant artifact descriptor info and excluding items if necessary.
-    val dependencies = artifacts
-      .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, 
null))
-      .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
-      .flatMap(_.getDependencies)
-      .filter(_.getScope == "compile")
-      .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
-      .map(_.getArtifact)
+    val dependencies =
+      if (exclusionAll) Set.empty[DefaultArtifact]
+      else {
+        artifacts
+          .map(artifact => new ArtifactDescriptorRequest(artifact, 
remoteRepos, null))
+          .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
+          .flatMap(_.getDependencies)
+          .filter(_.getScope == "compile")
+          .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
+          .map(_.getArtifact)
+      }
 
     val mergedArtifacts = artifacts ++ dependencies
 
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index dcf6ad86f..d678c81a0 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -43,28 +43,22 @@ class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildReques
    */
   @throws[Throwable]
   override protected def buildProcess(): SimpleBuildResponse = {
-    execStep(1) {
-      request.developmentMode match {
-        case DevelopmentMode.FLINK_SQL => 
HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
-        case _ =>
-      }
-      logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
-    }.getOrElse(throw getError.exception)
-
-    val mavenJars =
-      execStep(2) {
-        request.developmentMode match {
-          case DevelopmentMode.FLINK_SQL =>
-            val mavenArts = 
MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
-            mavenArts.map(_.getAbsolutePath) ++ 
request.dependencyInfo.extJarLibs
-          case _ => Set[String]()
-        }
+    if (request.developmentMode == DevelopmentMode.FLINK_SQL) {
+      execStep(1) {
+        HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
+        logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
       }.getOrElse(throw getError.exception)
 
-    execStep(3) {
-      mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, 
request.yarnProvidedPath))
-    }.getOrElse(throw getError.exception)
+      val mavenJars =
+        execStep(2) {
+          val mavenArts = 
MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
+          mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
+        }.getOrElse(throw getError.exception)
 
+      execStep(3) {
+        mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, 
request.yarnProvidedPath))
+      }.getOrElse(throw getError.exception)
+    }
     SimpleBuildResponse()
   }
 

Reply via email to