This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new b39d7e21b [Improvement] Some code optimization (#3909)
b39d7e21b is described below
commit b39d7e21b7c18737ae5fc1c462065668ee9bc6ef
Author: xiangzihao <[email protected]>
AuthorDate: Tue Jul 23 17:07:30 2024 +0800
[Improvement] Some code optimization (#3909)
* some code optimization
---
.../console/core/bean/AlertTemplate.java | 265 +++------------------
.../core/component/FlinkCheckpointProcessor.java | 5 +-
.../console/core/entity/Application.java | 2 +-
.../console/core/entity/FlinkStateChangeEvent.java | 59 +++++
.../console/core/enums/FlinkAppStateEnum.java | 10 +-
.../console/core/enums/OptionStateEnum.java | 2 +-
.../console/core/utils/AlertTemplateUtils.java | 138 +++++++++++
.../console/core/watcher/FlinkAppHttpWatcher.java | 71 ++----
.../console/core/watcher/FlinkClusterWatcher.java | 4 +-
.../core/watcher/FlinkK8sChangeEventListener.java | 5 +-
.../console/core/watcher/SparkAppHttpWatcher.java | 3 +-
11 files changed, 264 insertions(+), 300 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index ce24a1e9f..4b9f220f8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -17,25 +17,24 @@
package org.apache.streampark.console.core.bean;
-import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.util.DateUtils;
import org.apache.streampark.common.util.YarnUtils;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.FlinkCluster;
-import org.apache.streampark.console.core.entity.SparkApplication;
-import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
-import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
-import org.apache.streampark.console.core.enums.SparkAppStateEnum;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
import java.util.TimeZone;
@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class AlertTemplate implements Serializable {
private String title;
@@ -52,7 +51,7 @@ public class AlertTemplate implements Serializable {
private Boolean restart;
private Integer restartIndex;
private Integer totalRestart;
- private boolean atAll = false;
+ private boolean atAll;
private Integer allJobs;
private Integer affectedJobs;
private String user;
@@ -61,157 +60,22 @@ public class AlertTemplate implements Serializable {
private Integer lostJobs;
private Integer cancelledJobs;
- private static final String ALERT_SUBJECT_PREFIX = "StreamPark Alert:";
+ public static class AlertTemplateBuilder {
- private static final String ALERT_TITLE_PREFIX = "Notify:";
-
- private static final String PROBE = "PROBE";
-
- public static AlertTemplate of(Application application, FlinkAppStateEnum
appState) {
- return new AlertTemplateBuilder()
- .setDuration(application.getStartTime(), application.getEndTime())
- .setJobName(application.getJobName())
- .setLink(application.getFlinkExecutionMode(),
application.getClusterId())
- .setStartTime(application.getStartTime())
- .setEndTime(application.getEndTime())
- .setRestart(application.isNeedRestartOnFailed(),
application.getRestartCount())
- .setRestartIndex(application.getRestartCount())
- .setTotalRestart(application.getRestartSize())
- .setType(1)
- .setTitle(
- String.format(
- "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(),
- appState.name()))
- .setSubject(
- String.format("%s %s %s", ALERT_SUBJECT_PREFIX,
application.getJobName(),
- appState))
- .setStatus(appState.name())
- .build();
- }
-
- public static AlertTemplate of(Application application,
CheckPointStatusEnum statusEnum) {
- return new AlertTemplateBuilder()
- .setDuration(application.getStartTime(), application.getEndTime())
- .setJobName(application.getJobName())
- .setLink(application.getFlinkExecutionMode(),
application.getClusterId())
- .setStartTime(application.getStartTime())
- .setType(2)
- .setCpFailureRateInterval(
- DateUtils.toDuration(application.getCpFailureRateInterval() *
1000 * 60))
- .setCpMaxFailureInterval(application.getCpMaxFailureInterval())
- .setTitle(
- String.format("%s %s checkpoint FAILED", ALERT_TITLE_PREFIX,
- application.getJobName()))
- .setSubject(
- String.format(
- "%s %s, checkPoint is Failed", ALERT_SUBJECT_PREFIX,
- application.getJobName()))
- .build();
- }
-
- public static AlertTemplate of(FlinkCluster cluster, ClusterState
clusterState) {
- return new AlertTemplateBuilder()
- .setDuration(cluster.getStartTime(), cluster.getEndTime())
- .setJobName(cluster.getClusterName())
- .setLink(cluster.getFlinkExecutionModeEnum(),
cluster.getClusterId())
- .setStartTime(cluster.getStartTime())
- .setEndTime(cluster.getEndTime())
- .setType(3)
- .setTitle(
- String.format(
- "%s %s %s", ALERT_TITLE_PREFIX, cluster.getClusterName(),
- clusterState.name()))
- .setSubject(
- String.format("%s %s %s", ALERT_SUBJECT_PREFIX,
cluster.getClusterName(),
- clusterState))
- .setStatus(clusterState.name())
- .setAllJobs(cluster.getAllJobs())
- .setAffectedJobs(cluster.getAffectedJobs())
- .build();
- }
-
- public static AlertTemplate of(AlertProbeMsg alertProbeMsg) {
- return new AlertTemplateBuilder()
- .setType(4)
- .setUser(alertProbeMsg.getUser())
- .setProbeJobs(alertProbeMsg.getProbeJobs())
- .setFailedJobs(alertProbeMsg.getFailedJobs())
- .setLostJobs(alertProbeMsg.getLostJobs())
- .setCancelledJobs(alertProbeMsg.getCancelledJobs())
- .setSubject(String.format("%s %s", ALERT_SUBJECT_PREFIX, PROBE))
- .setTitle(PROBE)
- .build();
- }
-
- public static AlertTemplate of(SparkApplication application,
SparkAppStateEnum appState) {
- return new AlertTemplateBuilder()
- .setDuration(application.getStartTime(), application.getEndTime())
- .setJobName(application.getJobName())
- .setLink(application.getSparkExecutionMode(),
application.getJobId())
- .setStartTime(application.getStartTime())
- .setEndTime(application.getEndTime())
- .setRestart(application.isNeedRestartOnFailed(),
application.getRestartCount())
- .setRestartIndex(application.getRestartCount())
- .setTotalRestart(application.getRestartSize())
- .setType(1)
- .setTitle(
- String.format(
- "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(),
appState.name()))
- .setSubject(
- String.format("%s %s %s", ALERT_SUBJECT_PREFIX,
application.getJobName(), appState))
- .setStatus(appState.name())
- .build();
- }
- private static class AlertTemplateBuilder {
-
- private final AlertTemplate alertTemplate = new AlertTemplate();
-
- public AlertTemplateBuilder setTitle(String title) {
- alertTemplate.setTitle(title);
+ public AlertTemplateBuilder startTime(Date startTime) {
+ this.startTime = DateUtils.format(startTime,
DateUtils.fullFormat(), TimeZone.getDefault());
return this;
}
- public AlertTemplateBuilder setSubject(String subject) {
- alertTemplate.setSubject(subject);
+ public AlertTemplateBuilder endTime(Date endTime) {
+ this.endTime = DateUtils.format(
+ endTime == null ? new Date() : endTime,
+ DateUtils.fullFormat(),
+ TimeZone.getDefault());
return this;
}
- public AlertTemplateBuilder setJobName(String jobName) {
- alertTemplate.setJobName(jobName);
- return this;
- }
-
- public AlertTemplateBuilder setType(Integer type) {
- alertTemplate.setType(type);
- return this;
- }
-
- public AlertTemplateBuilder setStatus(String status) {
- alertTemplate.setStatus(status);
- return this;
- }
-
- public AlertTemplateBuilder setStartTime(Date startTime) {
- alertTemplate.setStartTime(
- DateUtils.format(startTime, DateUtils.fullFormat(),
TimeZone.getDefault()));
- return this;
- }
-
- public AlertTemplateBuilder setEndTime(Date endTime) {
- alertTemplate.setEndTime(
- DateUtils.format(
- endTime == null ? new Date() : endTime,
- DateUtils.fullFormat(),
- TimeZone.getDefault()));
- return this;
- }
-
- public AlertTemplateBuilder setDuration(String duration) {
- alertTemplate.setDuration(duration);
- return this;
- }
-
- public AlertTemplateBuilder setDuration(Date start, Date end) {
+ public AlertTemplateBuilder duration(Date start, Date end) {
long duration;
if (start == null && end == null) {
duration = 0L;
@@ -220,114 +84,47 @@ public class AlertTemplate implements Serializable {
} else {
duration = end.getTime() - start.getTime();
}
- alertTemplate.setDuration(DateUtils.toDuration(duration));
+ this.duration = DateUtils.toDuration(duration);
return this;
}
- public AlertTemplateBuilder setLink(String link) {
- alertTemplate.setLink(link);
- return this;
- }
-
- public AlertTemplateBuilder setLink(FlinkExecutionMode mode, String
appId) {
+ public AlertTemplateBuilder link(FlinkExecutionMode mode, String
appId) {
if (FlinkExecutionMode.isYarnMode(mode)) {
String format = "%s/proxy/%s/";
- String url = String.format(format,
YarnUtils.getRMWebAppURL(false), appId);
- alertTemplate.setLink(url);
+ this.link = String.format(format,
YarnUtils.getRMWebAppURL(false), appId);
} else {
- alertTemplate.setLink(null);
+ this.link = null;
}
return this;
}
- public AlertTemplateBuilder setLink(SparkExecutionMode mode, String
appId) {
+ public AlertTemplateBuilder link(SparkExecutionMode mode, String
appId) {
if (SparkExecutionMode.isYarnMode(mode)) {
String format = "%s/proxy/%s/";
- String url = String.format(format,
YarnUtils.getRMWebAppURL(false), appId);
- alertTemplate.setLink(url);
+ this.link = String.format(format,
YarnUtils.getRMWebAppURL(false), appId);
} else {
- alertTemplate.setLink(null);
+ this.link = null;
}
return this;
}
- public AlertTemplateBuilder setCpFailureRateInterval(String
cpFailureRateInterval) {
- alertTemplate.setCpFailureRateInterval(cpFailureRateInterval);
+ public AlertTemplateBuilder restart(Boolean needRestartOnFailed,
Integer restartCount) {
+ this.restart = needRestartOnFailed && restartCount > 0;
return this;
}
- public AlertTemplateBuilder setCpMaxFailureInterval(Integer
cpMaxFailureInterval) {
- alertTemplate.setCpMaxFailureInterval(cpMaxFailureInterval);
- return this;
- }
-
- public AlertTemplateBuilder setRestart(Boolean restart) {
- alertTemplate.setRestart(restart);
- return this;
- }
-
- public AlertTemplateBuilder setRestart(Boolean needRestartOnFailed,
Integer restartCount) {
- boolean needRestart = needRestartOnFailed && restartCount > 0;
- alertTemplate.setRestart(needRestart);
- return this;
- }
-
- public AlertTemplateBuilder setRestartIndex(Integer restartIndex) {
- if (alertTemplate.getRestart()) {
- alertTemplate.setRestartIndex(restartIndex);
+ public AlertTemplateBuilder restartIndex(Integer restartIndex) {
+ if (this.restart) {
+ this.restartIndex = restartIndex;
}
return this;
}
- public AlertTemplateBuilder setTotalRestart(Integer totalRestart) {
- if (alertTemplate.getRestart()) {
- alertTemplate.setTotalRestart(totalRestart);
+ public AlertTemplateBuilder totalRestart(Integer totalRestart) {
+ if (this.restart) {
+ this.totalRestart = totalRestart;
}
return this;
}
-
- public AlertTemplateBuilder setAtAll(Boolean atAll) {
- alertTemplate.setAtAll(atAll);
- return this;
- }
-
- public AlertTemplateBuilder setAllJobs(Integer allJobs) {
- alertTemplate.setAllJobs(allJobs);
- return this;
- }
-
- public AlertTemplateBuilder setAffectedJobs(Integer affectedJobs) {
- alertTemplate.setAffectedJobs(affectedJobs);
- return this;
- }
-
- public AlertTemplateBuilder setUser(String user) {
- alertTemplate.setUser(user);
- return this;
- }
-
- public AlertTemplateBuilder setProbeJobs(Integer probeJobs) {
- alertTemplate.setProbeJobs(probeJobs);
- return this;
- }
-
- public AlertTemplateBuilder setFailedJobs(Integer failedJobs) {
- alertTemplate.setFailedJobs(failedJobs);
- return this;
- }
-
- public AlertTemplateBuilder setLostJobs(Integer lostJobs) {
- alertTemplate.setLostJobs(lostJobs);
- return this;
- }
-
- public AlertTemplateBuilder setCancelledJobs(Integer cancelledJobs) {
- alertTemplate.setCancelledJobs(cancelledJobs);
- return this;
- }
-
- public AlertTemplate build() {
- return this.alertTemplate;
- }
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
index 05e7863a1..8bad65b90 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
@@ -18,7 +18,6 @@
package org.apache.streampark.console.core.component;
import org.apache.streampark.common.util.AssertUtils;
-import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.SavePoint;
import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
@@ -27,6 +26,7 @@ import
org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
import com.github.benmanes.caffeine.cache.Cache;
@@ -134,7 +134,8 @@ public class FlinkCheckpointProcessor {
switch (failoverStrategyEnum) {
case ALERT:
alertService.alert(
- application.getAlertId(), AlertTemplate.of(application,
CheckPointStatusEnum.FAILED));
+ application.getAlertId(),
+ AlertTemplateUtils.createAlertTemplate(application,
CheckPointStatusEnum.FAILED));
break;
case RESTART:
try {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 9997c6341..3af261839 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -348,7 +348,7 @@ public class Application implements Serializable {
@JsonIgnore
public FlinkAppStateEnum getStateEnum() {
- return FlinkAppStateEnum.of(state);
+ return FlinkAppStateEnum.getState(state);
}
@JsonIgnore
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkStateChangeEvent.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkStateChangeEvent.java
new file mode 100644
index 000000000..64c9f3cdd
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkStateChangeEvent.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
+import org.apache.streampark.console.core.enums.OptionStateEnum;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Objects;
+
+@Getter
+@Setter
+public class FlinkStateChangeEvent {
+
+ private Long id;
+ private String jobId;
+ private FlinkAppStateEnum appState;
+ private OptionStateEnum optionState;
+ private String jobManagerUrl;
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ FlinkStateChangeEvent that = (FlinkStateChangeEvent) object;
+ return Objects.equals(id, that.id)
+ && Objects.equals(jobId, that.jobId)
+ && Objects.equals(appState, that.appState)
+ && Objects.equals(optionState, that.optionState)
+ && Objects.equals(jobManagerUrl, that.jobManagerUrl);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, jobId, appState, optionState, jobManagerUrl);
+ }
+
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
index be46f30db..fb6730ec6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
@@ -109,7 +109,7 @@ public enum FlinkAppStateEnum {
this.value = value;
}
- public static FlinkAppStateEnum of(Integer state) {
+ public static FlinkAppStateEnum getState(Integer state) {
for (FlinkAppStateEnum appState : values()) {
if (appState.value == state) {
return appState;
@@ -118,7 +118,7 @@ public enum FlinkAppStateEnum {
return FlinkAppStateEnum.OTHER;
}
- public static FlinkAppStateEnum of(String name) {
+ public static FlinkAppStateEnum getState(String name) {
for (FlinkAppStateEnum appState : values()) {
if (appState.name().equals(name)) {
return appState;
@@ -128,7 +128,7 @@ public enum FlinkAppStateEnum {
}
public static boolean isEndState(Integer appState) {
- FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.of(appState);
+ FlinkAppStateEnum flinkAppStateEnum =
FlinkAppStateEnum.getState(appState);
return FlinkAppStateEnum.CANCELED == flinkAppStateEnum
|| FlinkAppStateEnum.FAILED == flinkAppStateEnum
|| FlinkAppStateEnum.KILLED == flinkAppStateEnum
@@ -139,7 +139,7 @@ public enum FlinkAppStateEnum {
}
public static boolean isLost(Integer appState) {
- FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.of(appState);
+ FlinkAppStateEnum flinkAppStateEnum =
FlinkAppStateEnum.getState(appState);
return FlinkAppStateEnum.LOST == flinkAppStateEnum;
}
@@ -154,7 +154,7 @@ public enum FlinkAppStateEnum {
if (FlinkJobState.K8S_INITIALIZING() == flinkJobState) {
return INITIALIZING;
}
- return of(flinkJobState.toString());
+ return getState(flinkJobState.toString());
}
/** covert to org.apache.streampark.flink.k8s.enums.FlinkJobState */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/OptionStateEnum.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/OptionStateEnum.java
index 0a6644b06..7230c9694 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/OptionStateEnum.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/OptionStateEnum.java
@@ -44,7 +44,7 @@ public enum OptionStateEnum {
this.value = value;
}
- public static OptionStateEnum of(Integer state) {
+ public static OptionStateEnum getState(Integer state) {
return Arrays.stream(values()).filter((x) -> x.value ==
state).findFirst().orElse(null);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java
new file mode 100644
index 000000000..9ad9c0e33
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.utils;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.util.DateUtils;
+import org.apache.streampark.console.core.bean.AlertProbeMsg;
+import org.apache.streampark.console.core.bean.AlertTemplate;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.enums.AlertTypeEnum;
+import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
+import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
+
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class AlertTemplateUtils {
+
+ private static final String ALERT_SUBJECT_PREFIX = "StreamPark Alert:";
+
+ private static final String ALERT_TITLE_PREFIX = "Notify:";
+
+ private static final String PROBE = "PROBE";
+
+ public static AlertTemplate createAlertTemplate(Application application,
FlinkAppStateEnum appState) {
+ return AlertTemplate.builder()
+ .duration(application.getStartTime(), application.getEndTime())
+ .jobName(application.getJobName())
+ .link(application.getFlinkExecutionMode(),
application.getClusterId())
+ .startTime(application.getStartTime())
+ .endTime(application.getEndTime())
+ .restart(application.isNeedRestartOnFailed(),
application.getRestartCount())
+ .restartIndex(application.getRestartCount())
+ .totalRestart(application.getRestartSize())
+ .type(AlertTypeEnum.EMAIL.getCode())
+ .title(
+ String.format(
+ "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(),
+ appState.name()))
+ .subject(
+ String.format("%s %s %s", ALERT_SUBJECT_PREFIX,
application.getJobName(),
+ appState))
+ .status(appState.name())
+ .build();
+ }
+
+ public static AlertTemplate createAlertTemplate(Application application,
CheckPointStatusEnum statusEnum) {
+ return AlertTemplate.builder()
+ .duration(application.getStartTime(), application.getEndTime())
+ .jobName(application.getJobName())
+ .link(application.getFlinkExecutionMode(),
application.getClusterId())
+ .startTime(application.getStartTime())
+ .type(AlertTypeEnum.DING_TALK.getCode())
+ .cpFailureRateInterval(
+ DateUtils.toDuration(application.getCpFailureRateInterval() *
1000 * 60))
+ .cpMaxFailureInterval(application.getCpMaxFailureInterval())
+ .title(
+ String.format("%s %s checkpoint FAILED", ALERT_TITLE_PREFIX,
+ application.getJobName()))
+ .subject(
+ String.format(
+ "%s %s, checkPoint is Failed", ALERT_SUBJECT_PREFIX,
+ application.getJobName()))
+ .build();
+ }
+
+ public static AlertTemplate createAlertTemplate(FlinkCluster cluster,
ClusterState clusterState) {
+ return AlertTemplate.builder()
+ .duration(cluster.getStartTime(), cluster.getEndTime())
+ .jobName(cluster.getClusterName())
+ .link(cluster.getFlinkExecutionModeEnum(), cluster.getClusterId())
+ .startTime(cluster.getStartTime())
+ .endTime(cluster.getEndTime())
+ .type(3)
+ .title(
+ String.format(
+ "%s %s %s", ALERT_TITLE_PREFIX, cluster.getClusterName(),
+ clusterState.name()))
+ .subject(
+ String.format("%s %s %s", ALERT_SUBJECT_PREFIX,
cluster.getClusterName(),
+ clusterState))
+ .status(clusterState.name())
+ .allJobs(cluster.getAllJobs())
+ .affectedJobs(cluster.getAffectedJobs())
+ .build();
+ }
+
+ public static AlertTemplate createAlertTemplate(AlertProbeMsg
alertProbeMsg) {
+ return AlertTemplate.builder()
+ .type(AlertTypeEnum.WE_COM.getCode())
+ .user(alertProbeMsg.getUser())
+ .probeJobs(alertProbeMsg.getProbeJobs())
+ .failedJobs(alertProbeMsg.getFailedJobs())
+ .lostJobs(alertProbeMsg.getLostJobs())
+ .cancelledJobs(alertProbeMsg.getCancelledJobs())
+ .subject(String.format("%s %s", ALERT_SUBJECT_PREFIX, PROBE))
+ .title(PROBE)
+ .build();
+ }
+
+ public static AlertTemplate createAlertTemplate(SparkApplication
application, SparkAppStateEnum appState) {
+ return AlertTemplate.builder()
+ .duration(application.getStartTime(), application.getEndTime())
+ .jobName(application.getJobName())
+ .link(application.getSparkExecutionMode(), application.getJobId())
+ .startTime(application.getStartTime())
+ .endTime(application.getEndTime())
+ .restart(application.isNeedRestartOnFailed(),
application.getRestartCount())
+ .restartIndex(application.getRestartCount())
+ .totalRestart(application.getRestartSize())
+ .type(AlertTypeEnum.EMAIL.getCode())
+ .title(
+ String.format(
+ "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(),
appState.name()))
+ .subject(
+ String.format("%s %s %s", ALERT_SUBJECT_PREFIX,
application.getJobName(), appState))
+ .status(appState.name())
+ .build();
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 556138cae..6d64133b6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -26,6 +26,7 @@ import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.component.FlinkCheckpointProcessor;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkStateChangeEvent;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
@@ -40,6 +41,7 @@ import
org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.config.RequestConfig;
@@ -49,8 +51,6 @@ import
com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import lombok.Getter;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -66,7 +66,6 @@ import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -152,7 +151,7 @@ public class FlinkAppHttpWatcher {
private static final Cache<Long, Byte> CANCELING_CACHE =
Caffeine.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS).build();
- private static final Cache<Long, StateChangeEvent> PREVIOUS_STATUS =
Caffeine.newBuilder()
+ private static final Cache<Long, FlinkStateChangeEvent> PREVIOUS_STATUS =
Caffeine.newBuilder()
.expireAfterWrite(24, TimeUnit.HOURS).build();
/**
@@ -314,7 +313,7 @@ public class FlinkAppHttpWatcher {
if (optional.isPresent()) {
JobsOverview.Job jobOverview = optional.get();
- FlinkAppStateEnum currentState =
FlinkAppStateEnum.of(jobOverview.getState());
+ FlinkAppStateEnum currentState =
FlinkAppStateEnum.getState(jobOverview.getState());
if (!FlinkAppStateEnum.OTHER.equals(currentState)) {
try {
@@ -453,8 +452,8 @@ public class FlinkAppHttpWatcher {
WATCHING_APPS.put(appId, application);
}
- StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
- StateChangeEvent nowEvent = StateChangeEvent.of(application);
+ FlinkStateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
+ FlinkStateChangeEvent nowEvent = createStateChangeEvent(application);
if (!nowEvent.equals(event)) {
PREVIOUS_STATUS.put(appId, nowEvent);
applicationManageService.persistMetrics(application);
@@ -546,7 +545,7 @@ public class FlinkAppHttpWatcher {
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
if (yarnAppInfo != null) {
String state = yarnAppInfo.getApp().getFinalStatus();
- flinkAppState = FlinkAppStateEnum.of(state);
+ flinkAppState = FlinkAppStateEnum.getState(state);
}
} finally {
if (StopFromEnum.NONE.equals(stopFrom)) {
@@ -575,7 +574,7 @@ public class FlinkAppHttpWatcher {
} else {
try {
String state = yarnAppInfo.getApp().getFinalStatus();
- FlinkAppStateEnum flinkAppState =
FlinkAppStateEnum.of(state);
+ FlinkAppStateEnum flinkAppState =
FlinkAppStateEnum.getState(state);
if (FlinkAppStateEnum.OTHER.equals(flinkAppState)) {
return;
}
@@ -617,7 +616,7 @@ public class FlinkAppHttpWatcher {
}
private void doAlert(Application application, FlinkAppStateEnum
flinkAppState) {
- AlertTemplate alertTemplate = AlertTemplate.of(application,
flinkAppState);
+ AlertTemplate alertTemplate =
AlertTemplateUtils.createAlertTemplate(application, flinkAppState);
alertService.alert(application.getAlertId(), alertTemplate);
}
@@ -630,7 +629,7 @@ public class FlinkAppHttpWatcher {
public void cleanSavepoint(Application application) {
application.setOptionState(OptionStateEnum.NONE.getValue());
- StateChangeEvent event =
PREVIOUS_STATUS.getIfPresent(application.getId());
+ FlinkStateChangeEvent event =
PREVIOUS_STATUS.getIfPresent(application.getId());
if (event != null && event.getOptionState() ==
OptionStateEnum.SAVEPOINTING) {
doPersistMetrics(application, false);
}
@@ -666,7 +665,7 @@ public class FlinkAppHttpWatcher {
SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
// update to PREVIOUS_STATUS
- StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
+ FlinkStateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
if (event != null) {
event.setOptionState(OptionStateEnum.SAVEPOINTING);
PREVIOUS_STATUS.put(appId, event);
@@ -836,45 +835,13 @@ public class FlinkAppHttpWatcher {
R call(T e) throws Exception;
}
- @Getter
- @Setter
- static class StateChangeEvent {
-
- private Long id;
- private String jobId;
- private FlinkAppStateEnum appState;
- private OptionStateEnum optionState;
- private String jobManagerUrl;
-
- @Override
- public boolean equals(Object object) {
- if (this == object) {
- return true;
- }
- if (object == null || getClass() != object.getClass()) {
- return false;
- }
- StateChangeEvent that = (StateChangeEvent) object;
- return Objects.equals(id, that.id)
- && Objects.equals(jobId, that.jobId)
- && Objects.equals(appState, that.appState)
- && Objects.equals(optionState, that.optionState)
- && Objects.equals(jobManagerUrl, that.jobManagerUrl);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, jobId, appState, optionState,
jobManagerUrl);
- }
-
- public static StateChangeEvent of(Application application) {
- StateChangeEvent event = new StateChangeEvent();
- event.setId(application.getId());
-
event.setOptionState(OptionStateEnum.of(application.getOptionState()));
- event.setAppState(application.getStateEnum());
- event.setJobId(application.getJobId());
- event.setJobManagerUrl(application.getJobManagerUrl());
- return event;
- }
+ public static FlinkStateChangeEvent createStateChangeEvent(Application
application) {
+ FlinkStateChangeEvent event = new FlinkStateChangeEvent();
+ event.setId(application.getId());
+
event.setOptionState(OptionStateEnum.getState(application.getOptionState()));
+ event.setAppState(application.getStateEnum());
+ event.setJobId(application.getJobId());
+ event.setJobManagerUrl(application.getJobManagerUrl());
+ return event;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
index c6b2ef1f5..d9528f56e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
@@ -25,13 +25,13 @@ import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -138,7 +138,7 @@ public class FlinkClusterWatcher {
InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
cluster.setClusterState(state.getState());
cluster.setEndTime(new Date());
- alertService.alert(cluster.getAlertId(), AlertTemplate.of(cluster,
state));
+ alertService.alert(cluster.getAlertId(),
AlertTemplateUtils.createAlertTemplate(cluster, state));
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
index 879916051..e56e282ac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
@@ -18,7 +18,6 @@
package org.apache.streampark.console.core.watcher;
import org.apache.streampark.common.enums.FlinkExecutionMode;
-import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.component.FlinkCheckpointProcessor;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
@@ -26,6 +25,7 @@ import
org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
import
org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent;
@@ -104,7 +104,8 @@ public class FlinkK8sChangeEventListener {
|| FlinkAppStateEnum.LOST == state
|| FlinkAppStateEnum.RESTARTING == state
|| FlinkAppStateEnum.FINISHED == state) {
- executor.execute(() -> alertService.alert(app.getAlertId(),
AlertTemplate.of(app, state)));
+ executor.execute(
+ () -> alertService.alert(app.getAlertId(),
AlertTemplateUtils.createAlertTemplate(app, state)));
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index c7e447cd2..86b541407 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -33,6 +33,7 @@ import
org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hc.core5.util.Timeout;
@@ -383,7 +384,7 @@ public class SparkAppHttpWatcher {
* @param appState spark application state
*/
private void doAlert(SparkApplication application, SparkAppStateEnum
appState) {
- AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
+ AlertTemplate alertTemplate =
AlertTemplateUtils.createAlertTemplate(application, appState);
alertService.alert(application.getAlertId(), alertTemplate);
}
}