This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 4390d0693 [Refactor] Refactor alert module. (#3023)
4390d0693 is described below
commit 4390d0693e7afe3a2b26241639005b6ffed4a5f7
Author: gongzhongqiang <[email protected]>
AuthorDate: Sun Sep 10 11:01:43 2023 +0800
[Refactor] Refactor alert module. (#3023)
* [Refactor] Refactor alert module.
* address comment
---
.../console/core/controller/AlertController.java | 11 +-
.../console/core/entity/Application.java | 2 +-
.../console/core/entity/FlinkCluster.java | 2 +-
.../streampark/console/core/enums/AlertType.java | 33 +++--
.../core/service/alert/AlertNotifyService.java | 12 ++
.../console/core/service/alert/AlertService.java | 23 ++--
.../service/alert/impl/AlertConfigServiceImpl.java | 2 +-
.../core/service/alert/impl/AlertServiceImpl.java | 142 +++++++--------------
.../alert/impl/DingTalkAlertNotifyServiceImpl.java | 11 +-
.../alert/impl/EmailAlertNotifyServiceImpl.java | 14 +-
.../impl/HttpCallbackAlertNotifyServiceImpl.java | 4 +-
.../alert/impl/LarkAlertNotifyServiceImpl.java | 9 +-
.../alert/impl/WeComAlertNotifyServiceImpl.java | 10 +-
.../console/core/task/CheckpointProcessor.java | 4 +-
.../console/core/task/FlinkClusterWatcher.java | 15 ++-
.../console/core/task/FlinkHttpWatcher.java | 39 ++++--
.../core/task/FlinkK8sChangeEventListener.java | 3 +-
.../core/service/alert/AlertServiceTest.java | 7 +-
18 files changed, 146 insertions(+), 197 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
index 04e6bbdb6..66182a7a6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
@@ -32,8 +32,8 @@ import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
@@ -51,13 +51,14 @@ import java.util.TimeZone;
@Tag(name = "ALERT_TAG")
@Slf4j
@Validated
+@RequiredArgsConstructor
@RestController
@RequestMapping("/flink/alert")
public class AlertController {
- @Autowired private AlertConfigService alertConfigService;
+ private final AlertConfigService alertConfigService;
- @Autowired private AlertService alertService;
+ private final AlertService alertService;
@Operation(summary = "Create alert config")
@PostMapping(value = "/add")
@@ -126,8 +127,6 @@ public class AlertController {
DateUtils.format(date, DateUtils.fullFormat(), TimeZone.getDefault()));
alertTemplate.setEndTime(DateUtils.format(date, DateUtils.fullFormat(),
TimeZone.getDefault()));
alertTemplate.setDuration("");
- boolean alert =
-
alertService.alert(AlertConfigParams.of(alertConfigService.getById(id)),
alertTemplate);
- return RestResponse.success(alert);
+ return RestResponse.success(alertService.alert(id, alertTemplate));
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index b12218169..acdb140a9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -122,7 +122,7 @@ public class Application implements Serializable {
/** alert id */
@TableField(updateStrategy = FieldStrategy.IGNORED)
- private Integer alertId;
+ private Long alertId;
private String args;
/** application module */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 23a7ec726..81b97be00 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -110,7 +110,7 @@ public class FlinkCluster implements Serializable {
private Date endTime;
@TableField(updateStrategy = FieldStrategy.IGNORED)
- private Integer alertId;
+ private Long alertId;
private transient Integer allJobs = 0;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/AlertType.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/AlertType.java
index 205379030..b63c8495d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/AlertType.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/AlertType.java
@@ -17,7 +17,15 @@
package org.apache.streampark.console.core.enums;
+import org.apache.streampark.console.core.service.alert.AlertNotifyService;
+import
org.apache.streampark.console.core.service.alert.impl.DingTalkAlertNotifyServiceImpl;
+import
org.apache.streampark.console.core.service.alert.impl.EmailAlertNotifyServiceImpl;
+import
org.apache.streampark.console.core.service.alert.impl.HttpCallbackAlertNotifyServiceImpl;
+import
org.apache.streampark.console.core.service.alert.impl.LarkAlertNotifyServiceImpl;
+import
org.apache.streampark.console.core.service.alert.impl.WeComAlertNotifyServiceImpl;
+
import com.fasterxml.jackson.annotation.JsonValue;
+import lombok.Getter;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
@@ -27,27 +35,32 @@ import java.util.List;
import java.util.Map;
/** The AlertType enum represents different types of alerts that can be used
for notifications. */
+@Getter
public enum AlertType {
+
/** Email */
- EMAIL(1),
+ EMAIL(1, EmailAlertNotifyServiceImpl.class),
/** Ding talk */
- DING_TALK(2),
+ DING_TALK(2, DingTalkAlertNotifyServiceImpl.class),
/** WeChat work */
- WE_COM(4),
+ WE_COM(4, WeComAlertNotifyServiceImpl.class),
/** Http callback */
- HTTP_CALLBACK(8),
+ HTTP_CALLBACK(8, HttpCallbackAlertNotifyServiceImpl.class),
/** Lark */
- LARK(16);
+ LARK(16, LarkAlertNotifyServiceImpl.class);
/** The empty level */
private static final Integer EMPTY_LEVEL = 0;
/** Get the alert type by the code */
- private final Integer code;
+ @JsonValue private final Integer code;
+
+ /** Holds the reference to a Class object. */
+ private final Class<? extends AlertNotifyService> clazz;
/** A cache map used to quickly get the alert type from an integer code */
private static final Map<Integer, AlertType> CACHE_MAP = createCacheMap();
@@ -60,13 +73,9 @@ public enum AlertType {
return Collections.unmodifiableMap(map);
}
- AlertType(Integer code) {
+ AlertType(Integer code, Class<? extends AlertNotifyService> clazz) {
this.code = code;
- }
-
- @JsonValue
- public int getCode() {
- return this.code;
+ this.clazz = clazz;
}
public static List<AlertType> decode(Integer level) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertNotifyService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertNotifyService.java
index fb401e168..a8ea01eed 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertNotifyService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertNotifyService.java
@@ -21,7 +21,19 @@ import
org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.core.bean.AlertConfigParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
+/**
+ * This interface defines a service for sending alert notifications, it has
multiple
+ * implementations.
+ */
public interface AlertNotifyService {
+ /**
+ * Performs an alert with the given alert configuration parameters and alert
template.
+ *
+ * @param alertConfig alert configuration parameters.
+ * @param template alert template to use.
+ * @return true if the alert was successfully triggered, false otherwise.
+ * @throws AlertException if an error occurs while performing the alert.
+ */
boolean doAlert(AlertConfigParams alertConfig, AlertTemplate template)
throws AlertException;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
index 59ab1dd02..7880ee23f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
@@ -17,22 +17,17 @@
package org.apache.streampark.console.core.service.alert;
-import org.apache.streampark.common.enums.ClusterState;
-import org.apache.streampark.console.base.exception.AlertException;
-import org.apache.streampark.console.core.bean.AlertConfigParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.FlinkCluster;
-import org.apache.streampark.console.core.enums.CheckPointStatus;
-import org.apache.streampark.console.core.enums.FlinkAppState;
+/** The AlertService interface represents a service for sending alert. */
public interface AlertService {
- void alert(Application application, CheckPointStatus checkPointStatus);
-
- void alert(Application application, FlinkAppState appState);
-
- void alert(FlinkCluster flinkCluster, ClusterState clusterState);
-
- boolean alert(AlertConfigParams params, AlertTemplate alertTemplate) throws
AlertException;
+ /**
+ * Sends an alert based on the given alert configuration ID and alert
template.
+ *
+ * @param alertConfigId the ID of the alert configuration
+ * @param alertTemplate the alert template to use for generating the alert
content
+ * @return true if the alert is sent successfully, false otherwise
+ */
+ boolean alert(Long alertConfigId, AlertTemplate alertTemplate);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
index feb07b490..2b6c29879 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
@@ -52,7 +52,7 @@ public class AlertConfigServiceImpl extends
ServiceImpl<AlertConfigMapper, Alert
@Override
public IPage<AlertConfigParams> page(AlertConfigParams params, RestRequest
request) {
// build query conditions
- LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper();
+ LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(params.getUserId() != null, AlertConfig::getUserId,
params.getUserId());
Page<AlertConfig> page = new
MybatisPager<AlertConfig>().getDefaultPage(request);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
index 4ad4f1fe0..dfaac9873 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
@@ -17,26 +17,19 @@
package org.apache.streampark.console.core.service.alert.impl;
-import org.apache.streampark.common.enums.ClusterState;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.base.util.SpringContextUtils;
import org.apache.streampark.console.core.bean.AlertConfigParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.AlertConfig;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.AlertType;
-import org.apache.streampark.console.core.enums.CheckPointStatus;
-import org.apache.streampark.console.core.enums.FlinkAppState;
import org.apache.streampark.console.core.service.alert.AlertConfigService;
-import org.apache.streampark.console.core.service.alert.AlertNotifyService;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.flink.api.java.tuple.Tuple2;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@@ -44,100 +37,63 @@ import java.util.List;
@Slf4j
@Service
+@RequiredArgsConstructor
public class AlertServiceImpl implements AlertService {
- @Autowired private AlertConfigService alertConfigService;
- @Override
- public void alert(Application application, CheckPointStatus
checkPointStatus) {
- AlertTemplate alertTemplate = AlertTemplate.of(application,
checkPointStatus);
- alert(application.getAlertId(), alertTemplate);
- }
-
- @Override
- public void alert(Application application, FlinkAppState appState) {
- AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
- alert(application.getAlertId(), alertTemplate);
- }
+ private final AlertConfigService alertConfigService;
@Override
- public void alert(FlinkCluster flinkCluster, ClusterState clusterState) {
- AlertTemplate alertTemplate = AlertTemplate.of(flinkCluster, clusterState);
- alert(flinkCluster.getAlertId(), alertTemplate);
- }
+ public boolean alert(Long alertConfigId, AlertTemplate alertTemplate) {
- private void alert(Integer alertId, AlertTemplate alertTemplate) {
- if (alertId == null) {
- return;
+ if (alertConfigId == null) {
+ log.warn("alertConfigId is null");
+ return false;
}
- AlertConfig alertConfig = alertConfigService.getById(alertId);
+ AlertConfig alertConfig = alertConfigService.getById(alertConfigId);
try {
- alert(AlertConfigParams.of(alertConfig), alertTemplate);
+ AlertConfigParams params = AlertConfigParams.of(alertConfig);
+ List<AlertType> alertTypes = AlertType.decode(params.getAlertType());
+ if (CollectionUtils.isEmpty(alertTypes)) {
+ return true;
+ }
+ // No use thread pool, ensure that the alarm can be sent successfully
+ Tuple2<Boolean, AlertException> reduce =
+ alertTypes.stream()
+ .map(
+ alertType -> {
+ try {
+ boolean alertRes =
+ SpringContextUtils.getBean(alertType.getClazz())
+ .doAlert(params, alertTemplate);
+ return new Tuple2<Boolean, AlertException>(alertRes,
null);
+ } catch (AlertException e) {
+ return new Tuple2<>(false, e);
+ }
+ })
+ .reduce(
+ new Tuple2<>(true, null),
+ (tp1, tp2) -> {
+ boolean alertResult = tp1.f0 & tp2.f0;
+ if (tp1.f1 == null && tp2.f1 == null) {
+ return new Tuple2<>(tp1.f0 & tp2.f0, null);
+ }
+ if (tp1.f1 != null && tp2.f1 != null) {
+ // merge multiple exception, and keep the details of the
first exception
+ AlertException alertException =
+ new AlertException(
+ tp1.f1.getMessage() + "\n" +
tp2.f1.getMessage(), tp1.f1);
+ return new Tuple2<>(alertResult, alertException);
+ }
+ return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 :
tp1.f1);
+ });
+ if (reduce.f1 != null) {
+ throw reduce.f1;
+ }
+
+ return reduce.f0;
} catch (Exception e) {
log.error(e.getMessage(), e);
}
- }
-
- @Override
- public boolean alert(AlertConfigParams params, AlertTemplate alertTemplate)
- throws AlertException {
- List<AlertType> alertTypes = AlertType.decode(params.getAlertType());
- if (CollectionUtils.isEmpty(alertTypes)) {
- return true;
- }
- // No use thread pool, ensure that the alarm can be sent successfully
- Tuple2<Boolean, AlertException> reduce =
- alertTypes.stream()
- .map(
- alertType -> {
- try {
- Class<? extends AlertNotifyService> notifyServiceClass =
- getAlertServiceImpl(alertType);
- Utils.notNull(notifyServiceClass);
- boolean alertRes =
- SpringContextUtils.getBean(notifyServiceClass)
- .doAlert(params, alertTemplate);
- return new Tuple2<Boolean, AlertException>(alertRes, null);
- } catch (AlertException e) {
- return new Tuple2<>(false, e);
- }
- })
- .reduce(
- new Tuple2<>(true, null),
- (tp1, tp2) -> {
- boolean alertResult = tp1.f0 & tp2.f0;
- if (tp1.f1 == null && tp2.f1 == null) {
- return new Tuple2<>(tp1.f0 & tp2.f0, null);
- }
- if (tp1.f1 != null && tp2.f1 != null) {
- // merge multiple exception, and keep the details of the
first exception
- AlertException alertException =
- new AlertException(
- tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(),
tp1.f1);
- return new Tuple2<>(alertResult, alertException);
- }
- return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 :
tp1.f1);
- });
- if (reduce.f1 != null) {
- throw reduce.f1;
- }
-
- return reduce.f0;
- }
-
- private Class<? extends AlertNotifyService> getAlertServiceImpl(AlertType
alertType) {
- switch (alertType) {
- case EMAIL:
- return EmailAlertNotifyServiceImpl.class;
- case DING_TALK:
- return DingTalkAlertNotifyServiceImpl.class;
- case WE_COM:
- return WeComAlertNotifyServiceImpl.class;
- case LARK:
- return LarkAlertNotifyServiceImpl.class;
- case HTTP_CALLBACK:
- return HttpCallbackAlertNotifyServiceImpl.class;
- default:
- return null;
- }
+ return false;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
index 539910437..cd4a39332 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
@@ -37,7 +37,6 @@ import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
-import javax.annotation.PostConstruct;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
@@ -55,7 +54,7 @@ import java.util.StringJoiner;
@Service
@Lazy
public class DingTalkAlertNotifyServiceImpl implements AlertNotifyService {
- private Template template;
+ private final Template template =
FreemarkerUtils.loadTemplateFile("alert-dingTalk.ftl");
private final RestTemplate alertRestTemplate;
@@ -63,12 +62,6 @@ public class DingTalkAlertNotifyServiceImpl implements
AlertNotifyService {
this.alertRestTemplate = alertRestTemplate;
}
- @PostConstruct
- public void loadTemplateFile() throws Exception {
- String template = "alert-dingTalk.ftl";
- this.template = FreemarkerUtils.loadTemplateFile(template);
- }
-
@Override
public boolean doAlert(AlertConfigParams alertConfig, AlertTemplate
alertTemplate)
throws AlertException {
@@ -81,7 +74,7 @@ public class DingTalkAlertNotifyServiceImpl implements
AlertNotifyService {
Collections.addAll(contactList, contacts.split(","));
}
String title = alertTemplate.getTitle();
- if (contactList.size() > 0) {
+ if (!contactList.isEmpty()) {
StringJoiner joiner = new StringJoiner(",@", title + " @", "");
contactList.forEach(joiner::add);
title = joiner.toString();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/EmailAlertNotifyServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/EmailAlertNotifyServiceImpl.java
index 6a597beea..9f32ea37e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/EmailAlertNotifyServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/EmailAlertNotifyServiceImpl.java
@@ -22,20 +22,16 @@ import
org.apache.streampark.console.base.util.FreemarkerUtils;
import org.apache.streampark.console.core.bean.AlertConfigParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.bean.EmailConfig;
-import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.alert.AlertNotifyService;
import org.apache.commons.mail.HtmlEmail;
import freemarker.template.Template;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
-import javax.annotation.PostConstruct;
-
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -45,15 +41,7 @@ import java.util.Optional;
@Lazy
public class EmailAlertNotifyServiceImpl implements AlertNotifyService {
- private Template template;
-
- @Autowired private SettingService settingService;
-
- @PostConstruct
- public void loadTemplateFile() throws Exception {
- String template = "alert-email.ftl";
- this.template = FreemarkerUtils.loadTemplateFile(template);
- }
+ private final Template template =
FreemarkerUtils.loadTemplateFile("alert-email.ftl");
@Override
public boolean doAlert(AlertConfigParams alertConfig, AlertTemplate template)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
index a71c2c4ab..976152d57 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
@@ -74,7 +74,7 @@ public class HttpCallbackAlertNotifyServiceImpl implements
AlertNotifyService {
}
}
- private Object sendMessage(AlertHttpCallbackParams params, Map<String,
Object> body)
+ private void sendMessage(AlertHttpCallbackParams params, Map<String, Object>
body)
throws AlertException {
String url = params.getUrl();
HttpHeaders headers = new HttpHeaders();
@@ -118,7 +118,5 @@ public class HttpCallbackAlertNotifyServiceImpl implements
AlertNotifyService {
if (response == null) {
throw new AlertException(String.format("Failed to request httpCallback
alert,\nurl:%s", url));
}
-
- return response;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
index a0025c7ef..8bea1f67e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
@@ -37,7 +37,6 @@ import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
-import javax.annotation.PostConstruct;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
@@ -50,7 +49,7 @@ import java.util.Map;
@Service
@Lazy
public class LarkAlertNotifyServiceImpl implements AlertNotifyService {
- private Template template;
+ private final Template template =
FreemarkerUtils.loadTemplateFile("alert-lark.ftl");
private final RestTemplate alertRestTemplate;
private final ObjectMapper mapper;
@@ -62,12 +61,6 @@ public class LarkAlertNotifyServiceImpl implements
AlertNotifyService {
this.mapper = mapper;
}
- @PostConstruct
- public void loadTemplateFile() {
- String template = "alert-lark.ftl";
- this.template = FreemarkerUtils.loadTemplateFile(template);
- }
-
@Override
public boolean doAlert(AlertConfigParams alertConfig, AlertTemplate
alertTemplate)
throws AlertException {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/WeComAlertNotifyServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/WeComAlertNotifyServiceImpl.java
index 350cf6092..b0354618c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/WeComAlertNotifyServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/WeComAlertNotifyServiceImpl.java
@@ -34,8 +34,6 @@ import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
-import javax.annotation.PostConstruct;
-
import java.util.HashMap;
import java.util.Map;
@@ -43,7 +41,7 @@ import java.util.Map;
@Service
@Lazy
public class WeComAlertNotifyServiceImpl implements AlertNotifyService {
- private Template template;
+ private final Template template =
FreemarkerUtils.loadTemplateFile("alert-weCom.ftl");
private final RestTemplate alertRestTemplate;
@@ -51,12 +49,6 @@ public class WeComAlertNotifyServiceImpl implements
AlertNotifyService {
this.alertRestTemplate = alertRestTemplate;
}
- @PostConstruct
- public void loadTemplateFile() throws Exception {
- String template = "alert-weCom.ftl";
- this.template = FreemarkerUtils.loadTemplateFile(template);
- }
-
@Override
public boolean doAlert(AlertConfigParams alertConfig, AlertTemplate
alertTemplate)
throws AlertException {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index eba7041b2..02d4f8ca1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.task;
+import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.SavePoint;
import org.apache.streampark.console.core.enums.CheckPointStatus;
@@ -110,7 +111,8 @@ public class CheckpointProcessor {
}
switch (failoverStrategy) {
case ALERT:
- alertService.alert(application, CheckPointStatus.FAILED);
+ alertService.alert(
+ application.getAlertId(), AlertTemplate.of(application,
CheckPointStatus.FAILED));
break;
case RESTART:
try {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index f14424877..5924aa815 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -26,6 +26,7 @@ import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
@@ -138,15 +139,15 @@ public class FlinkClusterWatcher {
cluster.getId(),
InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
cluster.setClusterState(state.getValue());
cluster.setEndTime(new Date());
- alertService.alert(cluster, state);
+ alertService.alert(cluster.getAlertId(), AlertTemplate.of(cluster,
state));
}
}
/**
- * cluster get state from flink or yarn api
+ * Retrieves the state of a cluster from the Flink or YARN API.
*
- * @param flinkCluster
- * @return
+ * @param flinkCluster The FlinkCluster object representing the cluster.
+ * @return The ClusterState object representing the state of the cluster.
*/
public ClusterState getClusterState(FlinkCluster flinkCluster) {
ClusterState state = FAILED_STATES.getIfPresent(flinkCluster.getId());
@@ -164,10 +165,10 @@ public class FlinkClusterWatcher {
}
/**
- * get remote cluster state
+ * Retrieves the state of a cluster from the Flink or YARN API using the
remote HTTP endpoint.
*
- * @param flinkCluster
- * @return
+ * @param flinkCluster The FlinkCluster object representing the cluster.
+ * @return The ClusterState object representing the state of the cluster.
*/
private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
return getStateFromFlinkRestApi(flinkCluster);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index ec15c3bda..b7a69acd5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -22,6 +22,7 @@ import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -268,10 +269,11 @@ public class FlinkHttpWatcher {
}
/**
- * Get the current task running status information from flink restapi
+ * Get the current task running status information from Flink rest api.
*
- * @param application application
- * @param stopFrom stopFrom
+ * @param application The application for which to retrieve the information
+ * @param stopFrom The stop source from which the method was called
+ * @throws Exception if an error occurs while retrieving the information
from the Flink REST API
*/
private void getFromFlinkRestApi(Application application, StopFrom stopFrom)
throws Exception {
JobsOverview jobsOverview = httpJobsOverview(application);
@@ -778,20 +780,31 @@ public class FlinkHttpWatcher {
}
/**
- * The situation of abnormal operation alarm is as follows: When the job
running mode is yarn per
- * job or yarn application, when the job is abnormal, an alarm will be
triggered directly; The job
- * running mode is yarn session or reome: a. If the flink cluster is not
configured with an alarm
- * information, it will directly alarm when the job is abnormal. b. If the
flink cluster is
- * configured with alarm information: if the abnormal behavior of the job is
caused by an
- * abnormality in the flink cluster, block the alarm of the job and wait for
the flink cluster
- * alarm; If the abnormal behavior of the job is caused by itself and the
flink cluster is running
- * normally, the job will an alarm
+ * Describes the alarming behavior under abnormal operation for different
job running modes:
+ *
+ * <p>- <strong>yarn per job</strong> or <strong>yarn application</strong>
+ *
+ * <p>Directly triggers an alarm when the job encounters an abnormal
condition.<br>
+ *
+ * <p>- <strong>yarn session</strong> or <strong>remote</strong>
+ *
+ * <p>If the Flink cluster configuration lacks alarm information, it
triggers an alarm directly
+ * when the job is abnormal.<br>
+ * If the Flink cluster configuration has alarm information:
+ *
+ * <p>When the job is abnormal due to an issue in the Flink cluster, the
job's alarm will be held
+ * back, instead waiting for the Flink cluster's alarm.<br>
+ * When the job is abnormal due to the job itself and the Flink cluster is
running normally, an
+ * alarm specific to the job will be triggered.
+ *
+ * @param app application
+ * @param appState application state
*/
private void doAlert(Application app, FlinkAppState appState) {
switch (app.getExecutionModeEnum()) {
case YARN_APPLICATION:
case YARN_PER_JOB:
- alertService.alert(app, appState);
+ alertService.alert(app.getAlertId(), AlertTemplate.of(app, appState));
return;
case YARN_SESSION:
case REMOTE:
@@ -801,7 +814,7 @@ public class FlinkHttpWatcher {
"application with id {} is yarn session or remote and flink
cluster with id {} is alive, application send alert",
app.getId(),
app.getFlinkClusterId());
- alertService.alert(app, appState);
+ alertService.alert(app.getAlertId(), AlertTemplate.of(app,
appState));
}
break;
default:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 7f7972632..5da80337b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.task;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.FlinkAppState;
import org.apache.streampark.console.core.enums.OptionState;
@@ -99,7 +100,7 @@ public class FlinkK8sChangeEventListener {
|| FlinkAppState.LOST.equals(state)
|| FlinkAppState.RESTARTING.equals(state)
|| FlinkAppState.FINISHED.equals(state)) {
- executor.execute(() -> alertService.alert(app, state));
+ executor.execute(() -> alertService.alert(app.getAlertId(),
AlertTemplate.of(app, state)));
}
}
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
index 7b580b556..54e592c46 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
@@ -117,7 +117,6 @@ class AlertServiceTest {
void testDingTalkAlert() throws Exception {
DingTalkAlertNotifyServiceImpl notifyService = new
DingTalkAlertNotifyServiceImpl(restTemplate);
- notifyService.loadTemplateFile();
AlertDingTalkParams dingTalkParams = new AlertDingTalkParams();
dingTalkParams.setToken("your_token");
dingTalkParams.setContacts("175xxxx1234");
@@ -132,7 +131,6 @@ class AlertServiceTest {
@Test
void testWeComAlert() throws Exception {
WeComAlertNotifyServiceImpl notifyService = new
WeComAlertNotifyServiceImpl(restTemplate);
- notifyService.loadTemplateFile();
AlertWeComParams weComParams = new AlertWeComParams();
weComParams.setToken("your_token");
@@ -146,7 +144,6 @@ class AlertServiceTest {
@Test
void testLarkAlert() {
LarkAlertNotifyServiceImpl notifyService = new
LarkAlertNotifyServiceImpl(restTemplate, mapper);
- notifyService.loadTemplateFile();
AlertLarkParams alertLarkParams = new AlertLarkParams();
alertLarkParams.setToken("your_token");
@@ -163,7 +160,7 @@ class AlertServiceTest {
application.setStartTime(new Date());
application.setJobName("Test My Job");
application.setAppId("1234567890");
- application.setAlertId(1);
+ application.setAlertId(1L);
application.setRestartCount(5);
application.setRestartSize(100);
@@ -193,7 +190,7 @@ class AlertServiceTest {
String.format("StreamPark Alert: %s %s", application.getJobName(),
appState.name());
sendEmail(subject, html, "****@domain.com");
} catch (Exception e) {
- e.printStackTrace();
+ log.error("Failed to send email alert", e);
}
}