This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new 4e5e26707 [Bug] flink job (from project build) start bug fixed. (#3346)
4e5e26707 is described below
commit 4e5e267072fd0ea3a29ca194c319f9158dbfe78f
Author: benjobs <[email protected]>
AuthorDate: Thu Nov 16 23:57:26 2023 +0800
[Bug] flink job (from project build) start bug fixed. (#3346)
* [Bug] flink job submit bug fixed.
Co-authored-by: benjobs <[email protected]>
---
.../console/core/entity/Application.java | 75 +-----------
.../core/service/impl/AppBuildPipeServiceImpl.java | 129 +++++++++++----------
2 files changed, 73 insertions(+), 131 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index d963c5477..3c0f2436e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -27,7 +27,6 @@ import
org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.bean.MavenDependency;
import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -393,6 +392,11 @@ public class Application implements Serializable {
return path;
}
+ @JsonIgnore
+ public String getDistJar() {
+ return getDistHome() + "/" + getJar();
+ }
+
@JsonIgnore
public String getLocalAppHome() {
String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(),
id.toString());
@@ -506,75 +510,6 @@ public class Application implements Serializable {
return false;
}
- /**
- * Parameter comparison, mainly to compare whether the parameters related to
Flink runtime have
- * changed
- */
- public boolean eqJobParam(Application other) {
- // 1) Resolve Order has it changed
- // 2) flink Version has it changed
- // 3) Execution Mode has it changed
- // 4) Parallelism has it changed
- // 5) Task Slots has it changed
- // 6) Options has it changed
- // 7) properties has it changed
- // 8) Program Args has it changed
- // 9) Flink Version has it changed
-
- if (!ObjectUtils.safeEquals(this.getVersionId(), other.getVersionId())) {
- return false;
- }
-
- if (!ObjectUtils.safeEquals(this.getResolveOrder(),
other.getResolveOrder())
- || !ObjectUtils.safeEquals(this.getExecutionMode(),
other.getExecutionMode())
- || !ObjectUtils.safeEquals(this.getK8sRestExposedType(),
other.getK8sRestExposedType())) {
- return false;
- }
-
- if (this.getOptions() != null) {
- if (other.getOptions() != null) {
- if (!this.getOptions().trim().equals(other.getOptions().trim())) {
- Map<String, Object> optMap = this.getOptionMap();
- Map<String, Object> otherMap = other.getOptionMap();
- if (optMap.size() != otherMap.size()) {
- return false;
- }
- for (Map.Entry<String, Object> entry : optMap.entrySet()) {
- if (!entry.getValue().equals(otherMap.get(entry.getKey()))) {
- return false;
- }
- }
- }
- } else {
- return false;
- }
- } else if (other.getOptions() != null) {
- return false;
- }
-
- if (this.getDynamicProperties() != null) {
- if (other.getDynamicProperties() != null) {
- if
(!this.getDynamicProperties().trim().equals(other.getDynamicProperties().trim()))
{
- return false;
- }
- } else {
- return false;
- }
- } else if (other.getDynamicProperties() != null) {
- return false;
- }
-
- if (this.getArgs() != null) {
- if (other.getArgs() != null) {
- return this.getArgs().trim().equals(other.getArgs().trim());
- } else {
- return false;
- }
- } else {
- return other.getArgs() == null;
- }
- }
-
@JsonIgnore
public StorageType getStorageType() {
return getStorageType(getExecutionMode());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 33afcf881..1ee33cf01 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.streampark.console.core.enums.CandidateType;
import org.apache.streampark.console.core.enums.NoticeType;
import org.apache.streampark.console.core.enums.OptionState;
import org.apache.streampark.console.core.enums.ReleaseState;
+import org.apache.streampark.console.core.enums.ResourceFrom;
import
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
@@ -396,73 +397,79 @@ public class AppBuildPipeServiceImpl
if (app.isCustomCodeJob()) {
// customCode upload jar to appHome...
FsOperator fsOperator = app.getFsOperator();
-
- if (app.isUploadJob()) {
- // 1). upload jar to local uploadDIR.
- File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
- File localUploadJar = new File(localUploadDIR, app.getJar());
- checkOrElseUploadJar(localFS, localJar, localUploadJar,
localUploadDIR);
-
- // 2) copy jar to local $app_home/lib
- File libJar = new File(app.getLocalAppLib(), app.getJar());
- if (!localFS.exists(app.getLocalAppLib())
- || !libJar.exists()
- || !FileUtils.equals(localJar, libJar)) {
- localFS.mkCleanDirs(app.getLocalAppLib());
- localFS.upload(localUploadJar.getAbsolutePath(),
app.getLocalAppLib());
- }
-
- // 3) for YARNApplication mode
- if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
- List<File> jars = new ArrayList<>(0);
-
- // 1). user jar
- jars.add(libJar);
-
- // 2). jar dependency
- app.getMavenDependency().getJar().forEach(jar -> jars.add(new
File(localUploadDIR, jar)));
-
- // 3). pom dependency
- if (!app.getMavenDependency().getPom().isEmpty()) {
- Set<Artifact> artifacts =
- app.getMavenDependency().getPom().stream()
- .filter(x -> !new File(localUploadDIR,
x.artifactName()).exists())
- .map(
- pom ->
- new Artifact(
- pom.getGroupId(),
- pom.getArtifactId(),
- pom.getVersion(),
- pom.getClassifier(),
- pom.toExclusionString()))
- .collect(Collectors.toSet());
- Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
- jars.addAll(mavenArts);
+ ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
+ switch (resourceFrom) {
+ case CICD:
+ String appLib = app.getAppLib();
+ fsOperator.mkCleanDirs(appLib);
+ fsOperator.upload(app.getDistJar(), appLib);
+ break;
+ case UPLOAD:
+ // 1). upload jar to local uploadDIR.
+ File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
+ File localUploadJar = new File(localUploadDIR, app.getJar());
+ checkOrElseUploadJar(localFS, localJar, localUploadJar,
localUploadDIR);
+
+ // 2) copy jar to local $app_home/lib
+ File libJar = new File(app.getLocalAppLib(), app.getJar());
+ if (!localFS.exists(app.getLocalAppLib())
+ || !libJar.exists()
+ || !FileUtils.equals(localJar, libJar)) {
+ localFS.mkCleanDirs(app.getLocalAppLib());
+ localFS.upload(localUploadJar.getAbsolutePath(),
app.getLocalAppLib());
}
- // 4). local uploadDIR to hdfs uploadsDIR
- String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
- for (File jarFile : jars) {
- String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
- if (!fsOperator.exists(hdfsUploadPath)) {
- fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
- } else {
- InputStream inputStream = Files.newInputStream(jarFile.toPath());
- if
(!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+ // 3) for YARNApplication mode
+ if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+ List<File> jars = new ArrayList<>(0);
+
+ // 1). user jar
+ jars.add(libJar);
+
+ // 2). jar dependency
+ app.getMavenDependency()
+ .getJar()
+ .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
+
+ // 3). pom dependency
+ if (!app.getMavenDependency().getPom().isEmpty()) {
+ Set<Artifact> artifacts =
+ app.getMavenDependency().getPom().stream()
+ .filter(x -> !new File(localUploadDIR,
x.artifactName()).exists())
+ .map(
+ pom ->
+ new Artifact(
+ pom.getGroupId(),
+ pom.getArtifactId(),
+ pom.getVersion(),
+ pom.getClassifier(),
+ pom.toExclusionString()))
+ .collect(Collectors.toSet());
+ Set<File> mavenArts =
MavenTool.resolveArtifactsAsJava(artifacts);
+ jars.addAll(mavenArts);
+ }
+
+ // 4). local uploadDIR to hdfs uploadsDIR
+ String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+ for (File jarFile : jars) {
+ String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+ if (!fsOperator.exists(hdfsUploadPath)) {
fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+ } else {
+ InputStream inputStream =
Files.newInputStream(jarFile.toPath());
+ if
(!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+ fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+ }
}
}
+ // 5). copy jars to $hdfs_app_home/lib
+ fsOperator.mkCleanDirs(app.getAppLib());
+ jars.forEach(
+ jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(),
app.getAppLib()));
}
-
- // 5). copy jars to $hdfs_app_home/lib
- fsOperator.mkCleanDirs(app.getAppLib());
- jars.forEach(
- jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(),
app.getAppLib()));
- }
- } else {
- String appHome = app.getAppHome();
- fsOperator.mkCleanDirs(appHome);
- fsOperator.upload(app.getDistHome(), appHome);
+ break;
+ default:
+ throw new IllegalArgumentException("ResourceFrom error: " +
resourceFrom);
}
}
}