This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4390d0693 [Refactor] Refactor alert module. (#3023)
4390d0693 is described below

commit 4390d0693e7afe3a2b26241639005b6ffed4a5f7
Author: gongzhongqiang <[email protected]>
AuthorDate: Sun Sep 10 11:01:43 2023 +0800

    [Refactor] Refactor alert module. (#3023)
    
    * [Refactor] Refactor alert module.
    
    * address comment
---
 .../console/core/controller/AlertController.java   |  11 +-
 .../console/core/entity/Application.java           |   2 +-
 .../console/core/entity/FlinkCluster.java          |   2 +-
 .../streampark/console/core/enums/AlertType.java   |  33 +++--
 .../core/service/alert/AlertNotifyService.java     |  12 ++
 .../console/core/service/alert/AlertService.java   |  23 ++--
 .../service/alert/impl/AlertConfigServiceImpl.java |   2 +-
 .../core/service/alert/impl/AlertServiceImpl.java  | 142 +++++++--------------
 .../alert/impl/DingTalkAlertNotifyServiceImpl.java |  11 +-
 .../alert/impl/EmailAlertNotifyServiceImpl.java    |  14 +-
 .../impl/HttpCallbackAlertNotifyServiceImpl.java   |   4 +-
 .../alert/impl/LarkAlertNotifyServiceImpl.java     |   9 +-
 .../alert/impl/WeComAlertNotifyServiceImpl.java    |  10 +-
 .../console/core/task/CheckpointProcessor.java     |   4 +-
 .../console/core/task/FlinkClusterWatcher.java     |  15 ++-
 .../console/core/task/FlinkHttpWatcher.java        |  39 ++++--
 .../core/task/FlinkK8sChangeEventListener.java     |   3 +-
 .../core/service/alert/AlertServiceTest.java       |   7 +-
 18 files changed, 146 insertions(+), 197 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
index 04e6bbdb6..66182a7a6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
@@ -32,8 +32,8 @@ import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.Parameters;
 import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -51,13 +51,14 @@ import java.util.TimeZone;
 @Tag(name = "ALERT_TAG")
 @Slf4j
 @Validated
+@RequiredArgsConstructor
 @RestController
 @RequestMapping("/flink/alert")
 public class AlertController {
 
-  @Autowired private AlertConfigService alertConfigService;
+  private final AlertConfigService alertConfigService;
 
-  @Autowired private AlertService alertService;
+  private final AlertService alertService;
 
   @Operation(summary = "Create alert config")
   @PostMapping(value = "/add")
@@ -126,8 +127,6 @@ public class AlertController {
         DateUtils.format(date, DateUtils.fullFormat(), TimeZone.getDefault()));
     alertTemplate.setEndTime(DateUtils.format(date, DateUtils.fullFormat(), TimeZone.getDefault()));
     alertTemplate.setDuration("");
-    boolean alert =
-        alertService.alert(AlertConfigParams.of(alertConfigService.getById(id)), alertTemplate);
-    return RestResponse.success(alert);
+    return RestResponse.success(alertService.alert(id, alertTemplate));
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index b12218169..acdb140a9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -122,7 +122,7 @@ public class Application implements Serializable {
 
   /** alert id */
   @TableField(updateStrategy = FieldStrategy.IGNORED)
-  private Integer alertId;
+  private Long alertId;
 
   private String args;
   /** application module */
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 23a7ec726..81b97be00 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -110,7 +110,7 @@ public class FlinkCluster implements Serializable {
   private Date endTime;
 
   @TableField(updateStrategy = FieldStrategy.IGNORED)
-  private Integer alertId;
+  private Long alertId;
 
   private transient Integer allJobs = 0;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/AlertType.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/AlertType.java
index 205379030..b63c8495d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/AlertType.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/AlertType.java
@@ -17,7 +17,15 @@
 
 package org.apache.streampark.console.core.enums;
 
+import org.apache.streampark.console.core.service.alert.AlertNotifyService;
+import org.apache.streampark.console.core.service.alert.impl.DingTalkAlertNotifyServiceImpl;
+import org.apache.streampark.console.core.service.alert.impl.EmailAlertNotifyServiceImpl;
+import org.apache.streampark.console.core.service.alert.impl.HttpCallbackAlertNotifyServiceImpl;
+import org.apache.streampark.console.core.service.alert.impl.LarkAlertNotifyServiceImpl;
+import org.apache.streampark.console.core.service.alert.impl.WeComAlertNotifyServiceImpl;
+
 import com.fasterxml.jackson.annotation.JsonValue;
+import lombok.Getter;
 import org.springframework.util.CollectionUtils;
 
 import java.util.ArrayList;
@@ -27,27 +35,32 @@ import java.util.List;
 import java.util.Map;
 
 /** The AlertType enum represents different types of alerts that can be used for notifications. */
+@Getter
 public enum AlertType {
+
   /** Email */
-  EMAIL(1),
+  EMAIL(1, EmailAlertNotifyServiceImpl.class),
 
   /** Ding talk */
-  DING_TALK(2),
+  DING_TALK(2, DingTalkAlertNotifyServiceImpl.class),
 
   /** WeChat work */
-  WE_COM(4),
+  WE_COM(4, WeComAlertNotifyServiceImpl.class),
 
   /** Http callback */
-  HTTP_CALLBACK(8),
+  HTTP_CALLBACK(8, HttpCallbackAlertNotifyServiceImpl.class),
 
   /** Lark */
-  LARK(16);
+  LARK(16, LarkAlertNotifyServiceImpl.class);
 
   /** The empty level */
   private static final Integer EMPTY_LEVEL = 0;
 
   /** Get the alert type by the code */
-  private final Integer code;
+  @JsonValue private final Integer code;
+
+  /** Holds the reference to a Class object. */
+  private final Class<? extends AlertNotifyService> clazz;
 
   /** A cache map used to quickly get the alert type from an integer code */
   private static final Map<Integer, AlertType> CACHE_MAP = createCacheMap();
@@ -60,13 +73,9 @@ public enum AlertType {
     return Collections.unmodifiableMap(map);
   }
 
-  AlertType(Integer code) {
+  AlertType(Integer code, Class<? extends AlertNotifyService> clazz) {
     this.code = code;
-  }
-
-  @JsonValue
-  public int getCode() {
-    return this.code;
+    this.clazz = clazz;
   }
 
   public static List<AlertType> decode(Integer level) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertNotifyService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertNotifyService.java
index fb401e168..a8ea01eed 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertNotifyService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertNotifyService.java
@@ -21,7 +21,19 @@ import org.apache.streampark.console.base.exception.AlertException;
 import org.apache.streampark.console.core.bean.AlertConfigParams;
 import org.apache.streampark.console.core.bean.AlertTemplate;
 
+/**
+ * This interface defines a service for sending alert notifications, it has multiple
+ * implementations.
+ */
 public interface AlertNotifyService {
 
+  /**
+   * Performs an alert with the given alert configuration parameters and alert template.
+   *
+   * @param alertConfig alert configuration parameters.
+   * @param template alert template to use.
+   * @return true if the alert was successfully triggered, false otherwise.
+   * @throws AlertException if an error occurs while performing the alert.
+   */
   boolean doAlert(AlertConfigParams alertConfig, AlertTemplate template) throws AlertException;
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
index 59ab1dd02..7880ee23f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
@@ -17,22 +17,17 @@
 
 package org.apache.streampark.console.core.service.alert;
 
-import org.apache.streampark.common.enums.ClusterState;
-import org.apache.streampark.console.base.exception.AlertException;
-import org.apache.streampark.console.core.bean.AlertConfigParams;
 import org.apache.streampark.console.core.bean.AlertTemplate;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.FlinkCluster;
-import org.apache.streampark.console.core.enums.CheckPointStatus;
-import org.apache.streampark.console.core.enums.FlinkAppState;
 
+/** The AlertService interface represents a service for sending alert. */
 public interface AlertService {
 
-  void alert(Application application, CheckPointStatus checkPointStatus);
-
-  void alert(Application application, FlinkAppState appState);
-
-  void alert(FlinkCluster flinkCluster, ClusterState clusterState);
-
-  boolean alert(AlertConfigParams params, AlertTemplate alertTemplate) throws AlertException;
+  /**
+   * Sends an alert based on the given alert configuration ID and alert template.
+   *
+   * @param alertConfigId the ID of the alert configuration
+   * @param alertTemplate the alert template to use for generating the alert content
+   * @return true if the alert is sent successfully, false otherwise
+   */
+  boolean alert(Long alertConfigId, AlertTemplate alertTemplate);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
index feb07b490..2b6c29879 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
@@ -52,7 +52,7 @@ public class AlertConfigServiceImpl extends ServiceImpl<AlertConfigMapper, Alert
   @Override
   public IPage<AlertConfigParams> page(AlertConfigParams params, RestRequest request) {
     // build query conditions
-    LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper();
+    LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper<>();
     wrapper.eq(params.getUserId() != null, AlertConfig::getUserId, params.getUserId());
 
     Page<AlertConfig> page = new MybatisPager<AlertConfig>().getDefaultPage(request);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
index 4ad4f1fe0..dfaac9873 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
@@ -17,26 +17,19 @@
 
 package org.apache.streampark.console.core.service.alert.impl;
 
-import org.apache.streampark.common.enums.ClusterState;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.AlertException;
 import org.apache.streampark.console.base.util.SpringContextUtils;
 import org.apache.streampark.console.core.bean.AlertConfigParams;
 import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.AlertConfig;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.enums.AlertType;
-import org.apache.streampark.console.core.enums.CheckPointStatus;
-import org.apache.streampark.console.core.enums.FlinkAppState;
 import org.apache.streampark.console.core.service.alert.AlertConfigService;
-import org.apache.streampark.console.core.service.alert.AlertNotifyService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 
@@ -44,100 +37,63 @@ import java.util.List;
 
 @Slf4j
 @Service
+@RequiredArgsConstructor
 public class AlertServiceImpl implements AlertService {
-  @Autowired private AlertConfigService alertConfigService;
 
-  @Override
-  public void alert(Application application, CheckPointStatus checkPointStatus) {
-    AlertTemplate alertTemplate = AlertTemplate.of(application, checkPointStatus);
-    alert(application.getAlertId(), alertTemplate);
-  }
-
-  @Override
-  public void alert(Application application, FlinkAppState appState) {
-    AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
-    alert(application.getAlertId(), alertTemplate);
-  }
+  private final AlertConfigService alertConfigService;
 
   @Override
-  public void alert(FlinkCluster flinkCluster, ClusterState clusterState) {
-    AlertTemplate alertTemplate = AlertTemplate.of(flinkCluster, clusterState);
-    alert(flinkCluster.getAlertId(), alertTemplate);
-  }
+  public boolean alert(Long alertConfigId, AlertTemplate alertTemplate) {
 
-  private void alert(Integer alertId, AlertTemplate alertTemplate) {
-    if (alertId == null) {
-      return;
+    if (alertConfigId == null) {
+      log.warn("alertConfigId is null");
+      return false;
     }
-    AlertConfig alertConfig = alertConfigService.getById(alertId);
+    AlertConfig alertConfig = alertConfigService.getById(alertConfigId);
     try {
-      alert(AlertConfigParams.of(alertConfig), alertTemplate);
+      AlertConfigParams params = AlertConfigParams.of(alertConfig);
+      List<AlertType> alertTypes = AlertType.decode(params.getAlertType());
+      if (CollectionUtils.isEmpty(alertTypes)) {
+        return true;
+      }
+      // No use thread pool, ensure that the alarm can be sent successfully
+      Tuple2<Boolean, AlertException> reduce =
+          alertTypes.stream()
+              .map(
+                  alertType -> {
+                    try {
+                      boolean alertRes =
+                          SpringContextUtils.getBean(alertType.getClazz())
+                              .doAlert(params, alertTemplate);
+                      return new Tuple2<Boolean, AlertException>(alertRes, null);
+                    } catch (AlertException e) {
+                      return new Tuple2<>(false, e);
+                    }
+                  })
+              .reduce(
+                  new Tuple2<>(true, null),
+                  (tp1, tp2) -> {
+                    boolean alertResult = tp1.f0 & tp2.f0;
+                    if (tp1.f1 == null && tp2.f1 == null) {
+                      return new Tuple2<>(tp1.f0 & tp2.f0, null);
+                    }
+                    if (tp1.f1 != null && tp2.f1 != null) {
+                      // merge multiple exception, and keep the details of the first exception
+                      AlertException alertException =
+                          new AlertException(
+                              tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(), tp1.f1);
+                      return new Tuple2<>(alertResult, alertException);
+                    }
+                    return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 : tp1.f1);
+                  });
+      if (reduce.f1 != null) {
+        throw reduce.f1;
+      }
+
+      return reduce.f0;
     } catch (Exception e) {
       log.error(e.getMessage(), e);
     }
-  }
-
-  @Override
-  public boolean alert(AlertConfigParams params, AlertTemplate alertTemplate)
-      throws AlertException {
-    List<AlertType> alertTypes = AlertType.decode(params.getAlertType());
-    if (CollectionUtils.isEmpty(alertTypes)) {
-      return true;
-    }
-    // No use thread pool, ensure that the alarm can be sent successfully
-    Tuple2<Boolean, AlertException> reduce =
-        alertTypes.stream()
-            .map(
-                alertType -> {
-                  try {
-                    Class<? extends AlertNotifyService> notifyServiceClass =
-                        getAlertServiceImpl(alertType);
-                    Utils.notNull(notifyServiceClass);
-                    boolean alertRes =
-                        SpringContextUtils.getBean(notifyServiceClass)
-                            .doAlert(params, alertTemplate);
-                    return new Tuple2<Boolean, AlertException>(alertRes, null);
-                  } catch (AlertException e) {
-                    return new Tuple2<>(false, e);
-                  }
-                })
-            .reduce(
-                new Tuple2<>(true, null),
-                (tp1, tp2) -> {
-                  boolean alertResult = tp1.f0 & tp2.f0;
-                  if (tp1.f1 == null && tp2.f1 == null) {
-                    return new Tuple2<>(tp1.f0 & tp2.f0, null);
-                  }
-                  if (tp1.f1 != null && tp2.f1 != null) {
-                  // merge multiple exception, and keep the details of the first exception
-                    AlertException alertException =
-                        new AlertException(
-                            tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(), tp1.f1);
-                    return new Tuple2<>(alertResult, alertException);
-                  }
-                  return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 : tp1.f1);
-                });
-    if (reduce.f1 != null) {
-      throw reduce.f1;
-    }
-
-    return reduce.f0;
-  }
-
-  private Class<? extends AlertNotifyService> getAlertServiceImpl(AlertType alertType) {
-    switch (alertType) {
-      case EMAIL:
-        return EmailAlertNotifyServiceImpl.class;
-      case DING_TALK:
-        return DingTalkAlertNotifyServiceImpl.class;
-      case WE_COM:
-        return WeComAlertNotifyServiceImpl.class;
-      case LARK:
-        return LarkAlertNotifyServiceImpl.class;
-      case HTTP_CALLBACK:
-        return HttpCallbackAlertNotifyServiceImpl.class;
-      default:
-        return null;
-    }
+    return false;
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
index 539910437..cd4a39332 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
@@ -37,7 +37,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import org.springframework.web.client.RestTemplate;
 
-import javax.annotation.PostConstruct;
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 
@@ -55,7 +54,7 @@ import java.util.StringJoiner;
 @Service
 @Lazy
 public class DingTalkAlertNotifyServiceImpl implements AlertNotifyService {
-  private Template template;
+  private final Template template = FreemarkerUtils.loadTemplateFile("alert-dingTalk.ftl");
 
   private final RestTemplate alertRestTemplate;
 
@@ -63,12 +62,6 @@ public class DingTalkAlertNotifyServiceImpl implements AlertNotifyService {
     this.alertRestTemplate = alertRestTemplate;
   }
 
-  @PostConstruct
-  public void loadTemplateFile() throws Exception {
-    String template = "alert-dingTalk.ftl";
-    this.template = FreemarkerUtils.loadTemplateFile(template);
-  }
-
   @Override
   public boolean doAlert(AlertConfigParams alertConfig, AlertTemplate alertTemplate)
       throws AlertException {
@@ -81,7 +74,7 @@ public class DingTalkAlertNotifyServiceImpl implements AlertNotifyService {
         Collections.addAll(contactList, contacts.split(","));
       }
       String title = alertTemplate.getTitle();
-      if (contactList.size() > 0) {
+      if (!contactList.isEmpty()) {
         StringJoiner joiner = new StringJoiner(",@", title + " @", "");
         contactList.forEach(joiner::add);
         title = joiner.toString();
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/EmailAlertNotifyServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/EmailAlertNotifyServiceImpl.java
index 6a597beea..9f32ea37e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/EmailAlertNotifyServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/EmailAlertNotifyServiceImpl.java
@@ -22,20 +22,16 @@ import org.apache.streampark.console.base.util.FreemarkerUtils;
 import org.apache.streampark.console.core.bean.AlertConfigParams;
 import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.bean.EmailConfig;
-import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.alert.AlertNotifyService;
 
 import org.apache.commons.mail.HtmlEmail;
 
 import freemarker.template.Template;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 
-import javax.annotation.PostConstruct;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -45,15 +41,7 @@ import java.util.Optional;
 @Lazy
 public class EmailAlertNotifyServiceImpl implements AlertNotifyService {
 
-  private Template template;
-
-  @Autowired private SettingService settingService;
-
-  @PostConstruct
-  public void loadTemplateFile() throws Exception {
-    String template = "alert-email.ftl";
-    this.template = FreemarkerUtils.loadTemplateFile(template);
-  }
+  private final Template template = FreemarkerUtils.loadTemplateFile("alert-email.ftl");
 
   @Override
   public boolean doAlert(AlertConfigParams alertConfig, AlertTemplate template)
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
index a71c2c4ab..976152d57 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
@@ -74,7 +74,7 @@ public class HttpCallbackAlertNotifyServiceImpl implements AlertNotifyService {
     }
   }
 
-  private Object sendMessage(AlertHttpCallbackParams params, Map<String, Object> body)
+  private void sendMessage(AlertHttpCallbackParams params, Map<String, Object> body)
       throws AlertException {
     String url = params.getUrl();
     HttpHeaders headers = new HttpHeaders();
@@ -118,7 +118,5 @@ public class HttpCallbackAlertNotifyServiceImpl implements AlertNotifyService {
     if (response == null) {
       throw new AlertException(String.format("Failed to request httpCallback alert,\nurl:%s", url));
     }
-
-    return response;
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
index a0025c7ef..8bea1f67e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
@@ -37,7 +37,6 @@ import org.springframework.http.MediaType;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
-import javax.annotation.PostConstruct;
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 
@@ -50,7 +49,7 @@ import java.util.Map;
 @Service
 @Lazy
 public class LarkAlertNotifyServiceImpl implements AlertNotifyService {
-  private Template template;
+  private final Template template = FreemarkerUtils.loadTemplateFile("alert-lark.ftl");
   private final RestTemplate alertRestTemplate;
   private final ObjectMapper mapper;
 
@@ -62,12 +61,6 @@ public class LarkAlertNotifyServiceImpl implements AlertNotifyService {
     this.mapper = mapper;
   }
 
-  @PostConstruct
-  public void loadTemplateFile() {
-    String template = "alert-lark.ftl";
-    this.template = FreemarkerUtils.loadTemplateFile(template);
-  }
-
   @Override
   public boolean doAlert(AlertConfigParams alertConfig, AlertTemplate alertTemplate)
       throws AlertException {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/WeComAlertNotifyServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/WeComAlertNotifyServiceImpl.java
index 350cf6092..b0354618c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/WeComAlertNotifyServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/WeComAlertNotifyServiceImpl.java
@@ -34,8 +34,6 @@ import org.springframework.http.MediaType;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
-import javax.annotation.PostConstruct;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -43,7 +41,7 @@ import java.util.Map;
 @Service
 @Lazy
 public class WeComAlertNotifyServiceImpl implements AlertNotifyService {
-  private Template template;
+  private final Template template = FreemarkerUtils.loadTemplateFile("alert-weCom.ftl");
 
   private final RestTemplate alertRestTemplate;
 
@@ -51,12 +49,6 @@ public class WeComAlertNotifyServiceImpl implements AlertNotifyService {
     this.alertRestTemplate = alertRestTemplate;
   }
 
-  @PostConstruct
-  public void loadTemplateFile() throws Exception {
-    String template = "alert-weCom.ftl";
-    this.template = FreemarkerUtils.loadTemplateFile(template);
-  }
-
   @Override
   public boolean doAlert(AlertConfigParams alertConfig, AlertTemplate alertTemplate)
       throws AlertException {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index eba7041b2..02d4f8ca1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.task;
 
+import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.SavePoint;
 import org.apache.streampark.console.core.enums.CheckPointStatus;
@@ -110,7 +111,8 @@ public class CheckpointProcessor {
           }
           switch (failoverStrategy) {
             case ALERT:
-              alertService.alert(application, CheckPointStatus.FAILED);
+              alertService.alert(
+                  application.getAlertId(), AlertTemplate.of(application, CheckPointStatus.FAILED));
               break;
             case RESTART:
               try {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index f14424877..5924aa815 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -26,6 +26,7 @@ import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.metrics.flink.Overview;
 import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
@@ -138,15 +139,15 @@ public class FlinkClusterWatcher {
               cluster.getId(), 
InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
       cluster.setClusterState(state.getValue());
       cluster.setEndTime(new Date());
-      alertService.alert(cluster, state);
+      alertService.alert(cluster.getAlertId(), AlertTemplate.of(cluster, state));
     }
   }
 
   /**
-   * cluster get state from flink or yarn api
+   * Retrieves the state of a cluster from the Flink or YARN API.
    *
-   * @param flinkCluster
-   * @return
+   * @param flinkCluster The FlinkCluster object representing the cluster.
+   * @return The ClusterState object representing the state of the cluster.
    */
   public ClusterState getClusterState(FlinkCluster flinkCluster) {
     ClusterState state = FAILED_STATES.getIfPresent(flinkCluster.getId());
@@ -164,10 +165,10 @@ public class FlinkClusterWatcher {
   }
 
   /**
-   * get remote cluster state
+   * Retrieves the state of a cluster from the Flink or YARN API using the remote HTTP endpoint.
    *
-   * @param flinkCluster
-   * @return
+   * @param flinkCluster The FlinkCluster object representing the cluster.
+   * @return The ClusterState object representing the state of the cluster.
    */
   private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
     return getStateFromFlinkRestApi(flinkCluster);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index ec15c3bda..b7a69acd5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -22,6 +22,7 @@ import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -268,10 +269,11 @@ public class FlinkHttpWatcher {
   }
 
   /**
-   * Get the current task running status information from flink restapi
+   * Get the current task running status information from Flink rest api.
    *
-   * @param application application
-   * @param stopFrom stopFrom
+   * @param application The application for which to retrieve the information
+   * @param stopFrom The stop source from which the method was called
+   * @throws Exception if an error occurs while retrieving the information from the Flink REST API
    */
   private void getFromFlinkRestApi(Application application, StopFrom stopFrom) throws Exception {
     JobsOverview jobsOverview = httpJobsOverview(application);
@@ -778,20 +780,31 @@ public class FlinkHttpWatcher {
   }
 
   /**
-   * The situation of abnormal operation alarm is as follows: When the job running mode is yarn per
-   * job or yarn application, when the job is abnormal, an alarm will be triggered directly; The job
-   * running mode is yarn session or reome: a. If the flink cluster is not configured with an alarm
-   * information, it will directly alarm when the job is abnormal. b. If the flink cluster is
-   * configured with alarm information: if the abnormal behavior of the job is caused by an
-   * abnormality in the flink cluster, block the alarm of the job and wait for the flink cluster
-   * alarm; If the abnormal behavior of the job is caused by itself and the flink cluster is running
-   * normally, the job will an alarm
+   * Describes the alarming behavior under abnormal operation for different job running modes:
+   *
+   * <p>- <strong>yarn per job</strong> or <strong>yarn application</strong>
+   *
+   * <p>Directly triggers an alarm when the job encounters an abnormal condition.<br>
+   *
+   * <p>- <strong>yarn session</strong> or <strong>remote</strong>
+   *
+   * <p>If the Flink cluster configuration lacks alarm information, it triggers an alarm directly
+   * when the job is abnormal.<br>
+   * If the Flink cluster configuration has alarm information:
+   *
+   * <p>When the job is abnormal due to an issue in the Flink cluster, the job's alarm will be held
+   * back, instead waiting for the Flink cluster's alarm.<br>
+   * When the job is abnormal due to the job itself and the Flink cluster is running normally, an
+   * alarm specific to the job will be triggered.
+   *
+   * @param app application
+   * @param appState application state
    */
   private void doAlert(Application app, FlinkAppState appState) {
     switch (app.getExecutionModeEnum()) {
       case YARN_APPLICATION:
       case YARN_PER_JOB:
-        alertService.alert(app, appState);
+        alertService.alert(app.getAlertId(), AlertTemplate.of(app, appState));
         return;
       case YARN_SESSION:
       case REMOTE:
@@ -801,7 +814,7 @@ public class FlinkHttpWatcher {
               "application with id {} is yarn session or remote and flink 
cluster with id {} is alive, application send alert",
               app.getId(),
               app.getFlinkClusterId());
-          alertService.alert(app, appState);
+          alertService.alert(app.getAlertId(), AlertTemplate.of(app, appState));
         }
         break;
       default:
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 7f7972632..5da80337b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.task;
 
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.enums.FlinkAppState;
 import org.apache.streampark.console.core.enums.OptionState;
@@ -99,7 +100,7 @@ public class FlinkK8sChangeEventListener {
         || FlinkAppState.LOST.equals(state)
         || FlinkAppState.RESTARTING.equals(state)
         || FlinkAppState.FINISHED.equals(state)) {
-      executor.execute(() -> alertService.alert(app, state));
+      executor.execute(() -> alertService.alert(app.getAlertId(), AlertTemplate.of(app, state)));
     }
   }
 
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
index 7b580b556..54e592c46 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
@@ -117,7 +117,6 @@ class AlertServiceTest {
   void testDingTalkAlert() throws Exception {
     DingTalkAlertNotifyServiceImpl notifyService = new DingTalkAlertNotifyServiceImpl(restTemplate);
 
-    notifyService.loadTemplateFile();
     AlertDingTalkParams dingTalkParams = new AlertDingTalkParams();
     dingTalkParams.setToken("your_token");
     dingTalkParams.setContacts("175xxxx1234");
@@ -132,7 +131,6 @@ class AlertServiceTest {
   @Test
   void testWeComAlert() throws Exception {
     WeComAlertNotifyServiceImpl notifyService = new WeComAlertNotifyServiceImpl(restTemplate);
-    notifyService.loadTemplateFile();
 
     AlertWeComParams weComParams = new AlertWeComParams();
     weComParams.setToken("your_token");
@@ -146,7 +144,6 @@ class AlertServiceTest {
   @Test
   void testLarkAlert() {
     LarkAlertNotifyServiceImpl notifyService = new LarkAlertNotifyServiceImpl(restTemplate, mapper);
-    notifyService.loadTemplateFile();
 
     AlertLarkParams alertLarkParams = new AlertLarkParams();
     alertLarkParams.setToken("your_token");
@@ -163,7 +160,7 @@ class AlertServiceTest {
     application.setStartTime(new Date());
     application.setJobName("Test My Job");
     application.setAppId("1234567890");
-    application.setAlertId(1);
+    application.setAlertId(1L);
 
     application.setRestartCount(5);
     application.setRestartSize(100);
@@ -193,7 +190,7 @@ class AlertServiceTest {
           String.format("StreamPark Alert: %s %s", application.getJobName(), appState.name());
       sendEmail(subject, html, "****@domain.com");
     } catch (Exception e) {
-      e.printStackTrace();
+      log.error("Failed to send email alert", e);
     }
   }
 


Reply via email to