This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch resource
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/resource by this push:
new 528540686 application minor improvement
528540686 is described below
commit 528540686cda9b8412e9eee2cf31f3f756816e26
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 22 12:41:29 2023 +0800
application minor improvement
---
.../streampark/console/base/util/CommonUtils.java | 4 +-
.../streampark/console/base/util/ObjectUtils.java | 12 ++--
.../console/core/entity/Application.java | 83 ----------------------
.../streampark/console/core/entity/FlinkSql.java | 6 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 5 +-
.../core/service/impl/ApplicationServiceImpl.java | 70 +++++++++---------
6 files changed, 54 insertions(+), 126 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
index 5b80ba623..f6bc11983 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
@@ -199,7 +199,7 @@ public final class CommonUtils implements Serializable {
if (iterator != null) {
while (iterator.hasNext()) {
Object candidate = iterator.next();
- if (ObjectUtils.safeEquals(candidate, element)) {
+ if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
@@ -218,7 +218,7 @@ public final class CommonUtils implements Serializable {
if (enumeration != null) {
while (enumeration.hasMoreElements()) {
Object candidate = enumeration.nextElement();
- if (ObjectUtils.safeEquals(candidate, element)) {
+ if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
index f375dad52..c16d2a42f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
@@ -104,7 +104,7 @@ public final class ObjectUtils {
return false;
}
for (Object arrayEle : array) {
- if (safeEquals(arrayEle, element)) {
+ if (equals(arrayEle, element)) {
return true;
}
}
@@ -238,7 +238,7 @@ public final class ObjectUtils {
* @return whether the given objects are equal
* @see Arrays#equals
*/
- public static boolean safeEquals(Object o1, Object o2) {
+ public static boolean equals(Object o1, Object o2) {
if (o1 == null || o2 == null) {
return false;
}
@@ -282,8 +282,8 @@ public final class ObjectUtils {
return false;
}
- public static boolean safeTrimEquals(Object o1, Object o2) {
- boolean equals = safeEquals(o1, o2);
+ public static boolean trimEquals(Object o1, Object o2) {
+ boolean equals = equals(o1, o2);
if (!equals) {
if (o1 != null && o2 != null) {
if (o1 instanceof String && o2 instanceof String) {
@@ -294,6 +294,10 @@ public final class ObjectUtils {
return equals;
}
+ public static boolean trimNoEquals(Object o1, Object o2) {
+ return !trimEquals(o1, o2);
+ }
+
/**
* Return as hash code for the given object; typically the value of <code>
* {@link Object#hashCode()}</code>. If the object is an array, this method
will delegate to any
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4fc0cf147..38f9a76c8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -27,7 +27,6 @@ import
org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -252,14 +251,6 @@ public class Application implements Serializable {
return ingressTemplate;
}
- public void setIngressTemplate(String ingressTemplate) {
- this.ingressTemplate = ingressTemplate;
- }
-
- public String getDefaultModeIngress() {
- return defaultModeIngress;
- }
-
public void setDefaultModeIngress(String defaultModeIngress) {
this.defaultModeIngress = defaultModeIngress;
}
@@ -357,11 +348,6 @@ public class Application implements Serializable {
return DevelopmentMode.of(jobType);
}
- @JsonIgnore
- public void setDevelopmentMode(DevelopmentMode mode) {
- this.jobType = mode.getValue();
- }
-
@JsonIgnore
public FlinkAppState getFlinkAppStateEnum() {
return FlinkAppState.of(state);
@@ -509,75 +495,6 @@ public class Application implements Serializable {
return false;
}
- /**
- * Parameter comparison, mainly to compare whether the parameters related to
Flink runtime have
- * changed
- */
- public boolean eqJobParam(Application other) {
- // 1) Resolve Order has it changed
- // 2) flink Version has it changed
- // 3) Execution Mode has it changed
- // 4) Parallelism has it changed
- // 5) Task Slots has it changed
- // 6) Options has it changed
- // 7) properties has it changed
- // 8) Program Args has it changed
- // 9) Flink Version has it changed
-
- if (!ObjectUtils.safeEquals(this.getVersionId(), other.getVersionId())) {
- return false;
- }
-
- if (!ObjectUtils.safeEquals(this.getResolveOrder(),
other.getResolveOrder())
- || !ObjectUtils.safeEquals(this.getExecutionMode(),
other.getExecutionMode())
- || !ObjectUtils.safeEquals(this.getK8sRestExposedType(),
other.getK8sRestExposedType())) {
- return false;
- }
-
- if (this.getOptions() != null) {
- if (other.getOptions() != null) {
- if (!this.getOptions().trim().equals(other.getOptions().trim())) {
- Map<String, Object> optMap = this.getOptionMap();
- Map<String, Object> otherMap = other.getOptionMap();
- if (optMap.size() != otherMap.size()) {
- return false;
- }
- for (Map.Entry<String, Object> entry : optMap.entrySet()) {
- if (!entry.getValue().equals(otherMap.get(entry.getKey()))) {
- return false;
- }
- }
- }
- } else {
- return false;
- }
- } else if (other.getOptions() != null) {
- return false;
- }
-
- if (this.getDynamicProperties() != null) {
- if (other.getDynamicProperties() != null) {
- if
(!this.getDynamicProperties().trim().equals(other.getDynamicProperties().trim()))
{
- return false;
- }
- } else {
- return false;
- }
- } else if (other.getDynamicProperties() != null) {
- return false;
- }
-
- if (this.getArgs() != null) {
- if (other.getArgs() != null) {
- return this.getArgs().trim().equals(other.getArgs().trim());
- } else {
- return false;
- }
- } else {
- return other.getArgs() == null;
- }
- }
-
@JsonIgnore
public StorageType getStorageType() {
return getStorageType(getExecutionMode());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
index cb3fc4f97..9b30955ee 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
@@ -18,7 +18,6 @@
package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.enums.ChangedType;
@@ -30,6 +29,7 @@ import lombok.Data;
import java.util.Base64;
import java.util.Date;
+import java.util.Objects;
@Data
@TableName("t_flink_sql")
@@ -91,8 +91,8 @@ public class FlinkSql {
Dependency targetDependency =
Dependency.toDependency(target.getDependency());
boolean depDifference = !thisDependency.eq(targetDependency);
// 3) determine if team resource has changed
- boolean teamResDifference =
- !ObjectUtils.safeEquals(this.teamResource, target.getTeamResource());
+ boolean teamResDifference = !Objects.equals(this.teamResource,
target.getTeamResource());
+
if (sqlDifference && depDifference && teamResDifference) {
return ChangedType.ALL;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 2a38308cb..b7c37eee6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -108,6 +108,8 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.streampark.console.core.enums.Operation.RELEASE;
+
@Service
@Slf4j
@Transactional(propagation = Propagation.SUPPORTS, rollbackFor =
Exception.class)
@@ -170,8 +172,7 @@ public class AppBuildPipeServiceImpl
Application app = applicationService.getById(appId);
ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(
- org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+ applicationLog.setOptionName(RELEASE.getValue());
applicationLog.setAppId(app.getId());
applicationLog.setOptionTime(new Date());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 96253c6d7..0505d69b2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -135,6 +135,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -837,57 +838,60 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(),
appParam.getTeamId()));
application.setRelease(ReleaseState.NEED_RELEASE.get());
+
+ // 1) jar job jar file changed
if (application.isUploadJob()) {
- if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
+ if (!Objects.equals(application.getJar(), appParam.getJar())) {
application.setBuild(true);
} else {
File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
if (jarFile.exists()) {
- long checkSum = 0;
try {
- checkSum = FileUtils.checksumCRC32(jarFile);
+ long checkSum = FileUtils.checksumCRC32(jarFile);
+ if (!Objects.equals(checkSum, application.getJarCheckSum())) {
+ application.setBuild(true);
+ }
} catch (IOException e) {
log.error("Error in checksumCRC32 for {}.", jarFile);
throw new RuntimeException(e);
}
- if (!ObjectUtils.safeEquals(checkSum, application.getJarCheckSum()))
{
- application.setBuild(true);
- }
}
}
}
- if (!application.getBuild()) {
- if (!application.getExecutionMode().equals(appParam.getExecutionMode()))
{
- if
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
- ||
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
- application.setBuild(true);
- }
- }
- }
-
- if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
- if (!ObjectUtils.safeTrimEquals(
+ // 2) k8s podTemplate changed..
+ if (application.getBuild() &&
ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
+ if (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(),
appParam.getK8sRestExposedType())
- || !ObjectUtils.safeTrimEquals(
+ || ObjectUtils.trimNoEquals(
application.getK8sJmPodTemplate(),
appParam.getK8sJmPodTemplate())
- || !ObjectUtils.safeTrimEquals(
+ || ObjectUtils.trimNoEquals(
application.getK8sTmPodTemplate(),
appParam.getK8sTmPodTemplate())
- || !ObjectUtils.safeTrimEquals(
+ || ObjectUtils.trimNoEquals(
application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
- || !ObjectUtils.safeTrimEquals(
+ || ObjectUtils.trimNoEquals(
application.getK8sHadoopIntegration(),
appParam.getK8sHadoopIntegration())
- || !ObjectUtils.safeTrimEquals(application.getFlinkImage(),
appParam.getFlinkImage())) {
+ || ObjectUtils.trimNoEquals(application.getFlinkImage(),
appParam.getFlinkImage())) {
application.setBuild(true);
}
}
- // when flink version has changed, we should rebuild the application.
Otherwise, the shims jar
- // may be not suitable for the new flink version.
- if (!ObjectUtils.safeEquals(application.getVersionId(),
appParam.getVersionId())) {
+ // 3) flink version changed
+ if (!application.getBuild()
+ && !Objects.equals(application.getVersionId(),
appParam.getVersionId())) {
application.setBuild(true);
}
+ // 4) yarn application mode change
+ if (!application.getBuild()) {
+ if (!application.getExecutionMode().equals(appParam.getExecutionMode()))
{
+ if
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
+ ||
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
+ application.setBuild(true);
+ }
+ }
+ }
+
appParam.setJobType(application.getJobType());
// changes to the following parameters need to be re-release to take effect
application.setJobName(appParam.getJobName());
@@ -935,15 +939,16 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// Flink Sql job...
if (application.isFlinkSqlJob()) {
updateFlinkSqlJob(application, appParam);
+ return true;
+ }
+
+ if (application.isStreamParkJob()) {
+ configService.update(appParam, application.isRunning());
} else {
- if (application.isStreamParkJob()) {
- configService.update(appParam, application.isRunning());
- } else {
- application.setJar(appParam.getJar());
- application.setMainClass(appParam.getMainClass());
- }
+ application.setJar(appParam.getJar());
+ application.setMainClass(appParam.getMainClass());
}
- baseMapper.updateById(application);
+ this.updateById(application);
return true;
}
@@ -1015,6 +1020,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
}
}
+ this.updateById(application);
this.configService.update(appParam, application.isRunning());
}