This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch yarn-app
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/yarn-app by this push:
     new 6d0666b31 [Bug] yarn application mode bug fixed.
6d0666b31 is described below

commit 6d0666b315fb3e1236a4e97c18128bfd7947e53a
Author: benjobs <[email protected]>
AuthorDate: Wed Nov 8 02:01:52 2023 +0800

    [Bug] yarn application mode bug fixed.
---
 .../core/service/impl/AppBuildPipeServiceImpl.java |  2 -
 .../streampark/flink/packer/maven/MavenTool.scala  |  5 +++
 .../flink/packer/pipeline/BuildRequest.scala       |  1 -
 .../impl/FlinkYarnApplicationBuildPipeline.scala   | 46 ++++++++--------------
 4 files changed, 21 insertions(+), 33 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 11295412f..41d451335 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -296,12 +296,10 @@ public class AppBuildPipeServiceImpl
     switch (executionMode) {
       case YARN_APPLICATION:
         String yarnProvidedPath = app.getAppLib();
-        String localWorkspace = app.getLocalAppHome().concat("/lib");
         FlinkYarnApplicationBuildRequest yarnAppRequest =
             new FlinkYarnApplicationBuildRequest(
                 app.getJobName(),
                 mainClass,
-                localWorkspace,
                 yarnProvidedPath,
                 app.getDevelopmentMode(),
                 app.getDependencyInfo());
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 5dab91892..05562a870 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,8 +43,10 @@ import javax.annotation.{Nonnull, Nullable}
 
 import java.io.File
 import java.util
+import java.util.{HashSet, Set => JavaSet}
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 
@@ -176,6 +178,9 @@ object MavenTool extends Logger {
     buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
   }
 
+  def resolveArtifactsAsJava(mavenArtifacts: Set[Artifact]): JavaSet[File] = resolveArtifacts(
+    mavenArtifacts).asJava
+
   /**
    * Resolve the collection of artifacts. Artifacts will be downloaded to ConfigConst.MAVEN_LOCAL_DIR
    * if necessary. Notes: Only compile scope dependencies will be resolved.
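
Editorial note (not part of the commit): the new resolveArtifactsAsJava helper is a Java-interop bridge over resolveArtifacts, exposing the resolved Scala Set as a java.util.Set via .asJava. Below is a minimal, self-contained sketch of that bridge, with String standing in for Artifact and the resolve step replaced by a trivial transformation.

import java.util.{Set => JavaSet}

import scala.collection.JavaConverters._

object AsJavaSketch {

  // Stand-in for MavenTool.resolveArtifacts: any computation that yields a Scala Set.
  private def resolve(coords: Set[String]): Set[String] = coords.map(_.trim)

  // Java-friendly wrapper mirroring the resolveArtifactsAsJava delegation:
  // the Scala Set is handed to Java callers as a java.util.Set via .asJava.
  def resolveAsJava(coords: Set[String]): JavaSet[String] = resolve(coords).asJava

  def main(args: Array[String]): Unit = {
    println(resolveAsJava(Set(" org.apache.flink:flink-json:1.17.1 ")))
    // prints: [org.apache.flink:flink-json:1.17.1]
  }
}

Java-side callers (such as the console service) can then iterate the returned java.util.Set directly, without depending on Scala collection types.
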
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 62f90935a..5abc55b92 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -116,7 +116,6 @@ case class FlinkRemotePerJobBuildRequest(
 case class FlinkYarnApplicationBuildRequest(
     appName: String,
     mainClass: String,
-    localWorkspace: String,
     yarnProvidedPath: String,
     developmentMode: DevelopmentMode,
     dependencyInfo: DependencyInfo)
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 92c1d9199..dcf6ad86f 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.packer.pipeline.impl
 
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.DevelopmentMode
-import org.apache.streampark.common.fs.{FsOperator, HdfsOperator, LfsOperator}
+import org.apache.streampark.common.fs.{FsOperator, HdfsOperator}
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.packer.maven.MavenTool
 import org.apache.streampark.flink.packer.pipeline._
@@ -45,9 +45,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
   override protected def buildProcess(): SimpleBuildResponse = {
     execStep(1) {
       request.developmentMode match {
-        case DevelopmentMode.FLINK_SQL =>
-          LfsOperator.mkCleanDirs(request.localWorkspace)
-          HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
+        case DevelopmentMode.FLINK_SQL => HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
         case _ =>
       }
       logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
@@ -64,11 +62,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
       }.getOrElse(throw getError.exception)
 
     execStep(3) {
-      mavenJars.foreach(
-        jar => {
-          uploadToHdfs(FsOperator.lfs, jar, request.localWorkspace)
-          uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
-        })
+      mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath))
     }.getOrElse(throw getError.exception)
 
     SimpleBuildResponse()
@@ -82,29 +76,21 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
     }
     if (originFile.isFile) {
       // check file in upload dir
-      fsOperator match {
-        case FsOperator.lfs =>
-          fsOperator.copy(originFile.getAbsolutePath, target)
-        case FsOperator.hdfs =>
-          val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
-          if (fsOperator.exists(uploadFile)) {
-            Utils.using(new FileInputStream(originFile))(
-              inputStream => {
-                if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) {
-                  fsOperator.upload(originFile.getAbsolutePath, uploadFile)
-                }
-              })
-          } else {
-            fsOperator.upload(originFile.getAbsolutePath, uploadFile)
-          }
-          // copy jar from upload dir to target dir
-          fsOperator.copy(uploadFile, target)
+      val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
+      if (fsOperator.exists(uploadFile)) {
+        Utils.using(new FileInputStream(originFile))(
+          inputStream => {
+            if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) {
+              fsOperator.upload(originFile.getAbsolutePath, uploadFile)
+            }
+          })
+      } else {
+        fsOperator.upload(originFile.getAbsolutePath, uploadFile)
       }
+      // copy jar from upload dir to target dir
+      fsOperator.copy(uploadFile, target)
     } else {
-      fsOperator match {
-        case FsOperator.hdfs => fsOperator.upload(originFile.getAbsolutePath, target)
-        case _ =>
-      }
+      fsOperator.upload(originFile.getAbsolutePath, target)
     }
   }
 
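Editorial note (not part of the commit): the rewritten uploadToHdfs above reduces to an "upload only when the MD5 differs, then copy from the shared uploads dir into the target dir" pattern, now applied unconditionally to HDFS. A minimal local-filesystem sketch of that pattern follows, with java.nio.file.Files standing in for FsOperator and all names and paths hypothetical.

import java.io.FileInputStream
import java.nio.file.{Files, Path, StandardCopyOption}

import org.apache.commons.codec.digest.DigestUtils

object UploadIfChangedSketch {

  // MD5 of a file's content, mirroring the DigestUtils.md5Hex check in the pipeline.
  private def md5Of(p: Path): String = {
    val in = new FileInputStream(p.toFile)
    try DigestUtils.md5Hex(in) finally in.close()
  }

  // Copy `origin` into the shared uploads dir only when its content differs from
  // the copy already staged there, then copy from uploads into the per-job target dir.
  def uploadIfChanged(origin: Path, uploadsDir: Path, targetDir: Path): Unit = {
    val uploaded = uploadsDir.resolve(origin.getFileName)
    if (!Files.exists(uploaded) || md5Of(origin) != md5Of(uploaded)) {
      Files.createDirectories(uploadsDir)
      Files.copy(origin, uploaded, StandardCopyOption.REPLACE_EXISTING)
    }
    Files.createDirectories(targetDir)
    Files.copy(uploaded, targetDir.resolve(origin.getFileName), StandardCopyOption.REPLACE_EXISTING)
  }
}

Skipping the upload when the MD5 already matches avoids re-shipping unchanged dependency jars on every build, which appears to be the purpose of the shared APP_UPLOADS staging directory.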
