This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new b39d7e21b [Improvement] Some code optimization (#3909)
b39d7e21b is described below

commit b39d7e21b7c18737ae5fc1c462065668ee9bc6ef
Author: xiangzihao <[email protected]>
AuthorDate: Tue Jul 23 17:07:30 2024 +0800

    [Improvement] Some code optimization (#3909)
    
    * some code optimization
---
 .../console/core/bean/AlertTemplate.java           | 265 +++------------------
 .../core/component/FlinkCheckpointProcessor.java   |   5 +-
 .../console/core/entity/Application.java           |   2 +-
 .../console/core/entity/FlinkStateChangeEvent.java |  59 +++++
 .../console/core/enums/FlinkAppStateEnum.java      |  10 +-
 .../console/core/enums/OptionStateEnum.java        |   2 +-
 .../console/core/utils/AlertTemplateUtils.java     | 138 +++++++++++
 .../console/core/watcher/FlinkAppHttpWatcher.java  |  71 ++----
 .../console/core/watcher/FlinkClusterWatcher.java  |   4 +-
 .../core/watcher/FlinkK8sChangeEventListener.java  |   5 +-
 .../console/core/watcher/SparkAppHttpWatcher.java  |   3 +-
 11 files changed, 264 insertions(+), 300 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index ce24a1e9f..4b9f220f8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -17,25 +17,24 @@
 
 package org.apache.streampark.console.core.bean;
 
-import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.enums.SparkExecutionMode;
 import org.apache.streampark.common.util.DateUtils;
 import org.apache.streampark.common.util.YarnUtils;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.FlinkCluster;
-import org.apache.streampark.console.core.entity.SparkApplication;
-import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
-import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
-import org.apache.streampark.console.core.enums.SparkAppStateEnum;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
 import java.util.Date;
 import java.util.TimeZone;
 
 @Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class AlertTemplate implements Serializable {
 
     private String title;
@@ -52,7 +51,7 @@ public class AlertTemplate implements Serializable {
     private Boolean restart;
     private Integer restartIndex;
     private Integer totalRestart;
-    private boolean atAll = false;
+    private boolean atAll;
     private Integer allJobs;
     private Integer affectedJobs;
     private String user;
@@ -61,157 +60,22 @@ public class AlertTemplate implements Serializable {
     private Integer lostJobs;
     private Integer cancelledJobs;
 
-    private static final String ALERT_SUBJECT_PREFIX = "StreamPark Alert:";
+    public static class AlertTemplateBuilder {
 
-    private static final String ALERT_TITLE_PREFIX = "Notify:";
-
-    private static final String PROBE = "PROBE";
-
-    public static AlertTemplate of(Application application, FlinkAppStateEnum appState) {
-        return new AlertTemplateBuilder()
-            .setDuration(application.getStartTime(), application.getEndTime())
-            .setJobName(application.getJobName())
-            .setLink(application.getFlinkExecutionMode(), application.getClusterId())
-            .setStartTime(application.getStartTime())
-            .setEndTime(application.getEndTime())
-            .setRestart(application.isNeedRestartOnFailed(), application.getRestartCount())
-            .setRestartIndex(application.getRestartCount())
-            .setTotalRestart(application.getRestartSize())
-            .setType(1)
-            .setTitle(
-                String.format(
-                    "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(),
-                    appState.name()))
-            .setSubject(
-                String.format("%s %s %s", ALERT_SUBJECT_PREFIX, application.getJobName(),
-                    appState))
-            .setStatus(appState.name())
-            .build();
-    }
-
-    public static AlertTemplate of(Application application, CheckPointStatusEnum statusEnum) {
-        return new AlertTemplateBuilder()
-            .setDuration(application.getStartTime(), application.getEndTime())
-            .setJobName(application.getJobName())
-            .setLink(application.getFlinkExecutionMode(), application.getClusterId())
-            .setStartTime(application.getStartTime())
-            .setType(2)
-            .setCpFailureRateInterval(
-                DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 60))
-            .setCpMaxFailureInterval(application.getCpMaxFailureInterval())
-            .setTitle(
-                String.format("%s %s checkpoint FAILED", ALERT_TITLE_PREFIX,
-                    application.getJobName()))
-            .setSubject(
-                String.format(
-                    "%s %s, checkPoint is Failed", ALERT_SUBJECT_PREFIX,
-                    application.getJobName()))
-            .build();
-    }
-
-    public static AlertTemplate of(FlinkCluster cluster, ClusterState clusterState) {
-        return new AlertTemplateBuilder()
-            .setDuration(cluster.getStartTime(), cluster.getEndTime())
-            .setJobName(cluster.getClusterName())
-            .setLink(cluster.getFlinkExecutionModeEnum(), cluster.getClusterId())
-            .setStartTime(cluster.getStartTime())
-            .setEndTime(cluster.getEndTime())
-            .setType(3)
-            .setTitle(
-                String.format(
-                    "%s %s %s", ALERT_TITLE_PREFIX, cluster.getClusterName(),
-                    clusterState.name()))
-            .setSubject(
-                String.format("%s %s %s", ALERT_SUBJECT_PREFIX, cluster.getClusterName(),
-                    clusterState))
-            .setStatus(clusterState.name())
-            .setAllJobs(cluster.getAllJobs())
-            .setAffectedJobs(cluster.getAffectedJobs())
-            .build();
-    }
-
-    public static AlertTemplate of(AlertProbeMsg alertProbeMsg) {
-        return new AlertTemplateBuilder()
-            .setType(4)
-            .setUser(alertProbeMsg.getUser())
-            .setProbeJobs(alertProbeMsg.getProbeJobs())
-            .setFailedJobs(alertProbeMsg.getFailedJobs())
-            .setLostJobs(alertProbeMsg.getLostJobs())
-            .setCancelledJobs(alertProbeMsg.getCancelledJobs())
-            .setSubject(String.format("%s %s", ALERT_SUBJECT_PREFIX, PROBE))
-            .setTitle(PROBE)
-            .build();
-    }
-
-    public static AlertTemplate of(SparkApplication application, SparkAppStateEnum appState) {
-        return new AlertTemplateBuilder()
-            .setDuration(application.getStartTime(), application.getEndTime())
-            .setJobName(application.getJobName())
-            .setLink(application.getSparkExecutionMode(), application.getJobId())
-            .setStartTime(application.getStartTime())
-            .setEndTime(application.getEndTime())
-            .setRestart(application.isNeedRestartOnFailed(), application.getRestartCount())
-            .setRestartIndex(application.getRestartCount())
-            .setTotalRestart(application.getRestartSize())
-            .setType(1)
-            .setTitle(
-                String.format(
-                    "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(), appState.name()))
-            .setSubject(
-                String.format("%s %s %s", ALERT_SUBJECT_PREFIX, application.getJobName(), appState))
-            .setStatus(appState.name())
-            .build();
-    }
-    private static class AlertTemplateBuilder {
-
-        private final AlertTemplate alertTemplate = new AlertTemplate();
-
-        public AlertTemplateBuilder setTitle(String title) {
-            alertTemplate.setTitle(title);
+        public AlertTemplateBuilder startTime(Date startTime) {
+            this.startTime = DateUtils.format(startTime, DateUtils.fullFormat(), TimeZone.getDefault());
             return this;
         }
 
-        public AlertTemplateBuilder setSubject(String subject) {
-            alertTemplate.setSubject(subject);
+        public AlertTemplateBuilder endTime(Date endTime) {
+            this.endTime = DateUtils.format(
+                endTime == null ? new Date() : endTime,
+                DateUtils.fullFormat(),
+                TimeZone.getDefault());
             return this;
         }
 
-        public AlertTemplateBuilder setJobName(String jobName) {
-            alertTemplate.setJobName(jobName);
-            return this;
-        }
-
-        public AlertTemplateBuilder setType(Integer type) {
-            alertTemplate.setType(type);
-            return this;
-        }
-
-        public AlertTemplateBuilder setStatus(String status) {
-            alertTemplate.setStatus(status);
-            return this;
-        }
-
-        public AlertTemplateBuilder setStartTime(Date startTime) {
-            alertTemplate.setStartTime(
-                DateUtils.format(startTime, DateUtils.fullFormat(), TimeZone.getDefault()));
-            return this;
-        }
-
-        public AlertTemplateBuilder setEndTime(Date endTime) {
-            alertTemplate.setEndTime(
-                DateUtils.format(
-                    endTime == null ? new Date() : endTime,
-                    DateUtils.fullFormat(),
-                    TimeZone.getDefault()));
-            return this;
-        }
-
-        public AlertTemplateBuilder setDuration(String duration) {
-            alertTemplate.setDuration(duration);
-            return this;
-        }
-
-        public AlertTemplateBuilder setDuration(Date start, Date end) {
+        public AlertTemplateBuilder duration(Date start, Date end) {
             long duration;
             if (start == null && end == null) {
                 duration = 0L;
@@ -220,114 +84,47 @@ public class AlertTemplate implements Serializable {
             } else {
                 duration = end.getTime() - start.getTime();
             }
-            alertTemplate.setDuration(DateUtils.toDuration(duration));
+            this.duration = DateUtils.toDuration(duration);
             return this;
         }
 
-        public AlertTemplateBuilder setLink(String link) {
-            alertTemplate.setLink(link);
-            return this;
-        }
-
-        public AlertTemplateBuilder setLink(FlinkExecutionMode mode, String appId) {
+        public AlertTemplateBuilder link(FlinkExecutionMode mode, String appId) {
             if (FlinkExecutionMode.isYarnMode(mode)) {
                 String format = "%s/proxy/%s/";
-                String url = String.format(format, YarnUtils.getRMWebAppURL(false), appId);
-                alertTemplate.setLink(url);
+                this.link = String.format(format, YarnUtils.getRMWebAppURL(false), appId);
             } else {
-                alertTemplate.setLink(null);
+                this.link = null;
             }
             return this;
         }
 
-        public AlertTemplateBuilder setLink(SparkExecutionMode mode, String appId) {
+        public AlertTemplateBuilder link(SparkExecutionMode mode, String appId) {
             if (SparkExecutionMode.isYarnMode(mode)) {
                 String format = "%s/proxy/%s/";
-                String url = String.format(format, YarnUtils.getRMWebAppURL(false), appId);
-                alertTemplate.setLink(url);
+                this.link = String.format(format, YarnUtils.getRMWebAppURL(false), appId);
             } else {
-                alertTemplate.setLink(null);
+                this.link = null;
             }
             return this;
         }
 
-        public AlertTemplateBuilder setCpFailureRateInterval(String cpFailureRateInterval) {
-            alertTemplate.setCpFailureRateInterval(cpFailureRateInterval);
+        public AlertTemplateBuilder restart(Boolean needRestartOnFailed, Integer restartCount) {
+            this.restart = needRestartOnFailed && restartCount > 0;
             return this;
         }
 
-        public AlertTemplateBuilder setCpMaxFailureInterval(Integer cpMaxFailureInterval) {
-            alertTemplate.setCpMaxFailureInterval(cpMaxFailureInterval);
-            return this;
-        }
-
-        public AlertTemplateBuilder setRestart(Boolean restart) {
-            alertTemplate.setRestart(restart);
-            return this;
-        }
-
-        public AlertTemplateBuilder setRestart(Boolean needRestartOnFailed, Integer restartCount) {
-            boolean needRestart = needRestartOnFailed && restartCount > 0;
-            alertTemplate.setRestart(needRestart);
-            return this;
-        }
-
-        public AlertTemplateBuilder setRestartIndex(Integer restartIndex) {
-            if (alertTemplate.getRestart()) {
-                alertTemplate.setRestartIndex(restartIndex);
+        public AlertTemplateBuilder restartIndex(Integer restartIndex) {
+            if (this.restart) {
+                this.restartIndex = restartIndex;
             }
             return this;
         }
 
-        public AlertTemplateBuilder setTotalRestart(Integer totalRestart) {
-            if (alertTemplate.getRestart()) {
-                alertTemplate.setTotalRestart(totalRestart);
+        public AlertTemplateBuilder totalRestart(Integer totalRestart) {
+            if (this.restart) {
+                this.totalRestart = totalRestart;
             }
             return this;
         }
-
-        public AlertTemplateBuilder setAtAll(Boolean atAll) {
-            alertTemplate.setAtAll(atAll);
-            return this;
-        }
-
-        public AlertTemplateBuilder setAllJobs(Integer allJobs) {
-            alertTemplate.setAllJobs(allJobs);
-            return this;
-        }
-
-        public AlertTemplateBuilder setAffectedJobs(Integer affectedJobs) {
-            alertTemplate.setAffectedJobs(affectedJobs);
-            return this;
-        }
-
-        public AlertTemplateBuilder setUser(String user) {
-            alertTemplate.setUser(user);
-            return this;
-        }
-
-        public AlertTemplateBuilder setProbeJobs(Integer probeJobs) {
-            alertTemplate.setProbeJobs(probeJobs);
-            return this;
-        }
-
-        public AlertTemplateBuilder setFailedJobs(Integer failedJobs) {
-            alertTemplate.setFailedJobs(failedJobs);
-            return this;
-        }
-
-        public AlertTemplateBuilder setLostJobs(Integer lostJobs) {
-            alertTemplate.setLostJobs(lostJobs);
-            return this;
-        }
-
-        public AlertTemplateBuilder setCancelledJobs(Integer cancelledJobs) {
-            alertTemplate.setCancelledJobs(cancelledJobs);
-            return this;
-        }
-
-        public AlertTemplate build() {
-            return this.alertTemplate;
-        }
     }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
index 05e7863a1..8bad65b90 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.streampark.console.core.component;
 
 import org.apache.streampark.common.util.AssertUtils;
-import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.SavePoint;
 import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
@@ -27,6 +26,7 @@ import org.apache.streampark.console.core.metrics.flink.CheckPoints;
 import org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 import org.apache.streampark.console.core.service.application.ApplicationActionService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
 
 import com.github.benmanes.caffeine.cache.Cache;
@@ -134,7 +134,8 @@ public class FlinkCheckpointProcessor {
         switch (failoverStrategyEnum) {
             case ALERT:
                 alertService.alert(
-                    application.getAlertId(), AlertTemplate.of(application, CheckPointStatusEnum.FAILED));
+                    application.getAlertId(),
+                    AlertTemplateUtils.createAlertTemplate(application, CheckPointStatusEnum.FAILED));
                 break;
             case RESTART:
                 try {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 9997c6341..3af261839 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -348,7 +348,7 @@ public class Application implements Serializable {
 
     @JsonIgnore
     public FlinkAppStateEnum getStateEnum() {
-        return FlinkAppStateEnum.of(state);
+        return FlinkAppStateEnum.getState(state);
     }
 
     @JsonIgnore
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkStateChangeEvent.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkStateChangeEvent.java
new file mode 100644
index 000000000..64c9f3cdd
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkStateChangeEvent.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
+import org.apache.streampark.console.core.enums.OptionStateEnum;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Objects;
+
+@Getter
+@Setter
+public class FlinkStateChangeEvent {
+
+    private Long id;
+    private String jobId;
+    private FlinkAppStateEnum appState;
+    private OptionStateEnum optionState;
+    private String jobManagerUrl;
+
+    @Override
+    public boolean equals(Object object) {
+        if (this == object) {
+            return true;
+        }
+        if (object == null || getClass() != object.getClass()) {
+            return false;
+        }
+        FlinkStateChangeEvent that = (FlinkStateChangeEvent) object;
+        return Objects.equals(id, that.id)
+            && Objects.equals(jobId, that.jobId)
+            && Objects.equals(appState, that.appState)
+            && Objects.equals(optionState, that.optionState)
+            && Objects.equals(jobManagerUrl, that.jobManagerUrl);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, jobId, appState, optionState, jobManagerUrl);
+    }
+
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
index be46f30db..fb6730ec6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
@@ -109,7 +109,7 @@ public enum FlinkAppStateEnum {
         this.value = value;
     }
 
-    public static FlinkAppStateEnum of(Integer state) {
+    public static FlinkAppStateEnum getState(Integer state) {
         for (FlinkAppStateEnum appState : values()) {
             if (appState.value == state) {
                 return appState;
@@ -118,7 +118,7 @@ public enum FlinkAppStateEnum {
         return FlinkAppStateEnum.OTHER;
     }
 
-    public static FlinkAppStateEnum of(String name) {
+    public static FlinkAppStateEnum getState(String name) {
         for (FlinkAppStateEnum appState : values()) {
             if (appState.name().equals(name)) {
                 return appState;
@@ -128,7 +128,7 @@ public enum FlinkAppStateEnum {
     }
 
     public static boolean isEndState(Integer appState) {
-        FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.of(appState);
+        FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.getState(appState);
         return FlinkAppStateEnum.CANCELED == flinkAppStateEnum
             || FlinkAppStateEnum.FAILED == flinkAppStateEnum
             || FlinkAppStateEnum.KILLED == flinkAppStateEnum
@@ -139,7 +139,7 @@ public enum FlinkAppStateEnum {
     }
 
     public static boolean isLost(Integer appState) {
-        FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.of(appState);
+        FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.getState(appState);
         return FlinkAppStateEnum.LOST == flinkAppStateEnum;
     }
 
@@ -154,7 +154,7 @@ public enum FlinkAppStateEnum {
             if (FlinkJobState.K8S_INITIALIZING() == flinkJobState) {
                 return INITIALIZING;
             }
-            return of(flinkJobState.toString());
+            return getState(flinkJobState.toString());
         }
 
         /** covert to org.apache.streampark.flink.k8s.enums.FlinkJobState */
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/OptionStateEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/OptionStateEnum.java
index 0a6644b06..7230c9694 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/OptionStateEnum.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/OptionStateEnum.java
@@ -44,7 +44,7 @@ public enum OptionStateEnum {
         this.value = value;
     }
 
-    public static OptionStateEnum of(Integer state) {
+    public static OptionStateEnum getState(Integer state) {
         return Arrays.stream(values()).filter((x) -> x.value == state).findFirst().orElse(null);
     }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java
new file mode 100644
index 000000000..9ad9c0e33
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/AlertTemplateUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.utils;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.util.DateUtils;
+import org.apache.streampark.console.core.bean.AlertProbeMsg;
+import org.apache.streampark.console.core.bean.AlertTemplate;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.enums.AlertTypeEnum;
+import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
+import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
+
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class AlertTemplateUtils {
+
+    private static final String ALERT_SUBJECT_PREFIX = "StreamPark Alert:";
+
+    private static final String ALERT_TITLE_PREFIX = "Notify:";
+
+    private static final String PROBE = "PROBE";
+
+    public static AlertTemplate createAlertTemplate(Application application, FlinkAppStateEnum appState) {
+        return AlertTemplate.builder()
+            .duration(application.getStartTime(), application.getEndTime())
+            .jobName(application.getJobName())
+            .link(application.getFlinkExecutionMode(), application.getClusterId())
+            .startTime(application.getStartTime())
+            .endTime(application.getEndTime())
+            .restart(application.isNeedRestartOnFailed(), application.getRestartCount())
+            .restartIndex(application.getRestartCount())
+            .totalRestart(application.getRestartSize())
+            .type(AlertTypeEnum.EMAIL.getCode())
+            .title(
+                String.format(
+                    "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(),
+                    appState.name()))
+            .subject(
+                String.format("%s %s %s", ALERT_SUBJECT_PREFIX, application.getJobName(),
+                    appState))
+            .status(appState.name())
+            .build();
+    }
+
+    public static AlertTemplate createAlertTemplate(Application application, CheckPointStatusEnum statusEnum) {
+        return AlertTemplate.builder()
+            .duration(application.getStartTime(), application.getEndTime())
+            .jobName(application.getJobName())
+            .link(application.getFlinkExecutionMode(), application.getClusterId())
+            .startTime(application.getStartTime())
+            .type(AlertTypeEnum.DING_TALK.getCode())
+            .cpFailureRateInterval(
+                DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 60))
+            .cpMaxFailureInterval(application.getCpMaxFailureInterval())
+            .title(
+                String.format("%s %s checkpoint FAILED", ALERT_TITLE_PREFIX,
+                    application.getJobName()))
+            .subject(
+                String.format(
+                    "%s %s, checkPoint is Failed", ALERT_SUBJECT_PREFIX,
+                    application.getJobName()))
+            .build();
+    }
+
+    public static AlertTemplate createAlertTemplate(FlinkCluster cluster, ClusterState clusterState) {
+        return AlertTemplate.builder()
+            .duration(cluster.getStartTime(), cluster.getEndTime())
+            .jobName(cluster.getClusterName())
+            .link(cluster.getFlinkExecutionModeEnum(), cluster.getClusterId())
+            .startTime(cluster.getStartTime())
+            .endTime(cluster.getEndTime())
+            .type(3)
+            .title(
+                String.format(
+                    "%s %s %s", ALERT_TITLE_PREFIX, cluster.getClusterName(),
+                    clusterState.name()))
+            .subject(
+                String.format("%s %s %s", ALERT_SUBJECT_PREFIX, cluster.getClusterName(),
+                    clusterState))
+            .status(clusterState.name())
+            .allJobs(cluster.getAllJobs())
+            .affectedJobs(cluster.getAffectedJobs())
+            .build();
+    }
+
+    public static AlertTemplate createAlertTemplate(AlertProbeMsg alertProbeMsg) {
+        return AlertTemplate.builder()
+            .type(AlertTypeEnum.WE_COM.getCode())
+            .user(alertProbeMsg.getUser())
+            .probeJobs(alertProbeMsg.getProbeJobs())
+            .failedJobs(alertProbeMsg.getFailedJobs())
+            .lostJobs(alertProbeMsg.getLostJobs())
+            .cancelledJobs(alertProbeMsg.getCancelledJobs())
+            .subject(String.format("%s %s", ALERT_SUBJECT_PREFIX, PROBE))
+            .title(PROBE)
+            .build();
+    }
+
+    public static AlertTemplate createAlertTemplate(SparkApplication application, SparkAppStateEnum appState) {
+        return AlertTemplate.builder()
+            .duration(application.getStartTime(), application.getEndTime())
+            .jobName(application.getJobName())
+            .link(application.getSparkExecutionMode(), application.getJobId())
+            .startTime(application.getStartTime())
+            .endTime(application.getEndTime())
+            .restart(application.isNeedRestartOnFailed(), application.getRestartCount())
+            .restartIndex(application.getRestartCount())
+            .totalRestart(application.getRestartSize())
+            .type(AlertTypeEnum.EMAIL.getCode())
+            .title(
+                String.format(
+                    "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(), appState.name()))
+            .subject(
+                String.format("%s %s %s", ALERT_SUBJECT_PREFIX, application.getJobName(), appState))
+            .status(appState.name())
+            .build();
+    }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 556138cae..6d64133b6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -26,6 +26,7 @@ import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.component.FlinkCheckpointProcessor;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkStateChangeEvent;
 import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
 import org.apache.streampark.console.core.enums.OptionStateEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
@@ -40,6 +41,7 @@ import org.apache.streampark.console.core.service.alert.AlertService;
 import org.apache.streampark.console.core.service.application.ApplicationActionService;
 import org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import org.apache.streampark.console.core.service.application.ApplicationManageService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.client5.http.config.RequestConfig;
@@ -49,8 +51,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import lombok.Getter;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -66,7 +66,6 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -152,7 +151,7 @@ public class FlinkAppHttpWatcher {
     private static final Cache<Long, Byte> CANCELING_CACHE = Caffeine.newBuilder()
         .expireAfterWrite(10, TimeUnit.SECONDS).build();
 
-    private static final Cache<Long, StateChangeEvent> PREVIOUS_STATUS = Caffeine.newBuilder()
+    private static final Cache<Long, FlinkStateChangeEvent> PREVIOUS_STATUS = Caffeine.newBuilder()
         .expireAfterWrite(24, TimeUnit.HOURS).build();
 
     /**
@@ -314,7 +313,7 @@ public class FlinkAppHttpWatcher {
 
         if (optional.isPresent()) {
             JobsOverview.Job jobOverview = optional.get();
-            FlinkAppStateEnum currentState = FlinkAppStateEnum.of(jobOverview.getState());
+            FlinkAppStateEnum currentState = FlinkAppStateEnum.getState(jobOverview.getState());
 
             if (!FlinkAppStateEnum.OTHER.equals(currentState)) {
                 try {
@@ -453,8 +452,8 @@ public class FlinkAppHttpWatcher {
             WATCHING_APPS.put(appId, application);
         }
 
-        StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
-        StateChangeEvent nowEvent = StateChangeEvent.of(application);
+        FlinkStateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
+        FlinkStateChangeEvent nowEvent = createStateChangeEvent(application);
         if (!nowEvent.equals(event)) {
             PREVIOUS_STATUS.put(appId, nowEvent);
             applicationManageService.persistMetrics(application);
@@ -546,7 +545,7 @@ public class FlinkAppHttpWatcher {
                 YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
                 if (yarnAppInfo != null) {
                     String state = yarnAppInfo.getApp().getFinalStatus();
-                    flinkAppState = FlinkAppStateEnum.of(state);
+                    flinkAppState = FlinkAppStateEnum.getState(state);
                 }
             } finally {
                 if (StopFromEnum.NONE.equals(stopFrom)) {
@@ -575,7 +574,7 @@ public class FlinkAppHttpWatcher {
             } else {
                 try {
                     String state = yarnAppInfo.getApp().getFinalStatus();
-                    FlinkAppStateEnum flinkAppState = FlinkAppStateEnum.of(state);
+                    FlinkAppStateEnum flinkAppState = FlinkAppStateEnum.getState(state);
                     if (FlinkAppStateEnum.OTHER.equals(flinkAppState)) {
                         return;
                     }
@@ -617,7 +616,7 @@ public class FlinkAppHttpWatcher {
     }
 
     private void doAlert(Application application, FlinkAppStateEnum flinkAppState) {
-        AlertTemplate alertTemplate = AlertTemplate.of(application, flinkAppState);
+        AlertTemplate alertTemplate = AlertTemplateUtils.createAlertTemplate(application, flinkAppState);
         alertService.alert(application.getAlertId(), alertTemplate);
     }
 
@@ -630,7 +629,7 @@ public class FlinkAppHttpWatcher {
 
     public void cleanSavepoint(Application application) {
         application.setOptionState(OptionStateEnum.NONE.getValue());
-        StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
+        FlinkStateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
         if (event != null && event.getOptionState() == OptionStateEnum.SAVEPOINTING) {
             doPersistMetrics(application, false);
         }
@@ -666,7 +665,7 @@ public class FlinkAppHttpWatcher {
         SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
 
         // update to PREVIOUS_STATUS
-        StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
+        FlinkStateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
         if (event != null) {
             event.setOptionState(OptionStateEnum.SAVEPOINTING);
             PREVIOUS_STATUS.put(appId, event);
@@ -836,45 +835,13 @@ public class FlinkAppHttpWatcher {
         R call(T e) throws Exception;
     }
 
-    @Getter
-    @Setter
-    static class StateChangeEvent {
-
-        private Long id;
-        private String jobId;
-        private FlinkAppStateEnum appState;
-        private OptionStateEnum optionState;
-        private String jobManagerUrl;
-
-        @Override
-        public boolean equals(Object object) {
-            if (this == object) {
-                return true;
-            }
-            if (object == null || getClass() != object.getClass()) {
-                return false;
-            }
-            StateChangeEvent that = (StateChangeEvent) object;
-            return Objects.equals(id, that.id)
-                && Objects.equals(jobId, that.jobId)
-                && Objects.equals(appState, that.appState)
-                && Objects.equals(optionState, that.optionState)
-                && Objects.equals(jobManagerUrl, that.jobManagerUrl);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(id, jobId, appState, optionState, jobManagerUrl);
-        }
-
-        public static StateChangeEvent of(Application application) {
-            StateChangeEvent event = new StateChangeEvent();
-            event.setId(application.getId());
-            event.setOptionState(OptionStateEnum.of(application.getOptionState()));
-            event.setAppState(application.getStateEnum());
-            event.setJobId(application.getJobId());
-            event.setJobManagerUrl(application.getJobManagerUrl());
-            return event;
-        }
+    public static FlinkStateChangeEvent createStateChangeEvent(Application application) {
+        FlinkStateChangeEvent event = new FlinkStateChangeEvent();
+        event.setId(application.getId());
+        event.setOptionState(OptionStateEnum.getState(application.getOptionState()));
+        event.setAppState(application.getStateEnum());
+        event.setJobId(application.getJobId());
+        event.setJobManagerUrl(application.getJobManagerUrl());
+        return event;
     }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
index c6b2ef1f5..d9528f56e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
@@ -25,13 +25,13 @@ import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.metrics.flink.Overview;
 import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 import org.apache.streampark.console.core.service.application.ApplicationInfoService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -138,7 +138,7 @@ public class FlinkClusterWatcher {
                     
InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
             cluster.setClusterState(state.getState());
             cluster.setEndTime(new Date());
-            alertService.alert(cluster.getAlertId(), AlertTemplate.of(cluster, state));
+            alertService.alert(cluster.getAlertId(), AlertTemplateUtils.createAlertTemplate(cluster, state));
         }
     }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
index 879916051..e56e282ac 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
@@ -18,7 +18,6 @@
 package org.apache.streampark.console.core.watcher;
 
 import org.apache.streampark.common.enums.FlinkExecutionMode;
-import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.component.FlinkCheckpointProcessor;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
@@ -26,6 +25,7 @@ import org.apache.streampark.console.core.enums.OptionStateEnum;
 import org.apache.streampark.console.core.metrics.flink.CheckPoints;
 import org.apache.streampark.console.core.service.alert.AlertService;
 import org.apache.streampark.console.core.service.application.ApplicationManageService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
 import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent;
@@ -104,7 +104,8 @@ public class FlinkK8sChangeEventListener {
             || FlinkAppStateEnum.LOST == state
             || FlinkAppStateEnum.RESTARTING == state
             || FlinkAppStateEnum.FINISHED == state) {
-            executor.execute(() -> alertService.alert(app.getAlertId(), AlertTemplate.of(app, state)));
+            executor.execute(
+                () -> alertService.alert(app.getAlertId(), AlertTemplateUtils.createAlertTemplate(app, state)));
         }
     }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index c7e447cd2..86b541407 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -33,6 +33,7 @@ import org.apache.streampark.console.core.service.alert.AlertService;
 import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
 import org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
 import org.apache.streampark.console.core.service.application.SparkApplicationManageService;
+import org.apache.streampark.console.core.utils.AlertTemplateUtils;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.hc.core5.util.Timeout;
@@ -383,7 +384,7 @@ public class SparkAppHttpWatcher {
      * @param appState spark application state
      */
     private void doAlert(SparkApplication application, SparkAppStateEnum appState) {
-        AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
+        AlertTemplate alertTemplate = AlertTemplateUtils.createAlertTemplate(application, appState);
         alertService.alert(application.getAlertId(), alertTemplate);
     }
 }


Reply via email to