This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch yarn-app
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/yarn-app by this push:
new 6d0666b31 [Bug] yarn application mode bug fixed.
6d0666b31 is described below
commit 6d0666b315fb3e1236a4e97c18128bfd7947e53a
Author: benjobs <[email protected]>
AuthorDate: Wed Nov 8 02:01:52 2023 +0800
[Bug] yarn application mode bug fixed.
---
.../core/service/impl/AppBuildPipeServiceImpl.java | 2 -
.../streampark/flink/packer/maven/MavenTool.scala | 5 +++
.../flink/packer/pipeline/BuildRequest.scala | 1 -
.../impl/FlinkYarnApplicationBuildPipeline.scala | 46 ++++++++--------------
4 files changed, 21 insertions(+), 33 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 11295412f..41d451335 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -296,12 +296,10 @@ public class AppBuildPipeServiceImpl
switch (executionMode) {
case YARN_APPLICATION:
String yarnProvidedPath = app.getAppLib();
- String localWorkspace = app.getLocalAppHome().concat("/lib");
FlinkYarnApplicationBuildRequest yarnAppRequest =
new FlinkYarnApplicationBuildRequest(
app.getJobName(),
mainClass,
- localWorkspace,
yarnProvidedPath,
app.getDevelopmentMode(),
app.getDependencyInfo());
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 5dab91892..05562a870 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,8 +43,10 @@ import javax.annotation.{Nonnull, Nullable}
import java.io.File
import java.util
+import java.util.{HashSet, Set => JavaSet}
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
@@ -176,6 +178,9 @@ object MavenTool extends Logger {
buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
}
+ def resolveArtifactsAsJava(mavenArtifacts: Set[Artifact]): JavaSet[File] =
resolveArtifacts(
+ mavenArtifacts).asJava
+
/**
* Resolve the collectoin of artifacts, Artifacts will be download to
ConfigConst.MAVEN_LOCAL_DIR
* if necessary. notes: Only compile scope dependencies will be resolved.
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 62f90935a..5abc55b92 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -116,7 +116,6 @@ case class FlinkRemotePerJobBuildRequest(
case class FlinkYarnApplicationBuildRequest(
appName: String,
mainClass: String,
- localWorkspace: String,
yarnProvidedPath: String,
developmentMode: DevelopmentMode,
dependencyInfo: DependencyInfo)
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 92c1d9199..dcf6ad86f 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.packer.pipeline.impl
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.DevelopmentMode
-import org.apache.streampark.common.fs.{FsOperator, HdfsOperator, LfsOperator}
+import org.apache.streampark.common.fs.{FsOperator, HdfsOperator}
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.packer.maven.MavenTool
import org.apache.streampark.flink.packer.pipeline._
@@ -45,9 +45,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
override protected def buildProcess(): SimpleBuildResponse = {
execStep(1) {
request.developmentMode match {
- case DevelopmentMode.FLINK_SQL =>
- LfsOperator.mkCleanDirs(request.localWorkspace)
- HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
+ case DevelopmentMode.FLINK_SQL =>
HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
case _ =>
}
logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
@@ -64,11 +62,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
}.getOrElse(throw getError.exception)
execStep(3) {
- mavenJars.foreach(
- jar => {
- uploadToHdfs(FsOperator.lfs, jar, request.localWorkspace)
- uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
- })
+ mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar,
request.yarnProvidedPath))
}.getOrElse(throw getError.exception)
SimpleBuildResponse()
@@ -82,29 +76,21 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
}
if (originFile.isFile) {
// check file in upload dir
- fsOperator match {
- case FsOperator.lfs =>
- fsOperator.copy(originFile.getAbsolutePath, target)
- case FsOperator.hdfs =>
- val uploadFile =
s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
- if (fsOperator.exists(uploadFile)) {
- Utils.using(new FileInputStream(originFile))(
- inputStream => {
- if (DigestUtils.md5Hex(inputStream) !=
fsOperator.fileMd5(uploadFile)) {
- fsOperator.upload(originFile.getAbsolutePath, uploadFile)
- }
- })
- } else {
- fsOperator.upload(originFile.getAbsolutePath, uploadFile)
- }
- // copy jar from upload dir to target dir
- fsOperator.copy(uploadFile, target)
+ val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
+ if (fsOperator.exists(uploadFile)) {
+ Utils.using(new FileInputStream(originFile))(
+ inputStream => {
+ if (DigestUtils.md5Hex(inputStream) !=
fsOperator.fileMd5(uploadFile)) {
+ fsOperator.upload(originFile.getAbsolutePath, uploadFile)
+ }
+ })
+ } else {
+ fsOperator.upload(originFile.getAbsolutePath, uploadFile)
}
+ // copy jar from upload dir to target dir
+ fsOperator.copy(uploadFile, target)
} else {
- fsOperator match {
- case FsOperator.hdfs => fsOperator.upload(originFile.getAbsolutePath,
target)
- case _ =>
- }
+ fsOperator.upload(originFile.getAbsolutePath, target)
}
}