This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 50ee38390 [Improve] Console core module code optimization (#3202)
50ee38390 is described below
commit 50ee383900773664249410011c17b0f285bcc905
Author: ChengJie1053 <[email protected]>
AuthorDate: Thu Sep 28 13:13:36 2023 +0800
[Improve] Console core module code optimization (#3202)
* [Improve] Console core module code optimization
---
.../console/core/bean/AlertTemplate.java | 29 ++++++++++++++++------
.../console/core/entity/Application.java | 8 +++---
.../console/core/entity/ApplicationConfig.java | 4 ++-
.../streampark/console/core/entity/FlinkEnv.java | 3 ++-
.../streampark/console/core/entity/Project.java | 2 +-
.../streampark/console/core/entity/Resource.java | 8 +++---
6 files changed, 37 insertions(+), 17 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index f7fb06e64..e96b31583 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -57,6 +57,12 @@ public class AlertTemplate implements Serializable {
private Integer lostJobs;
private Integer cancelledJobs;
+ private static final String ALERT_SUBJECT_PREFIX = "StreamPark Alert:";
+
+ private static final String ALERT_TITLE_PREFIX = "Notify:";
+
+ private static final String PROBE = "PROBE";
+
public static AlertTemplate of(Application application, FlinkAppStateEnum
appState) {
return new AlertTemplateBuilder()
.setDuration(application.getStartTime(), application.getEndTime())
@@ -68,8 +74,11 @@ public class AlertTemplate implements Serializable {
.setRestartIndex(application.getRestartCount())
.setTotalRestart(application.getRestartSize())
.setType(1)
- .setTitle(String.format("Notify: %s %s", application.getJobName(),
appState.name()))
- .setSubject(String.format("StreamPark Alert: %s %s",
application.getJobName(), appState))
+ .setTitle(
+ String.format(
+ "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(),
appState.name()))
+ .setSubject(
+ String.format("%s %s %s", ALERT_SUBJECT_PREFIX,
application.getJobName(), appState))
.setStatus(appState.name())
.build();
}
@@ -84,9 +93,11 @@ public class AlertTemplate implements Serializable {
.setCpFailureRateInterval(
DateUtils.toDuration(application.getCpFailureRateInterval() * 1000
* 60))
.setCpMaxFailureInterval(application.getCpMaxFailureInterval())
- .setTitle(String.format("Notify: %s checkpoint FAILED",
application.getJobName()))
+ .setTitle(
+ String.format("%s %s checkpoint FAILED", ALERT_TITLE_PREFIX,
application.getJobName()))
.setSubject(
- String.format("StreamPark Alert: %s, checkPoint is Failed",
application.getJobName()))
+ String.format(
+ "%s %s, checkPoint is Failed", ALERT_SUBJECT_PREFIX,
application.getJobName()))
.build();
}
@@ -98,9 +109,11 @@ public class AlertTemplate implements Serializable {
.setStartTime(cluster.getStartTime())
.setEndTime(cluster.getEndTime())
.setType(3)
- .setTitle(String.format("Notify: %s %s", cluster.getClusterName(),
clusterState.name()))
+ .setTitle(
+ String.format(
+ "%s %s %s", ALERT_TITLE_PREFIX, cluster.getClusterName(),
clusterState.name()))
.setSubject(
- String.format("StreamPark Alert: %s %s", cluster.getClusterName(),
clusterState))
+ String.format("%s %s %s", ALERT_SUBJECT_PREFIX,
cluster.getClusterName(), clusterState))
.setStatus(clusterState.name())
.setAllJobs(cluster.getAllJobs())
.setAffectedJobs(cluster.getAffectedJobs())
@@ -115,8 +128,8 @@ public class AlertTemplate implements Serializable {
.setFailedJobs(alertProbeMsg.getFailedJobs())
.setLostJobs(alertProbeMsg.getLostJobs())
.setCancelledJobs(alertProbeMsg.getCancelledJobs())
- .setSubject("StreamPark Alert: PROBE")
- .setTitle("PROBE")
+ .setSubject(String.format("%s %s", ALERT_SUBJECT_PREFIX, PROBE))
+ .setTitle(PROBE)
.build();
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 960683594..1bb002ed0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -37,6 +37,7 @@ import
org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates;
import org.apache.streampark.flink.packer.maven.DependencyInfo;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
@@ -282,7 +283,8 @@ public class Application implements Serializable {
}
Map<String, Object> hotParamsMap = this.getHotParamsMap();
- if (!hotParamsMap.isEmpty() &&
hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
+ if (MapUtils.isNotEmpty(hotParamsMap)
+ && hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
String yarnQueue =
hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString();
String labelExpr =
Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL()))
@@ -541,7 +543,7 @@ public class Application implements Serializable {
@SneakyThrows
@SuppressWarnings("unchecked")
public Map<String, Object> getHotParamsMap() {
- if (this.hotParams != null) {
+ if (StringUtils.isNotBlank(this.hotParams)) {
Map<String, Object> map = JacksonUtils.read(this.hotParams, Map.class);
map.entrySet().removeIf(entry -> entry.getValue() == null);
return map;
@@ -564,7 +566,7 @@ public class Application implements Serializable {
if (needFillYarnQueueLabel(executionModeEnum)) {
hotParams.putAll(YarnQueueLabelExpression.getQueueLabelMap(appParam.getYarnQueue()));
}
- if (!hotParams.isEmpty()) {
+ if (MapUtils.isNotEmpty(hotParams)) {
this.setHotParams(JacksonUtils.write(hotParams));
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index 48a2f951e..b920d6e56 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -22,6 +22,8 @@ import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
+import org.apache.commons.collections.MapUtils;
+
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
@@ -95,7 +97,7 @@ public class ApplicationConfig {
}
}
- if (configs != null && !configs.isEmpty()) {
+ if (MapUtils.isNotEmpty(configs)) {
return configs.entrySet().stream()
.collect(
Collectors.toMap(
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index b2798e403..b56edf631 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -33,6 +33,7 @@ import lombok.Data;
import java.io.File;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
@@ -73,7 +74,7 @@ public class FlinkEnv implements Serializable {
public void doSetFlinkConf() throws ApiDetailException {
try {
File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
- String flinkConf = FileUtils.readFileToString(yaml);
+ String flinkConf = FileUtils.readFileToString(yaml,
StandardCharsets.UTF_8);
this.flinkConf = DeflaterUtils.zipString(flinkConf);
} catch (Exception e) {
throw new ApiDetailException(e);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index e9fb75922..ffc0175ff 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -108,7 +108,7 @@ public class Project implements Serializable {
/** get project source */
@JsonIgnore
public File getAppSource() {
- if (appSource == null) {
+ if (StringUtils.isBlank(appSource)) {
appSource = Workspace.PROJECT_LOCAL_PATH();
}
File sourcePath = new File(appSource);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
index 2c6159dc4..502dfc3b9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
@@ -20,6 +20,8 @@ package org.apache.streampark.console.core.entity;
import org.apache.streampark.console.core.enums.EngineTypeEnum;
import org.apache.streampark.console.core.enums.ResourceTypeEnum;
+import org.apache.commons.lang3.StringUtils;
+
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@@ -84,7 +86,7 @@ public class Resource implements Serializable {
private transient String connector;
public void setResourcePath(String resourcePath) {
- if (resourcePath == null) {
+ if (StringUtils.isBlank(resourcePath)) {
throw new IllegalArgumentException("resource path cannot be null.");
}
String[] namePath = resourcePath.split(":");
@@ -95,14 +97,14 @@ public class Resource implements Serializable {
}
public String getFileName() {
- if (this.resourcePath == null) {
+ if (StringUtils.isBlank(this.resourcePath)) {
return null;
}
return resourcePath.split(":")[0];
}
public String getFilePath() {
- if (this.resourcePath == null) {
+ if (StringUtils.isBlank(this.resourcePath)) {
return null;
}
return resourcePath.split(":")[1];