This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch submit in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit ec2607d0622e68886b31f2befb237b0efa62ea24 Author: benjobs <[email protected]> AuthorDate: Thu Nov 16 23:33:13 2023 +0800 [Bug] flink job submit bug fixed. --- .../console/core/entity/Application.java | 75 +----------- .../core/service/impl/AppBuildPipeServiceImpl.java | 128 +++++++++++---------- 2 files changed, 72 insertions(+), 131 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index d963c5477..3c0f2436e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -27,7 +27,6 @@ import org.apache.streampark.common.enums.FlinkK8sRestExposedType; import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.console.base.util.JacksonUtils; -import org.apache.streampark.console.base.util.ObjectUtils; import org.apache.streampark.console.core.bean.AppControl; import org.apache.streampark.console.core.bean.MavenDependency; import org.apache.streampark.console.core.enums.FlinkAppState; @@ -393,6 +392,11 @@ public class Application implements Serializable { return path; } + @JsonIgnore + public String getDistJar() { + return getDistHome() + "/" + getJar(); + } + @JsonIgnore public String getLocalAppHome() { String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(), id.toString()); @@ -506,75 +510,6 @@ public class Application implements Serializable { return false; } - /** - * Parameter comparison, mainly to compare whether the parameters related to Flink runtime have - * changed - */ - public boolean eqJobParam(Application other) { - // 1) Resolve Order has it changed - // 2) flink Version has it changed - // 3) Execution Mode has it changed - // 4) Parallelism has it changed - // 5) Task Slots has it changed - // 6) Options has it changed - // 7) properties has it changed - // 8) Program Args has it changed - // 9) Flink Version has it changed - - if (!ObjectUtils.safeEquals(this.getVersionId(), other.getVersionId())) { - return false; - } - - if (!ObjectUtils.safeEquals(this.getResolveOrder(), other.getResolveOrder()) - || !ObjectUtils.safeEquals(this.getExecutionMode(), other.getExecutionMode()) - || !ObjectUtils.safeEquals(this.getK8sRestExposedType(), other.getK8sRestExposedType())) { - return false; - } - - if (this.getOptions() != null) { - if (other.getOptions() != null) { - if (!this.getOptions().trim().equals(other.getOptions().trim())) { - Map<String, Object> optMap = this.getOptionMap(); - Map<String, Object> otherMap = other.getOptionMap(); - if (optMap.size() != otherMap.size()) { - return false; - } - for (Map.Entry<String, Object> entry : optMap.entrySet()) { - if (!entry.getValue().equals(otherMap.get(entry.getKey()))) { - return false; - } - } - } - } else { - return false; - } - } else if (other.getOptions() != null) { - return false; - } - - if (this.getDynamicProperties() != null) { - if (other.getDynamicProperties() != null) { - if (!this.getDynamicProperties().trim().equals(other.getDynamicProperties().trim())) { - return false; - } - } else { - return false; - } - } else if (other.getDynamicProperties() != null) { - return false; - } - - if (this.getArgs() != null) { - if (other.getArgs() != null) { - return this.getArgs().trim().equals(other.getArgs().trim()); - } else { - return false; - } - } else { - return other.getArgs() == null; - } - } - @JsonIgnore public StorageType getStorageType() { return getStorageType(getExecutionMode()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 33afcf881..dce1ab3d5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.streampark.console.core.enums.CandidateType; import org.apache.streampark.console.core.enums.NoticeType; import org.apache.streampark.console.core.enums.OptionState; import org.apache.streampark.console.core.enums.ReleaseState; +import org.apache.streampark.console.core.enums.ResourceFrom; import org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper; import org.apache.streampark.console.core.service.AppBuildPipeService; import org.apache.streampark.console.core.service.ApplicationBackUpService; @@ -396,73 +397,78 @@ public class AppBuildPipeServiceImpl if (app.isCustomCodeJob()) { // customCode upload jar to appHome... FsOperator fsOperator = app.getFsOperator(); - - if (app.isUploadJob()) { - // 1). upload jar to local uploadDIR. - File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); - File localUploadJar = new File(localUploadDIR, app.getJar()); - checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR); - - // 2) copy jar to local $app_home/lib - File libJar = new File(app.getLocalAppLib(), app.getJar()); - if (!localFS.exists(app.getLocalAppLib()) - || !libJar.exists() - || !FileUtils.equals(localJar, libJar)) { - localFS.mkCleanDirs(app.getLocalAppLib()); - localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib()); - } - - // 3) for YARNApplication mode - if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { - List<File> jars = new ArrayList<>(0); - - // 1). user jar - jars.add(libJar); - - // 2). jar dependency - app.getMavenDependency().getJar().forEach(jar -> jars.add(new File(localUploadDIR, jar))); - - // 3). pom dependency - if (!app.getMavenDependency().getPom().isEmpty()) { - Set<Artifact> artifacts = - app.getMavenDependency().getPom().stream() - .filter(x -> !new File(localUploadDIR, x.artifactName()).exists()) - .map( - pom -> - new Artifact( - pom.getGroupId(), - pom.getArtifactId(), - pom.getVersion(), - pom.getClassifier(), - pom.toExclusionString())) - .collect(Collectors.toSet()); - Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts); - jars.addAll(mavenArts); + ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom()); + switch (resourceFrom) { + case CICD: + String appLib = app.getAppLib(); + fsOperator.mkCleanDirs(appLib); + fsOperator.upload(app.getDistJar(), appLib); + break; + case UPLOAD: + // 1). upload jar to local uploadDIR. + File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); + File localUploadJar = new File(localUploadDIR, app.getJar()); + checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR); + + // 2) copy jar to local $app_home/lib + File libJar = new File(app.getLocalAppLib(), app.getJar()); + if (!localFS.exists(app.getLocalAppLib()) + || !libJar.exists() + || !FileUtils.equals(localJar, libJar)) { + localFS.mkCleanDirs(app.getLocalAppLib()); + localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib()); } - // 4). local uploadDIR to hdfs uploadsDIR - String hdfsUploadDIR = Workspace.remote().APP_UPLOADS(); - for (File jarFile : jars) { - String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName(); - if (!fsOperator.exists(hdfsUploadPath)) { - fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR); - } else { - InputStream inputStream = Files.newInputStream(jarFile.toPath()); - if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) { + // 3) for YARNApplication mode + if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { + List<File> jars = new ArrayList<>(0); + + // 1). user jar + jars.add(libJar); + + // 2). jar dependency + app.getMavenDependency() + .getJar() + .forEach(jar -> jars.add(new File(localUploadDIR, jar))); + + // 3). pom dependency + if (!app.getMavenDependency().getPom().isEmpty()) { + Set<Artifact> artifacts = + app.getMavenDependency().getPom().stream() + .filter(x -> !new File(localUploadDIR, x.artifactName()).exists()) + .map( + pom -> + new Artifact( + pom.getGroupId(), + pom.getArtifactId(), + pom.getVersion(), + pom.getClassifier(), + pom.toExclusionString())) + .collect(Collectors.toSet()); + Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts); + jars.addAll(mavenArts); + } + + // 4). local uploadDIR to hdfs uploadsDIR + String hdfsUploadDIR = Workspace.remote().APP_UPLOADS(); + for (File jarFile : jars) { + String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName(); + if (!fsOperator.exists(hdfsUploadPath)) { fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR); + } else { + InputStream inputStream = Files.newInputStream(jarFile.toPath()); + if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) { + fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR); + } } } + // 5). copy jars to $hdfs_app_home/lib + fsOperator.mkCleanDirs(app.getAppLib()); + jars.forEach( + jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib())); } - - // 5). copy jars to $hdfs_app_home/lib - fsOperator.mkCleanDirs(app.getAppLib()); - jars.forEach( - jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib())); - } - } else { - String appHome = app.getAppHome(); - fsOperator.mkCleanDirs(appHome); - fsOperator.upload(app.getDistHome(), appHome); + default: + throw new IllegalArgumentException(""); } } }
