This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new ad848d16c [Bug] submit flink job bug fixed (#3348)
ad848d16c is described below
commit ad848d16cae982c19d7fc7a9fd35f28262ca0d26
Author: benjobs <[email protected]>
AuthorDate: Sat Nov 18 16:39:43 2023 +0800
[Bug] submit flink job bug fixed (#3348)
* [Improve] submit flink job on yarn application|perjob mode bug fixed
---------
Co-authored-by: benjobs <[email protected]>
---
.../apache/streampark/common/util/FileUtils.scala | 11 ++
.../console/core/entity/Application.java | 11 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 158 +++++++++++----------
.../core/service/impl/ApplicationServiceImpl.java | 9 +-
.../src/main/resources/application-mysql.yml | 2 +-
.../flink/client/trait/FlinkClientTrait.scala | 11 +-
.../streampark/flink/packer/maven/Artifact.scala | 12 +-
.../streampark/flink/packer/maven/MavenTool.scala | 28 ++--
.../impl/FlinkYarnApplicationBuildPipeline.scala | 32 ++---
9 files changed, 140 insertions(+), 134 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 708943098..77618fbd4 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -79,6 +79,17 @@ object FileUtils {
s"[StreamPark] FileUtils.exists: file $path is not exist!")
}
+ def mkdir(dir: File) = {
+ if (dir.exists && !dir.isDirectory) {
+ throw new IOException(s"File $dir exists and is not a directory. Unable
to create directory.")
+ } else if (!dir.mkdirs) {
+ // Double-check that some other thread or process hasn't made the directory in the background
+ if (!dir.isDirectory) {
+ throw new IOException(s"Unable to create directory $dir")
+ }
+ }
+ }
+
def getPathFromEnv(env: String): String = {
val path = System.getenv(env)
require(
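Note on the new FileUtils.mkdir: mkdirs() returns false both when creation fails and when the directory already exists, so the extra isDirectory check distinguishes a race (another thread or process created it first) from a genuine failure. A minimal standalone sketch of the same guard, runnable outside StreamPark:

    import java.io.{File, IOException}

    object MkdirSketch {
      // Throws only when the path exists as a regular file, or when the
      // directory could not be created and still does not exist afterwards.
      def mkdir(dir: File): Unit = {
        if (dir.exists && !dir.isDirectory) {
          throw new IOException(s"File $dir exists and is not a directory.")
        } else if (!dir.mkdirs && !dir.isDirectory) {
          throw new IOException(s"Unable to create directory $dir")
        }
      }

      def main(args: Array[String]): Unit = {
        val dir = new File(System.getProperty("java.io.tmpdir"), "mkdir-sketch")
        mkdir(dir) // creates the directory
        mkdir(dir) // second call is a no-op
        println(s"${dir.getAbsolutePath} exists: ${dir.isDirectory}")
      }
    }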
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 3c0f2436e..1bbee1382 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -388,26 +388,21 @@ public class Application implements Serializable {
public String getDistHome() {
String path =
String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(),
projectId.toString(), getModule());
- log.info("local distHome:{}", path);
+ log.info("local distHome: {}", path);
return path;
}
- @JsonIgnore
- public String getDistJar() {
- return getDistHome() + "/" + getJar();
- }
-
@JsonIgnore
public String getLocalAppHome() {
String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(), id.toString());
- log.info("local appHome:{}", path);
+ log.info("local appHome: {}", path);
return path;
}
@JsonIgnore
public String getRemoteAppHome() {
String path = String.format("%s/%s", Workspace.remote().APP_WORKSPACE(), id.toString());
- log.info("remote appHome:{}", path);
+ log.info("remote appHome: {}", path);
return path;
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 1ee33cf01..59c0e108f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.FileUtils;
@@ -373,9 +374,7 @@ public class AppBuildPipeServiceImpl
private void prepareJars(Application app) throws IOException {
File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
- if (!localUploadDIR.exists()) {
- localUploadDIR.mkdirs();
- }
+ FileUtils.mkdir(localUploadDIR);
FsOperator localFS = FsOperator.lfs();
// 1. copy jar to local upload dir
@@ -398,91 +397,106 @@ public class AppBuildPipeServiceImpl
// customCode upload jar to appHome...
FsOperator fsOperator = app.getFsOperator();
ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
- switch (resourceFrom) {
- case CICD:
- String appLib = app.getAppLib();
- fsOperator.mkCleanDirs(appLib);
- fsOperator.upload(app.getDistJar(), appLib);
- break;
- case UPLOAD:
- // 1). upload jar to local uploadDIR.
- File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
- File localUploadJar = new File(localUploadDIR, app.getJar());
- checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
-
- // 2) copy jar to local $app_home/lib
- File libJar = new File(app.getLocalAppLib(), app.getJar());
- if (!localFS.exists(app.getLocalAppLib())
- || !libJar.exists()
- || !FileUtils.equals(localJar, libJar)) {
- localFS.mkCleanDirs(app.getLocalAppLib());
- localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib());
+
+ File userJar;
+ if (resourceFrom == ResourceFrom.CICD) {
+ userJar = getAppDistJar(app);
+ } else if (resourceFrom == ResourceFrom.UPLOAD) {
+ userJar = new File(WebUtils.getAppTempDir(), app.getJar());
+ } else {
+ throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);
+ }
+ // 2) copy user jar to localUpload DIR
+ File localUploadJar = new File(localUploadDIR, userJar.getName());
+ checkOrElseUploadJar(localFS, userJar, localUploadJar, localUploadDIR);
+
+ // 3) for YARNApplication mode
+ if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+ // 1) upload user jar to hdfs workspace
+ if (!fsOperator.exists(app.getAppHome())) {
+ fsOperator.mkdirs(app.getAppHome());
+ }
+ String pipelineJar = app.getAppHome().concat("/").concat(userJar.getName());
+ if (!fsOperator.exists(pipelineJar)) {
+ fsOperator.upload(localUploadJar.getAbsolutePath(), app.getAppHome());
+ } else {
+ InputStream inputStream = Files.newInputStream(localUploadJar.toPath());
+ if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(pipelineJar))) {
+ fsOperator.upload(localUploadJar.getAbsolutePath(), app.getAppHome());
}
+ }
- // 3) for YARNApplication mode
- if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
- List<File> jars = new ArrayList<>(0);
-
- // 1). user jar
- jars.add(libJar);
-
- // 2). jar dependency
- app.getMavenDependency()
- .getJar()
- .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
-
- // 3). pom dependency
- if (!app.getMavenDependency().getPom().isEmpty()) {
- Set<Artifact> artifacts =
- app.getMavenDependency().getPom().stream()
- .filter(x -> !new File(localUploadDIR, x.artifactName()).exists())
- .map(
- pom ->
- new Artifact(
- pom.getGroupId(),
- pom.getArtifactId(),
- pom.getVersion(),
- pom.getClassifier(),
- pom.toExclusionString()))
- .collect(Collectors.toSet());
- Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
- jars.addAll(mavenArts);
- }
+ List<File> dependencyJars = new ArrayList<>(0);
+
+ // 2). jar dependency
+ app.getMavenDependency()
+ .getJar()
+ .forEach(jar -> dependencyJars.add(new File(localUploadDIR, jar)));
+
+ // 3). pom dependency
+ if (!app.getMavenDependency().getPom().isEmpty()) {
+ Set<Artifact> artifacts =
+ app.getMavenDependency().getPom().stream()
+ .filter(x -> !new File(localUploadDIR, x.artifactName()).exists())
+ .map(
+ pom ->
+ new Artifact(
+ pom.getGroupId(),
+ pom.getArtifactId(),
+ pom.getVersion(),
+ pom.getClassifier(),
+ pom.toExclusionString()))
+ .collect(Collectors.toSet());
+ Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
+ dependencyJars.addAll(mavenArts);
+ }
- // 4). local uploadDIR to hdfs uploadsDIR
- String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
- for (File jarFile : jars) {
- String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
- if (!fsOperator.exists(hdfsUploadPath)) {
- fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
- } else {
- InputStream inputStream = Files.newInputStream(jarFile.toPath());
- if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
- fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
- }
- }
+ // 4). local uploadDIR to hdfs uploadsDIR
+ String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+ for (File jarFile : dependencyJars) {
+ String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+ if (!fsOperator.exists(hdfsUploadPath)) {
+ fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+ } else {
+ InputStream inputStream = Files.newInputStream(jarFile.toPath());
+ if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+ fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
}
- // 5). copy jars to $hdfs_app_home/lib
- fsOperator.mkCleanDirs(app.getAppLib());
- jars.forEach(
- jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
}
- break;
- default:
- throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);
+ }
+ // 5). copy jars to $hdfs_app_home/lib
+ if (!fsOperator.exists(app.getAppLib())) {
+ fsOperator.mkdirs(app.getAppLib());
+ } else {
+ fsOperator.mkCleanDirs(app.getAppLib());
+ }
+ dependencyJars.forEach(
+ jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
}
}
}
+ private File getAppDistJar(Application app) {
+ if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
+ return new File(app.getDistHome(), app.getModule().concat(".jar"));
+ }
+ if (app.getApplicationType() == ApplicationType.APACHE_FLINK) {
+ return new File(app.getDistHome(), app.getJar());
+ }
+ throw new IllegalArgumentException(
+ "[StreamPark] unsupported ApplicationType of custom code: " +
app.getApplicationType());
+ }
+
/** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
private String retrieveUserLocalJar(FlinkEnv flinkEnv, Application app) {
+ File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
switch (app.getDevelopmentMode()) {
case CUSTOM_CODE:
switch (app.getApplicationType()) {
case STREAMPARK_FLINK:
- return String.format("%s/%s", app.getLocalAppLib(),
app.getModule().concat(".jar"));
+ return String.format("%s/%s", localUploadDIR,
app.getModule().concat(".jar"));
case APACHE_FLINK:
- return String.format("%s/%s", app.getLocalAppLib(), app.getJar());
+ return String.format("%s/%s", localUploadDIR, app.getJar());
default:
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of custom code: "
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 99da52420..0d1d45785 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1487,15 +1487,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
case STREAMPARK_FLINK:
flinkUserJar =
String.format(
- "%s/%s", application.getAppLib(),
application.getModule().concat(".jar"));
+ "%s/%s", application.getAppHome(),
application.getModule().concat(".jar"));
break;
case APACHE_FLINK:
- if (application.getFsOperator().exists(application.getAppLib())) {
- flinkUserJar = String.format("%s/%s", application.getAppLib(),
application.getJar());
- } else {
- // compatible with historical version
- flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
- }
+ flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
break;
default:
throw new IllegalArgumentException(
diff --git a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml b/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
index e8d0b760a..f5b82362c 100644
--- a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
+++ b/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
@@ -20,4 +20,4 @@ spring:
username: root
password: streampark
driver-class-name: com.mysql.cj.jdbc.Driver
- url: jdbc:mysql://localhost:3306/streampark?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
+ url: jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 6a2e8c95c..8e3ae8cba 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -233,7 +233,7 @@ trait FlinkClientTrait extends Logger {
submitRequest: SubmitRequest,
jarFile: File): (PackagedProgram, JobGraph) = {
- val pgkBuilder = PackagedProgram.newBuilder
+ val packageProgram = PackagedProgram.newBuilder
.setJarFile(jarFile)
.setEntryPointClassName(
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
@@ -242,14 +242,7 @@ trait FlinkClientTrait extends Logger {
flinkConfig
.getOptional(ApplicationConfiguration.APPLICATION_ARGS)
.orElse(Lists.newArrayList()): _*)
- // userClassPath...
- submitRequest.executionMode match {
- case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB =>
- pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs)
- case _ =>
- }
-
- val packageProgram = pgkBuilder.build()
+ .build()
val jobGraph = PackagedProgramUtils.createJobGraph(
packageProgram,
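With the per-mode setUserClassPaths branch removed, the builder now chains straight into build(). A hedged sketch of the resulting shape against Flink's public client API (mainClass, args, and the parallelism of 1 are placeholder values, not taken from the commit):

    import java.io.File
    import org.apache.flink.client.program.{PackagedProgram, PackagedProgramUtils}
    import org.apache.flink.configuration.Configuration

    def jobGraphFor(jarFile: File, mainClass: String, args: Seq[String]) = {
      // Build the program in one chain; no userClassPaths are set anymore.
      val packageProgram = PackagedProgram.newBuilder
        .setJarFile(jarFile)
        .setEntryPointClassName(mainClass)
        .setArguments(args: _*)
        .build()
      // Compile it to a JobGraph (default parallelism 1, output suppressed).
      val jobGraph = PackagedProgramUtils.createJobGraph(
        packageProgram, new Configuration(), 1, false)
      (packageProgram, jobGraph)
    }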
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
index 1e8245228..b6fd04bc6 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
@@ -30,13 +30,11 @@ case class Artifact(
extensions: JavaSet[String] = Collections.emptySet()) {
def filter(artifact: AetherArtifact): Boolean = {
- artifact.getGroupId match {
- case g if g == groupId =>
- artifact.getArtifactId match {
- case "*" => true
- case a => a == artifactId
- }
- case _ => false
+ (artifact.getGroupId, artifact.getArtifactId) match {
+ case ("*", "*") => true
+ case (g, "*") => g == this.groupId
+ case ("*", a) => a == this.artifactId
+ case (g, a) => g == this.groupId && a == this.artifactId
}
}
}
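The rewrite collapses the nested matches into a single match on the (groupId, artifactId) pair so that "*" can act as a wildcard on either coordinate. A self-contained model of the intended wildcard semantics, with the pattern held by the exclusion side (names and coordinates here are illustrative, not from the commit):

    // Exclusion pattern where "*" matches any groupId and/or artifactId.
    case class Exclusion(groupId: String, artifactId: String) {
      def matches(g: String, a: String): Boolean =
        (groupId, artifactId) match {
          case ("*", "*") => true                // exclude everything
          case (pg, "*")  => pg == g             // whole group
          case ("*", pa)  => pa == a             // artifact in any group
          case (pg, pa)   => pg == g && pa == a  // exact coordinate
        }
    }

    object ExclusionDemo extends App {
      val flinkAll = Exclusion("org.apache.flink", "*")
      println(flinkAll.matches("org.apache.flink", "flink-core"))    // true
      println(flinkAll.matches("org.apache.kafka", "kafka-clients")) // false
    }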
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index c16e1440e..451c75dda 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -29,7 +29,7 @@ import org.apache.maven.repository.internal.MavenRepositorySystemUtils
import org.codehaus.plexus.logging.{Logger => PlexusLog}
import org.codehaus.plexus.logging.console.ConsoleLogger
import org.eclipse.aether.{RepositorySystem, RepositorySystemSession}
-import org.eclipse.aether.artifact.DefaultArtifact
+import org.eclipse.aether.artifact.{Artifact, DefaultArtifact}
import org.eclipse.aether.connector.basic.BasicRepositoryConnectorFactory
import org.eclipse.aether.repository.{LocalRepository, RemoteRepository}
import org.eclipse.aether.resolution.{ArtifactDescriptorRequest, ArtifactRequest}
@@ -205,23 +205,29 @@ object MavenTool extends Logger {
val (repoSystem, session) = getMavenEndpoint()
- val artifacts = mavenArtifacts.map(
- e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version))
-
val exclusions = mavenArtifacts
.flatMap(_.extensions.map(_.split(":")))
.map(a => Artifact(a.head, a.last, null)) ++ excludeArtifact
val remoteRepos = getRemoteRepos()
+ val exclusionAll = exclusions.exists(e => e.groupId == "*" && e.artifactId == "*")
+
+ val artifacts = mavenArtifacts.map(
+ e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version))
+
// read relevant artifact descriptor info and excluding items if necessary.
- val dependencies = artifacts
- .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null))
- .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
- .flatMap(_.getDependencies)
- .filter(_.getScope == "compile")
- .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
- .map(_.getArtifact)
+ val dependencies =
+ if (exclusionAll) Set.empty[DefaultArtifact]
+ else {
+ artifacts
+ .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null))
+ .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
+ .flatMap(_.getDependencies)
+ .filter(_.getScope == "compile")
+ .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
+ .map(_.getArtifact)
+ }
val mergedArtifacts = artifacts ++ dependencies
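MavenTool now checks for a blanket "*:*" exclusion up front: when present, transitive resolution is skipped entirely and only the directly listed artifacts survive. A sketch of that short-circuit, with a hypothetical resolveTransitives standing in for the repoSystem.readArtifactDescriptor round-trips:

    case class Coord(groupId: String, artifactId: String)

    // Returns the direct artifacts plus any transitives that survive the
    // exclusion filters; a "*:*" exclusion skips resolution altogether.
    def resolve(
        direct: Set[Coord],
        exclusions: Set[Coord],
        resolveTransitives: Set[Coord] => Set[Coord]): Set[Coord] = {
      val excludeAll = exclusions.exists(e => e.groupId == "*" && e.artifactId == "*")
      val transitives =
        if (excludeAll) Set.empty[Coord]
        else
          resolveTransitives(direct).filterNot(
            d =>
              exclusions.exists(
                e =>
                  (e.groupId == "*" || e.groupId == d.groupId) &&
                    (e.artifactId == "*" || e.artifactId == d.artifactId)))
      direct ++ transitives
    }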
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index dcf6ad86f..d678c81a0 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -43,28 +43,22 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
*/
@throws[Throwable]
override protected def buildProcess(): SimpleBuildResponse = {
- execStep(1) {
- request.developmentMode match {
- case DevelopmentMode.FLINK_SQL => HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
- case _ =>
- }
- logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
- }.getOrElse(throw getError.exception)
-
- val mavenJars =
- execStep(2) {
- request.developmentMode match {
- case DevelopmentMode.FLINK_SQL =>
- val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
- mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
- case _ => Set[String]()
- }
+ if (request.developmentMode == DevelopmentMode.FLINK_SQL) {
+ execStep(1) {
+ HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
+ logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
}.getOrElse(throw getError.exception)
- execStep(3) {
- mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath))
- }.getOrElse(throw getError.exception)
+ val mavenJars =
+ execStep(2) {
+ val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
+ mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
+ }.getOrElse(throw getError.exception)
+ execStep(3) {
+ mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath))
+ }.getOrElse(throw getError.exception)
+ }
SimpleBuildResponse()
}
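The pipeline restructure hoists the FLINK_SQL check: instead of each step matching on the development mode, all three steps (clean the provided-lib workspace, resolve Maven dependencies, upload to HDFS) now run only for SQL jobs and are skipped as a unit otherwise. A minimal sketch of that control-flow shape (the DevMode model and step bodies are illustrative, not the pipeline's API):

    sealed trait DevMode
    case object FlinkSql extends DevMode
    case object CustomCode extends DevMode

    // Guard once at the top rather than per step; for CustomCode nothing
    // is cleaned, resolved, or uploaded.
    def buildProcess(mode: DevMode): Unit = {
      if (mode == FlinkSql) {
        println("step 1: recreate HDFS provided-lib workspace")
        val jars = Seq("a.jar", "b.jar") // step 2: resolve maven dependencies
        jars.foreach(j => println(s"step 3: upload $j to HDFS"))
      }
      println("build response: success")
    }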