This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch perjob
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit bde59a77fe859c9d3fcfbae5e835c425cda4d74c
Author: benjobs <[email protected]>
AuthorDate: Fri Nov 17 15:18:24 2023 +0800

    [Bug] submit job bug fixed
---
 .../console/core/entity/Application.java           |  11 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java | 133 +++++++++++----------
 .../src/main/resources/application-mysql.yml       |   4 +-
 .../flink/client/trait/FlinkClientTrait.scala      |  11 +-
 .../streampark/flink/packer/maven/Artifact.scala   |  12 +-
 .../streampark/flink/packer/maven/MavenTool.scala  |  28 +++--
 .../impl/FlinkYarnApplicationBuildPipeline.scala   |  32 ++---
 7 files changed, 112 insertions(+), 119 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 3c0f2436e..1bbee1382 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -388,26 +388,21 @@ public class Application implements Serializable {
   public String getDistHome() {
     String path =
         String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(), projectId.toString(), getModule());
-    log.info("local distHome:{}", path);
+    log.info("local distHome: {}", path);
     return path;
   }
 
-  @JsonIgnore
-  public String getDistJar() {
-    return getDistHome() + "/" + getJar();
-  }
-
   @JsonIgnore
   public String getLocalAppHome() {
     String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(), id.toString());
-    log.info("local appHome:{}", path);
+    log.info("local appHome: {}", path);
     return path;
   }
 
   @JsonIgnore
   public String getRemoteAppHome() {
     String path = String.format("%s/%s", Workspace.remote().APP_WORKSPACE(), id.toString());
-    log.info("remote appHome:{}", path);
+    log.info("remote appHome: {}", path);
     return path;
   }

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 1ee33cf01..cc733e84e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
@@ -398,91 +399,97 @@ public class AppBuildPipeServiceImpl
           // customCode upload jar to appHome...
           FsOperator fsOperator = app.getFsOperator();
           ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
+          File localUploadJar = new File(localUploadDIR, app.getJar());
           switch (resourceFrom) {
             case CICD:
-              String appLib = app.getAppLib();
-              fsOperator.mkCleanDirs(appLib);
-              fsOperator.upload(app.getDistJar(), appLib);
+              // upload jar to local uploadDIR.
+              File userJar = getAppDistJar(app);
+              checkOrElseUploadJar(localFS, userJar, localUploadJar, localUploadDIR);
               break;
             case UPLOAD:
               // 1). upload jar to local uploadDIR.
               File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
-              File localUploadJar = new File(localUploadDIR, app.getJar());
               checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
+              break;
+            default:
+              throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);
+          }
 
-              // 2) copy jar to local $app_home/lib
-              File libJar = new File(app.getLocalAppLib(), app.getJar());
-              if (!localFS.exists(app.getLocalAppLib())
-                  || !libJar.exists()
-                  || !FileUtils.equals(localJar, libJar)) {
-                localFS.mkCleanDirs(app.getLocalAppLib());
-                localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib());
-              }
-
-              // 3) for YARNApplication mode
-              if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
-                List<File> jars = new ArrayList<>(0);
-
-                // 1). user jar
-                jars.add(libJar);
-
-                // 2). jar dependency
-                app.getMavenDependency()
-                    .getJar()
-                    .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
-
-                // 3). pom dependency
-                if (!app.getMavenDependency().getPom().isEmpty()) {
-                  Set<Artifact> artifacts =
-                      app.getMavenDependency().getPom().stream()
-                          .filter(x -> !new File(localUploadDIR, x.artifactName()).exists())
-                          .map(
-                              pom ->
-                                  new Artifact(
-                                      pom.getGroupId(),
-                                      pom.getArtifactId(),
-                                      pom.getVersion(),
-                                      pom.getClassifier(),
-                                      pom.toExclusionString()))
-                          .collect(Collectors.toSet());
-                  Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
-                  jars.addAll(mavenArts);
-                }
+          // 3) for YARNApplication mode
+          if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+
+            List<File> hdfsUploadJars = new ArrayList<>(0);
+
+            // 1). user jar
+            hdfsUploadJars.add(localUploadJar);
+
+            // 2). jar dependency
+            app.getMavenDependency()
+                .getJar()
+                .forEach(jar -> hdfsUploadJars.add(new File(localUploadDIR, jar)));
+
+            // 3). pom dependency
+            if (!app.getMavenDependency().getPom().isEmpty()) {
+              Set<Artifact> artifacts =
+                  app.getMavenDependency().getPom().stream()
+                      .filter(x -> !new File(localUploadDIR, x.artifactName()).exists())
+                      .map(
+                          pom ->
+                              new Artifact(
+                                  pom.getGroupId(),
+                                  pom.getArtifactId(),
+                                  pom.getVersion(),
+                                  pom.getClassifier(),
+                                  pom.toExclusionString()))
+                      .collect(Collectors.toSet());
+              Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
+              hdfsUploadJars.addAll(mavenArts);
+            }
 
-                // 4). local uploadDIR to hdfs uploadsDIR
-                String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
-                for (File jarFile : jars) {
-                  String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
-                  if (!fsOperator.exists(hdfsUploadPath)) {
-                    fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
-                  } else {
-                    InputStream inputStream = Files.newInputStream(jarFile.toPath());
-                    if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
-                      fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
-                    }
-                  }
+            // 4). local uploadDIR to hdfs uploadsDIR
+            String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+            for (File jarFile : hdfsUploadJars) {
+              String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+              if (!fsOperator.exists(hdfsUploadPath)) {
+                fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+              } else {
+                InputStream inputStream = Files.newInputStream(jarFile.toPath());
+                if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+                  fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
                 }
-                // 5). copy jars to $hdfs_app_home/lib
-                fsOperator.mkCleanDirs(app.getAppLib());
-                jars.forEach(
-                    jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
               }
-              break;
-            default:
-              throw new IllegalArgumentException("ResourceFrom error: " + resourceFrom);
+            }
+            // 5). copy jars to $hdfs_app_home/lib
+            fsOperator.mkCleanDirs(app.getAppLib());
+            hdfsUploadJars.forEach(
+                jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
           }
         }
       }
 
+  private File getAppDistJar(Application app) {
+    File userJar;
+    if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
+      userJar = new File(app.getDistHome(), app.getModule().concat(".jar"));
+    } else if (app.getApplicationType() == ApplicationType.APACHE_FLINK) {
+      userJar = new File(app.getDistHome(), app.getJar());
+    } else {
+      throw new IllegalArgumentException(
+          "[StreamPark] unsupported ApplicationType of custom code: " + app.getApplicationType());
+    }
+    return userJar;
+  }
+
   /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
   private String retrieveUserLocalJar(FlinkEnv flinkEnv, Application app) {
+    File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
     switch (app.getDevelopmentMode()) {
       case CUSTOM_CODE:
         switch (app.getApplicationType()) {
           case STREAMPARK_FLINK:
-            return String.format("%s/%s", app.getLocalAppLib(), app.getModule().concat(".jar"));
+            return String.format("%s/%s", localUploadDIR, app.getModule().concat(".jar"));
           case APACHE_FLINK:
-            return String.format("%s/%s", app.getLocalAppLib(), app.getJar());
+            return String.format("%s/%s", localUploadDIR, app.getJar());
           default:
             throw new IllegalArgumentException(
                 "[StreamPark] unsupported ApplicationType of custom code: "

diff --git a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml b/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
index e8d0b760a..74f2a97b9 100644
--- a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
+++ b/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
@@ -18,6 +18,6 @@
 spring:
   datasource:
     username: root
-    password: streampark
+    password: 123456
     driver-class-name: com.mysql.cj.jdbc.Driver
-    url: jdbc:mysql://localhost:3306/streampark?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
+    url: jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 6a2e8c95c..8e3ae8cba 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -233,7 +233,7 @@ trait FlinkClientTrait extends Logger {
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {
 
-    val pgkBuilder = PackagedProgram.newBuilder
+    val packageProgram = PackagedProgram.newBuilder
       .setJarFile(jarFile)
       .setEntryPointClassName(
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
@@ -242,14 +242,7 @@ trait FlinkClientTrait extends Logger {
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
           .orElse(Lists.newArrayList()): _*)
-    // userClassPath...
-    submitRequest.executionMode match {
-      case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB =>
-        pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs)
-      case _ =>
-    }
-
-    val packageProgram = pgkBuilder.build()
+      .build()
 
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,

diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
index 1e8245228..b6fd04bc6 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
@@ -30,13 +30,11 @@ case class Artifact(
     extensions: JavaSet[String] = Collections.emptySet()) {
 
   def filter(artifact: AetherArtifact): Boolean = {
-    artifact.getGroupId match {
-      case g if g == groupId =>
-        artifact.getArtifactId match {
-          case "*" => true
-          case a => a == artifactId
-        }
-      case _ => false
+    (artifact.getGroupId, artifact.getArtifactId) match {
+      case ("*", "*") => true
+      case (g, "*") => g == this.groupId
+      case ("*", a) => a == this.artifactId
+      case (g, a) => g == this.groupId && a == this.artifactId
     }
   }
 }

diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index c16e1440e..451c75dda 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -29,7 +29,7 @@ import org.apache.maven.repository.internal.MavenRepositorySystemUtils
 import org.codehaus.plexus.logging.{Logger => PlexusLog}
 import org.codehaus.plexus.logging.console.ConsoleLogger
 import org.eclipse.aether.{RepositorySystem, RepositorySystemSession}
-import org.eclipse.aether.artifact.DefaultArtifact
+import org.eclipse.aether.artifact.{Artifact, DefaultArtifact}
 import org.eclipse.aether.connector.basic.BasicRepositoryConnectorFactory
 import org.eclipse.aether.repository.{LocalRepository, RemoteRepository}
 import org.eclipse.aether.resolution.{ArtifactDescriptorRequest, ArtifactRequest}
@@ -205,23 +205,29 @@ object MavenTool extends Logger {
 
     val (repoSystem, session) = getMavenEndpoint()
 
-    val artifacts = mavenArtifacts.map(
-      e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version))
-
     val exclusions = mavenArtifacts
       .flatMap(_.extensions.map(_.split(":")))
       .map(a => Artifact(a.head, a.last, null)) ++ excludeArtifact
 
     val remoteRepos = getRemoteRepos()
 
+    val exclusionAll = exclusions.exists(e => e.groupId == "*" && e.artifactId == "*")
+
+    val artifacts = mavenArtifacts.map(
+      e => new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version))
+
     // read relevant artifact descriptor info and excluding items if necessary.
-    val dependencies = artifacts
-      .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null))
-      .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
-      .flatMap(_.getDependencies)
-      .filter(_.getScope == "compile")
-      .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
-      .map(_.getArtifact)
+    val dependencies =
+      if (exclusionAll) Set.empty[DefaultArtifact]
+      else {
+        artifacts
+          .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null))
+          .map(descReq => repoSystem.readArtifactDescriptor(session, descReq))
+          .flatMap(_.getDependencies)
+          .filter(_.getScope == "compile")
+          .filter(dep => !exclusions.exists(_.filter(dep.getArtifact)))
+          .map(_.getArtifact)
+      }
 
     val mergedArtifacts = artifacts ++ dependencies

diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index dcf6ad86f..d678c81a0 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -43,28 +43,22 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
    */
   @throws[Throwable]
   override protected def buildProcess(): SimpleBuildResponse = {
-    execStep(1) {
-      request.developmentMode match {
-        case DevelopmentMode.FLINK_SQL => HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
-        case _ =>
-      }
-      logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
-    }.getOrElse(throw getError.exception)
-
-    val mavenJars =
-      execStep(2) {
-        request.developmentMode match {
-          case DevelopmentMode.FLINK_SQL =>
-            val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
-            mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
-          case _ => Set[String]()
-        }
+    if (request.developmentMode == DevelopmentMode.FLINK_SQL) {
+      execStep(1) {
+        HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
+        logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
       }.getOrElse(throw getError.exception)
 
-    execStep(3) {
-      mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath))
-    }.getOrElse(throw getError.exception)
+      val mavenJars =
+        execStep(2) {
+          val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
+          mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
+        }.getOrElse(throw getError.exception)
 
+      execStep(3) {
+        mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath))
+      }.getOrElse(throw getError.exception)
+    }
     SimpleBuildResponse()
   }
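
The CICD branch now resolves the dist jar through the new getAppDistJar helper instead of the removed Application.getDistJar getter: STREAMPARK_FLINK projects ship <module>.jar under distHome, APACHE_FLINK projects ship the configured jar. A minimal Scala sketch of that dispatch, with App as a hypothetical stand-in for the Application entity:

  object DistJarSketch {
    import java.io.File

    sealed trait AppType
    case object StreamParkFlink extends AppType
    case object ApacheFlink extends AppType

    // hypothetical stand-in for the Application entity
    case class App(distHome: String, module: String, jar: String, appType: AppType)

    def appDistJar(app: App): File = app.appType match {
      case StreamParkFlink => new File(app.distHome, app.module + ".jar")
      case ApacheFlink => new File(app.distHome, app.jar)
    }
  }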
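
Step 4) of the YARN-application path re-uploads a jar only when its local md5 no longer matches the copy already on HDFS. A minimal sketch of that guard, with a hypothetical RemoteFs trait standing in for StreamPark's FsOperator (only the three calls the patch uses are modeled); DigestUtils is the same commons-codec helper the patch relies on. Unlike the patched code, the sketch closes the stream explicitly:

  object Md5UploadSketch {
    import java.io.File
    import java.nio.file.Files
    import org.apache.commons.codec.digest.DigestUtils

    // hypothetical stand-in for StreamPark's FsOperator
    trait RemoteFs {
      def exists(path: String): Boolean
      def fileMd5(path: String): String
      def upload(srcPath: String, dstDir: String): Unit
    }

    def uploadIfChanged(fs: RemoteFs, jar: File, hdfsUploadDIR: String): Unit = {
      val hdfsUploadPath = s"$hdfsUploadDIR/${jar.getName}"
      if (!fs.exists(hdfsUploadPath)) {
        // first upload: the jar is not on HDFS yet
        fs.upload(jar.getAbsolutePath, hdfsUploadDIR)
      } else {
        // re-upload only when the local md5 differs from the remote one
        val in = Files.newInputStream(jar.toPath)
        try {
          if (DigestUtils.md5Hex(in) != fs.fileMd5(hdfsUploadPath)) {
            fs.upload(jar.getAbsolutePath, hdfsUploadDIR)
          }
        } finally in.close()
      }
    }
  }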
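
The rewritten Artifact.filter compares both Maven coordinates in a single tuple match, treating "*" as a wildcard, instead of nesting two matches. A self-contained sketch of the same logic, with Coord as a hypothetical stand-in for Aether's Artifact type; exact coordinates still match one-to-one, and the "*:*" case is handled separately by the new exclusionAll guard in MavenTool:

  object ExclusionFilterSketch {
    // hypothetical stand-in for org.eclipse.aether.artifact.Artifact
    case class Coord(groupId: String, artifactId: String)

    case class Exclusion(groupId: String, artifactId: String) {
      // mirrors the rewritten Artifact.filter: one tuple match, "*" as wildcard
      def filter(artifact: Coord): Boolean =
        (artifact.groupId, artifact.artifactId) match {
          case ("*", "*") => true
          case (g, "*") => g == this.groupId
          case ("*", a) => a == this.artifactId
          case (g, a) => g == this.groupId && a == this.artifactId
        }
    }

    def main(args: Array[String]): Unit = {
      val ex = Exclusion("org.slf4j", "slf4j-api")
      println(ex.filter(Coord("org.slf4j", "slf4j-api"))) // true
      println(ex.filter(Coord("org.slf4j", "slf4j-log4j12"))) // false
    }
  }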
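
MavenTool now short-circuits transitive resolution entirely when a "*:*" exclusion is configured, rather than reading every artifact descriptor and filtering the results away afterwards. A sketch of that guard under the same simplifications as above (Gav and resolveCompileDeps are hypothetical placeholders for the Aether descriptor-reading pipeline):

  object ExclusionAllSketch {
    case class Gav(groupId: String, artifactId: String, version: String)

    def mergedArtifacts(
        requested: Set[Gav],
        exclusions: Set[(String, String)],
        resolveCompileDeps: Set[Gav] => Set[Gav]): Set[Gav] = {
      // a "*:*" exclusion means: keep only the directly requested artifacts
      val exclusionAll = exclusions.exists { case (g, a) => g == "*" && a == "*" }
      val dependencies =
        if (exclusionAll) Set.empty[Gav]
        else resolveCompileDeps(requested)
      requested ++ dependencies
    }
  }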
