This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch submit
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit ec2607d0622e68886b31f2befb237b0efa62ea24
Author: benjobs <[email protected]>
AuthorDate: Thu Nov 16 23:33:13 2023 +0800

    [Bug] flink job submit bug fixed.
---
 .../console/core/entity/Application.java           |  75 +-----------
 .../core/service/impl/AppBuildPipeServiceImpl.java | 128 +++++++++++----------
 2 files changed, 72 insertions(+), 131 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index d963c5477..3c0f2436e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -27,7 +27,6 @@ import 
org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.bean.MavenDependency;
 import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -393,6 +392,11 @@ public class Application implements Serializable {
     return path;
   }
 
+  @JsonIgnore
+  public String getDistJar() {
+    return getDistHome() + "/" + getJar();
+  }
+
   @JsonIgnore
   public String getLocalAppHome() {
     String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(), 
id.toString());
@@ -506,75 +510,6 @@ public class Application implements Serializable {
     return false;
   }
 
-  /**
-   * Parameter comparison, mainly to compare whether the parameters related to 
Flink runtime have
-   * changed
-   */
-  public boolean eqJobParam(Application other) {
-    // 1) Resolve Order has it changed
-    // 2) flink Version has it changed
-    // 3) Execution Mode has it changed
-    // 4) Parallelism has it changed
-    // 5) Task Slots has it changed
-    // 6) Options has it changed
-    // 7) properties has it changed
-    // 8) Program Args has it changed
-    // 9) Flink Version  has it changed
-
-    if (!ObjectUtils.safeEquals(this.getVersionId(), other.getVersionId())) {
-      return false;
-    }
-
-    if (!ObjectUtils.safeEquals(this.getResolveOrder(), 
other.getResolveOrder())
-        || !ObjectUtils.safeEquals(this.getExecutionMode(), 
other.getExecutionMode())
-        || !ObjectUtils.safeEquals(this.getK8sRestExposedType(), 
other.getK8sRestExposedType())) {
-      return false;
-    }
-
-    if (this.getOptions() != null) {
-      if (other.getOptions() != null) {
-        if (!this.getOptions().trim().equals(other.getOptions().trim())) {
-          Map<String, Object> optMap = this.getOptionMap();
-          Map<String, Object> otherMap = other.getOptionMap();
-          if (optMap.size() != otherMap.size()) {
-            return false;
-          }
-          for (Map.Entry<String, Object> entry : optMap.entrySet()) {
-            if (!entry.getValue().equals(otherMap.get(entry.getKey()))) {
-              return false;
-            }
-          }
-        }
-      } else {
-        return false;
-      }
-    } else if (other.getOptions() != null) {
-      return false;
-    }
-
-    if (this.getDynamicProperties() != null) {
-      if (other.getDynamicProperties() != null) {
-        if 
(!this.getDynamicProperties().trim().equals(other.getDynamicProperties().trim()))
 {
-          return false;
-        }
-      } else {
-        return false;
-      }
-    } else if (other.getDynamicProperties() != null) {
-      return false;
-    }
-
-    if (this.getArgs() != null) {
-      if (other.getArgs() != null) {
-        return this.getArgs().trim().equals(other.getArgs().trim());
-      } else {
-        return false;
-      }
-    } else {
-      return other.getArgs() == null;
-    }
-  }
-
   @JsonIgnore
   public StorageType getStorageType() {
     return getStorageType(getExecutionMode());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 33afcf881..dce1ab3d5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.streampark.console.core.enums.CandidateType;
 import org.apache.streampark.console.core.enums.NoticeType;
 import org.apache.streampark.console.core.enums.OptionState;
 import org.apache.streampark.console.core.enums.ReleaseState;
+import org.apache.streampark.console.core.enums.ResourceFrom;
 import 
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
 import org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.ApplicationBackUpService;
@@ -396,73 +397,78 @@ public class AppBuildPipeServiceImpl
     if (app.isCustomCodeJob()) {
       // customCode upload jar to appHome...
       FsOperator fsOperator = app.getFsOperator();
-
-      if (app.isUploadJob()) {
-        // 1). upload jar to local uploadDIR.
-        File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
-        File localUploadJar = new File(localUploadDIR, app.getJar());
-        checkOrElseUploadJar(localFS, localJar, localUploadJar, 
localUploadDIR);
-
-        // 2) copy jar to local $app_home/lib
-        File libJar = new File(app.getLocalAppLib(), app.getJar());
-        if (!localFS.exists(app.getLocalAppLib())
-            || !libJar.exists()
-            || !FileUtils.equals(localJar, libJar)) {
-          localFS.mkCleanDirs(app.getLocalAppLib());
-          localFS.upload(localUploadJar.getAbsolutePath(), 
app.getLocalAppLib());
-        }
-
-        // 3) for YARNApplication mode
-        if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
-          List<File> jars = new ArrayList<>(0);
-
-          // 1). user jar
-          jars.add(libJar);
-
-          // 2). jar dependency
-          app.getMavenDependency().getJar().forEach(jar -> jars.add(new 
File(localUploadDIR, jar)));
-
-          // 3). pom dependency
-          if (!app.getMavenDependency().getPom().isEmpty()) {
-            Set<Artifact> artifacts =
-                app.getMavenDependency().getPom().stream()
-                    .filter(x -> !new File(localUploadDIR, 
x.artifactName()).exists())
-                    .map(
-                        pom ->
-                            new Artifact(
-                                pom.getGroupId(),
-                                pom.getArtifactId(),
-                                pom.getVersion(),
-                                pom.getClassifier(),
-                                pom.toExclusionString()))
-                    .collect(Collectors.toSet());
-            Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
-            jars.addAll(mavenArts);
+      ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
+      switch (resourceFrom) {
+        case CICD:
+          String appLib = app.getAppLib();
+          fsOperator.mkCleanDirs(appLib);
+          fsOperator.upload(app.getDistJar(), appLib);
+          break;
+        case UPLOAD:
+          // 1). upload jar to local uploadDIR.
+          File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
+          File localUploadJar = new File(localUploadDIR, app.getJar());
+          checkOrElseUploadJar(localFS, localJar, localUploadJar, 
localUploadDIR);
+
+          // 2) copy jar to local $app_home/lib
+          File libJar = new File(app.getLocalAppLib(), app.getJar());
+          if (!localFS.exists(app.getLocalAppLib())
+              || !libJar.exists()
+              || !FileUtils.equals(localJar, libJar)) {
+            localFS.mkCleanDirs(app.getLocalAppLib());
+            localFS.upload(localUploadJar.getAbsolutePath(), 
app.getLocalAppLib());
           }
 
-          // 4). local uploadDIR to hdfs uploadsDIR
-          String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
-          for (File jarFile : jars) {
-            String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
-            if (!fsOperator.exists(hdfsUploadPath)) {
-              fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
-            } else {
-              InputStream inputStream = Files.newInputStream(jarFile.toPath());
-              if 
(!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+          // 3) for YARNApplication mode
+          if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+            List<File> jars = new ArrayList<>(0);
+
+            // 1). user jar
+            jars.add(libJar);
+
+            // 2). jar dependency
+            app.getMavenDependency()
+                .getJar()
+                .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
+
+            // 3). pom dependency
+            if (!app.getMavenDependency().getPom().isEmpty()) {
+              Set<Artifact> artifacts =
+                  app.getMavenDependency().getPom().stream()
+                      .filter(x -> !new File(localUploadDIR, 
x.artifactName()).exists())
+                      .map(
+                          pom ->
+                              new Artifact(
+                                  pom.getGroupId(),
+                                  pom.getArtifactId(),
+                                  pom.getVersion(),
+                                  pom.getClassifier(),
+                                  pom.toExclusionString()))
+                      .collect(Collectors.toSet());
+              Set<File> mavenArts = 
MavenTool.resolveArtifactsAsJava(artifacts);
+              jars.addAll(mavenArts);
+            }
+
+            // 4). local uploadDIR to hdfs uploadsDIR
+            String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+            for (File jarFile : jars) {
+              String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+              if (!fsOperator.exists(hdfsUploadPath)) {
                 fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+              } else {
+                InputStream inputStream = 
Files.newInputStream(jarFile.toPath());
+                if 
(!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+                  fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+                }
               }
             }
+            // 5). copy jars to $hdfs_app_home/lib
+            fsOperator.mkCleanDirs(app.getAppLib());
+            jars.forEach(
+                jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), 
app.getAppLib()));
           }
-
-          // 5). copy jars to $hdfs_app_home/lib
-          fsOperator.mkCleanDirs(app.getAppLib());
-          jars.forEach(
-              jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), 
app.getAppLib()));
-        }
-      } else {
-        String appHome = app.getAppHome();
-        fsOperator.mkCleanDirs(appHome);
-        fsOperator.upload(app.getDistHome(), appHome);
+          // UPLOAD is complete here; without this break it falls through into
+          // default and every upload job fails after doing all of its work.
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported resource from: " + resourceFrom);
       }
     }
   }

Reply via email to