This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch resource
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/resource by this push:
     new 528540686 application minor improvement
528540686 is described below

commit 528540686cda9b8412e9eee2cf31f3f756816e26
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 22 12:41:29 2023 +0800

    application minor improvement
---
 .../streampark/console/base/util/CommonUtils.java  |  4 +-
 .../streampark/console/base/util/ObjectUtils.java  | 12 ++--
 .../console/core/entity/Application.java           | 83 ----------------------
 .../streampark/console/core/entity/FlinkSql.java   |  6 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java |  5 +-
 .../core/service/impl/ApplicationServiceImpl.java  | 70 +++++++++---------
 6 files changed, 54 insertions(+), 126 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
index 5b80ba623..f6bc11983 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
@@ -199,7 +199,7 @@ public final class CommonUtils implements Serializable {
     if (iterator != null) {
       while (iterator.hasNext()) {
         Object candidate = iterator.next();
-        if (ObjectUtils.safeEquals(candidate, element)) {
+        if (ObjectUtils.equals(candidate, element)) {
           return true;
         }
       }
@@ -218,7 +218,7 @@ public final class CommonUtils implements Serializable {
     if (enumeration != null) {
       while (enumeration.hasMoreElements()) {
         Object candidate = enumeration.nextElement();
-        if (ObjectUtils.safeEquals(candidate, element)) {
+        if (ObjectUtils.equals(candidate, element)) {
           return true;
         }
       }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
index f375dad52..c16d2a42f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
@@ -104,7 +104,7 @@ public final class ObjectUtils {
       return false;
     }
     for (Object arrayEle : array) {
-      if (safeEquals(arrayEle, element)) {
+      if (equals(arrayEle, element)) {
         return true;
       }
     }
@@ -238,7 +238,7 @@ public final class ObjectUtils {
    * @return whether the given objects are equal
    * @see Arrays#equals
    */
-  public static boolean safeEquals(Object o1, Object o2) {
+  public static boolean equals(Object o1, Object o2) {
     if (o1 == null || o2 == null) {
       return false;
     }
@@ -282,8 +282,8 @@ public final class ObjectUtils {
     return false;
   }
 
-  public static boolean safeTrimEquals(Object o1, Object o2) {
-    boolean equals = safeEquals(o1, o2);
+  public static boolean trimEquals(Object o1, Object o2) {
+    boolean equals = equals(o1, o2);
     if (!equals) {
       if (o1 != null && o2 != null) {
         if (o1 instanceof String && o2 instanceof String) {
@@ -294,6 +294,10 @@ public final class ObjectUtils {
     return equals;
   }
 
+  public static boolean trimNoEquals(Object o1, Object o2) {
+    return !trimEquals(o1, o2);
+  }
+
   /**
    * Return as hash code for the given object; typically the value of <code>
    * {@link Object#hashCode()}</code>. If the object is an array, this method 
will delegate to any
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4fc0cf147..38f9a76c8 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -27,7 +27,6 @@ import 
org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.bean.Dependency;
 import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -252,14 +251,6 @@ public class Application implements Serializable {
     return ingressTemplate;
   }
 
-  public void setIngressTemplate(String ingressTemplate) {
-    this.ingressTemplate = ingressTemplate;
-  }
-
-  public String getDefaultModeIngress() {
-    return defaultModeIngress;
-  }
-
   public void setDefaultModeIngress(String defaultModeIngress) {
     this.defaultModeIngress = defaultModeIngress;
   }
@@ -357,11 +348,6 @@ public class Application implements Serializable {
     return DevelopmentMode.of(jobType);
   }
 
-  @JsonIgnore
-  public void setDevelopmentMode(DevelopmentMode mode) {
-    this.jobType = mode.getValue();
-  }
-
   @JsonIgnore
   public FlinkAppState getFlinkAppStateEnum() {
     return FlinkAppState.of(state);
@@ -509,75 +495,6 @@ public class Application implements Serializable {
     return false;
   }
 
-  /**
-   * Parameter comparison, mainly to compare whether the parameters related to 
Flink runtime have
-   * changed
-   */
-  public boolean eqJobParam(Application other) {
-    // 1) Resolve Order has it changed
-    // 2) flink Version has it changed
-    // 3) Execution Mode has it changed
-    // 4) Parallelism has it changed
-    // 5) Task Slots has it changed
-    // 6) Options has it changed
-    // 7) properties has it changed
-    // 8) Program Args has it changed
-    // 9) Flink Version  has it changed
-
-    if (!ObjectUtils.safeEquals(this.getVersionId(), other.getVersionId())) {
-      return false;
-    }
-
-    if (!ObjectUtils.safeEquals(this.getResolveOrder(), 
other.getResolveOrder())
-        || !ObjectUtils.safeEquals(this.getExecutionMode(), 
other.getExecutionMode())
-        || !ObjectUtils.safeEquals(this.getK8sRestExposedType(), 
other.getK8sRestExposedType())) {
-      return false;
-    }
-
-    if (this.getOptions() != null) {
-      if (other.getOptions() != null) {
-        if (!this.getOptions().trim().equals(other.getOptions().trim())) {
-          Map<String, Object> optMap = this.getOptionMap();
-          Map<String, Object> otherMap = other.getOptionMap();
-          if (optMap.size() != otherMap.size()) {
-            return false;
-          }
-          for (Map.Entry<String, Object> entry : optMap.entrySet()) {
-            if (!entry.getValue().equals(otherMap.get(entry.getKey()))) {
-              return false;
-            }
-          }
-        }
-      } else {
-        return false;
-      }
-    } else if (other.getOptions() != null) {
-      return false;
-    }
-
-    if (this.getDynamicProperties() != null) {
-      if (other.getDynamicProperties() != null) {
-        if 
(!this.getDynamicProperties().trim().equals(other.getDynamicProperties().trim()))
 {
-          return false;
-        }
-      } else {
-        return false;
-      }
-    } else if (other.getDynamicProperties() != null) {
-      return false;
-    }
-
-    if (this.getArgs() != null) {
-      if (other.getArgs() != null) {
-        return this.getArgs().trim().equals(other.getArgs().trim());
-      } else {
-        return false;
-      }
-    } else {
-      return other.getArgs() == null;
-    }
-  }
-
   @JsonIgnore
   public StorageType getStorageType() {
     return getStorageType(getExecutionMode());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
index cb3fc4f97..9b30955ee 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
@@ -18,7 +18,6 @@
 package org.apache.streampark.console.core.entity;
 
 import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.core.bean.Dependency;
 import org.apache.streampark.console.core.enums.ChangedType;
 
@@ -30,6 +29,7 @@ import lombok.Data;
 
 import java.util.Base64;
 import java.util.Date;
+import java.util.Objects;
 
 @Data
 @TableName("t_flink_sql")
@@ -91,8 +91,8 @@ public class FlinkSql {
     Dependency targetDependency = 
Dependency.toDependency(target.getDependency());
     boolean depDifference = !thisDependency.eq(targetDependency);
     // 3) determine if team resource has changed
-    boolean teamResDifference =
-        !ObjectUtils.safeEquals(this.teamResource, target.getTeamResource());
+    boolean teamResDifference = !Objects.equals(this.teamResource, 
target.getTeamResource());
+
     if (sqlDifference && depDifference && teamResDifference) {
       return ChangedType.ALL;
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 2a38308cb..b7c37eee6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -108,6 +108,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.streampark.console.core.enums.Operation.RELEASE;
+
 @Service
 @Slf4j
 @Transactional(propagation = Propagation.SUPPORTS, rollbackFor = 
Exception.class)
@@ -170,8 +172,7 @@ public class AppBuildPipeServiceImpl
 
     Application app = applicationService.getById(appId);
     ApplicationLog applicationLog = new ApplicationLog();
-    applicationLog.setOptionName(
-        org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+    applicationLog.setOptionName(RELEASE.getValue());
     applicationLog.setAppId(app.getId());
     applicationLog.setOptionTime(new Date());
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 96253c6d7..0505d69b2 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -135,6 +135,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -837,57 +838,60 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), 
appParam.getTeamId()));
 
     application.setRelease(ReleaseState.NEED_RELEASE.get());
+
+    // 1) jar job jar file changed
     if (application.isUploadJob()) {
-      if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
+      if (!Objects.equals(application.getJar(), appParam.getJar())) {
         application.setBuild(true);
       } else {
         File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
         if (jarFile.exists()) {
-          long checkSum = 0;
           try {
-            checkSum = FileUtils.checksumCRC32(jarFile);
+            long checkSum = FileUtils.checksumCRC32(jarFile);
+            if (!Objects.equals(checkSum, application.getJarCheckSum())) {
+              application.setBuild(true);
+            }
           } catch (IOException e) {
             log.error("Error in checksumCRC32 for {}.", jarFile);
             throw new RuntimeException(e);
           }
-          if (!ObjectUtils.safeEquals(checkSum, application.getJarCheckSum())) 
{
-            application.setBuild(true);
-          }
         }
       }
     }
 
-    if (!application.getBuild()) {
-      if (!application.getExecutionMode().equals(appParam.getExecutionMode())) 
{
-        if 
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
-            || 
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
-          application.setBuild(true);
-        }
-      }
-    }
-
-    if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
-      if (!ObjectUtils.safeTrimEquals(
+    // 2) k8s podTemplate changed..
+    if (application.getBuild() && 
ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
+      if (ObjectUtils.trimNoEquals(
               application.getK8sRestExposedType(), 
appParam.getK8sRestExposedType())
-          || !ObjectUtils.safeTrimEquals(
+          || ObjectUtils.trimNoEquals(
               application.getK8sJmPodTemplate(), 
appParam.getK8sJmPodTemplate())
-          || !ObjectUtils.safeTrimEquals(
+          || ObjectUtils.trimNoEquals(
               application.getK8sTmPodTemplate(), 
appParam.getK8sTmPodTemplate())
-          || !ObjectUtils.safeTrimEquals(
+          || ObjectUtils.trimNoEquals(
               application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
-          || !ObjectUtils.safeTrimEquals(
+          || ObjectUtils.trimNoEquals(
               application.getK8sHadoopIntegration(), 
appParam.getK8sHadoopIntegration())
-          || !ObjectUtils.safeTrimEquals(application.getFlinkImage(), 
appParam.getFlinkImage())) {
+          || ObjectUtils.trimNoEquals(application.getFlinkImage(), 
appParam.getFlinkImage())) {
         application.setBuild(true);
       }
     }
 
-    // when flink version has changed, we should rebuild the application. 
Otherwise, the shims jar
-    // may be not suitable for the new flink version.
-    if (!ObjectUtils.safeEquals(application.getVersionId(), 
appParam.getVersionId())) {
+    // 3) flink version changed
+    if (!application.getBuild()
+        && !Objects.equals(application.getVersionId(), 
appParam.getVersionId())) {
       application.setBuild(true);
     }
 
+    // 4) yarn application mode change
+    if (!application.getBuild()) {
+      if (!application.getExecutionMode().equals(appParam.getExecutionMode())) 
{
+        if 
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
+            || 
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
+          application.setBuild(true);
+        }
+      }
+    }
+
     appParam.setJobType(application.getJobType());
     // changes to the following parameters need to be re-release to take effect
     application.setJobName(appParam.getJobName());
@@ -935,15 +939,16 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     // Flink Sql job...
     if (application.isFlinkSqlJob()) {
       updateFlinkSqlJob(application, appParam);
+      return true;
+    }
+
+    if (application.isStreamParkJob()) {
+      configService.update(appParam, application.isRunning());
     } else {
-      if (application.isStreamParkJob()) {
-        configService.update(appParam, application.isRunning());
-      } else {
-        application.setJar(appParam.getJar());
-        application.setMainClass(appParam.getMainClass());
-      }
+      application.setJar(appParam.getJar());
+      application.setMainClass(appParam.getMainClass());
     }
-    baseMapper.updateById(application);
+    this.updateById(application);
     return true;
   }
 
@@ -1015,6 +1020,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         }
       }
     }
+    this.updateById(application);
     this.configService.update(appParam, application.isRunning());
   }
 

Reply via email to