This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 72207f43d [ISSUE-2423][Feature][WIP] flink cluster failure
alarm&failover (#2809)
72207f43d is described below
commit 72207f43d74bfd7eefc167effa26ebb33acb2726
Author: xujiangfeng001 <[email protected]>
AuthorDate: Sat Jul 1 17:42:36 2023 +0800
[ISSUE-2423][Feature][WIP] flink cluster failure alarm&failover (#2809)
* [Feature] flink cluster failure alarm&failover
---
.../streampark/common/enums/ClusterState.java | 4 +
.../main/assembly/script/schema/mysql-schema.sql | 3 +
.../main/assembly/script/schema/pgsql-schema.sql | 8 +-
.../main/assembly/script/upgrade/mysql/2.2.0.sql | 5 +-
.../main/assembly/script/upgrade/pgsql/2.2.0.sql | 5 +-
.../console/core/bean/AlertTemplate.java | 224 ++++++++++++++++-----
.../console/core/entity/FlinkCluster.java | 8 +
.../console/core/mapper/ApplicationMapper.java | 2 +
.../console/core/service/ApplicationService.java | 6 +-
.../console/core/service/alert/AlertService.java | 4 +
.../core/service/alert/impl/AlertServiceImpl.java | 15 +-
.../core/service/impl/ApplicationServiceImpl.java | 5 +
.../core/service/impl/FlinkClusterServiceImpl.java | 5 +
.../console/core/task/FlinkClusterWatcher.java | 64 +++++-
.../console/core/task/FlinkRESTAPIWatcher.java | 38 +++-
.../resources/alert-template/alert-dingTalk.ftl | 12 ++
.../main/resources/alert-template/alert-email.ftl | 54 +++++
.../main/resources/alert-template/alert-lark.ftl | 48 +++++
.../main/resources/alert-template/alert-weCom.ftl | 13 +-
.../src/main/resources/db/schema-h2.sql | 3 +
.../resources/mapper/core/ApplicationMapper.xml | 8 +
.../resources/mapper/core/FlinkClusterMapper.xml | 3 +
22 files changed, 462 insertions(+), 75 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index cefe347c7..decfc770a 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -75,4 +75,8 @@ public enum ClusterState implements Serializable {
public static boolean isLostState(ClusterState state) {
return LOST.equals(state);
}
+
+ public static boolean isUnknownState(ClusterState state) {
+ return UNKNOWN.equals(state);
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 01d037191..7783e6c66 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -458,6 +458,9 @@ create table `t_flink_cluster` (
`exception` text comment 'exception information',
`cluster_state` tinyint default 0 comment 'cluster status (0: created but
not started, 1: started, 2: stopped)',
`create_time` datetime not null default current_timestamp comment 'create
time',
+ `start_time` datetime default null comment 'start time',
+ `end_time` datetime default null comment 'end time',
+ `alert_id` bigint default null comment 'alert id',
primary key (`id`,`cluster_name`),
unique key `id` (`cluster_id`,`address`,`execution_mode`)
) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index 1e68b967b..b1b8d1b1d 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -290,7 +290,10 @@ create table "public"."t_flink_cluster" (
"resolve_order" int4,
"exception" text collate "pg_catalog"."default",
"cluster_state" int2 default 0,
- "create_time" timestamp(6) not null default timezone('UTC-8'::text,
(now())::timestamp(0) without time zone)
+ "create_time" timestamp(6) not null default timezone('UTC-8'::text,
(now())::timestamp(0) without time zone),
+ "start_time" timestamp(6),
+ "end_time" timestamp(6),
+ "alert_id" int8
)
;
comment on column "public"."t_flink_cluster"."address" is 'url address of
cluster';
@@ -309,6 +312,9 @@ comment on column
"public"."t_flink_cluster"."k8s_rest_exposed_type" is 'k8s exp
comment on column "public"."t_flink_cluster"."k8s_conf" is 'the path where the
k 8 s configuration file is located';
comment on column "public"."t_flink_cluster"."exception" is 'exception
information';
comment on column "public"."t_flink_cluster"."cluster_state" is 'cluster
status (0: create not started, 1: started, 2: stopped)';
+comment on column "public"."t_flink_cluster"."start_time" is 'cluster start
time';
+comment on column "public"."t_flink_cluster"."end_time" is 'cluster end time';
+comment on column "public"."t_flink_cluster"."alert_id" is 'alert id';
alter table "public"."t_flink_cluster" add constraint "t_flink_cluster_pkey"
primary key ("id", "cluster_name");
create index "id" on "public"."t_flink_cluster" using btree (
"cluster_id" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc
nulls last,
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 87406e9e8..81db6ac9b 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -44,7 +44,10 @@ alter table `t_flink_sql`
add column `team_resource` varchar(64) default null;
alter table `t_flink_cluster`
- add column `job_manager_url` varchar(150) default null comment 'url
address of jobmanager' after `address`;
+ add column `job_manager_url` varchar(150) default null comment 'url
address of jobmanager' after `address`,
+ add column `start_time` datetime default null comment 'start time',
+ add column `end_time` datetime default null comment 'end time',
+ add column `alert_id` bigint default null comment 'alert id';
-- menu level 2
insert into `t_menu` values (120400, 120000, 'menu.resource',
'/flink/resource', 'flink/resource/View', null, 'apartment', '0', 1, 3, now(),
now());
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index f2720b43f..9352cd563 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -57,7 +57,10 @@ alter table "public"."t_flink_sql"
add column "team_resource" varchar(64) default null;
alter table "public"."t_flink_cluster"
- add column "job_manager_url" varchar(150) collate "pg_catalog"."default";
+ add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
+ add column "start_time" timestamp(6) collate "pg_catalog"."default",
+ add column "end_time" timestamp(6) collate "pg_catalog"."default",
+ add column "alert_id" int8 collate "pg_catalog"."default";
insert into "public"."t_menu" values (120400, 120000, 'menu.resource',
'/flink/resource', 'flink/resource/View', null, 'apartment', '0', '1', 3,
now(), now());
insert into "public"."t_menu" values (110401, 110400, 'add', null, null,
'token:add', null, '1', '1', null, now(), now());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index fa6487707..b9a148349 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -17,10 +17,12 @@
package org.apache.streampark.console.core.bean;
+import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.DateUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.CheckPointStatus;
import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -47,61 +49,181 @@ public class AlertTemplate implements Serializable {
private Integer restartIndex;
private Integer totalRestart;
private boolean atAll = false;
-
- private static AlertTemplate of(Application application) {
- long duration;
- if (application.getEndTime() == null) {
- duration = System.currentTimeMillis() -
application.getStartTime().getTime();
- } else {
- duration = application.getEndTime().getTime() -
application.getStartTime().getTime();
- }
- AlertTemplate template = new AlertTemplate();
- template.setJobName(application.getJobName());
-
- if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- String format = "%s/proxy/%s/";
- String url = String.format(format, YarnUtils.getRMWebAppURL(false),
application.getAppId());
- template.setLink(url);
- } else {
- template.setLink(null);
- }
-
- template.setStartTime(
- DateUtils.format(
- application.getStartTime(), DateUtils.fullFormat(),
TimeZone.getDefault()));
- template.setEndTime(
- DateUtils.format(
- application.getEndTime() == null ? new Date() :
application.getEndTime(),
- DateUtils.fullFormat(),
- TimeZone.getDefault()));
- template.setDuration(DateUtils.toDuration(duration));
- boolean needRestart = application.isNeedRestartOnFailed() &&
application.getRestartCount() > 0;
- template.setRestart(needRestart);
- if (needRestart) {
- template.setRestartIndex(application.getRestartCount());
- template.setTotalRestart(application.getRestartSize());
- }
- return template;
- }
+ private Integer affectedJobs;
public static AlertTemplate of(Application application, FlinkAppState
appState) {
- AlertTemplate template = of(application);
- template.setType(1);
- template.setTitle(String.format("Notify: %s %s", application.getJobName(),
appState.name()));
- template.setSubject(String.format("StreamPark Alert: %s %s",
template.getJobName(), appState));
- template.setStatus(appState.name());
- return template;
+ return new AlertTemplateBuilder()
+ .setDuration(application.getStartTime(), application.getEndTime())
+ .setJobName(application.getJobName())
+ .setLink(application.getExecutionModeEnum(), application.getAppId())
+ .setStartTime(application.getStartTime())
+ .setEndTime(application.getEndTime())
+ .setRestart(application.isNeedRestartOnFailed(),
application.getRestartCount())
+ .setRestartIndex(application.getRestartCount())
+ .setTotalRestart(application.getRestartSize())
+ .setType(1)
+ .setTitle(String.format("Notify: %s %s", application.getJobName(),
appState.name()))
+ .setSubject(String.format("StreamPark Alert: %s %s",
application.getJobName(), appState))
+ .setStatus(appState.name())
+ .build();
}
public static AlertTemplate of(Application application, CheckPointStatus
checkPointStatus) {
- AlertTemplate template = of(application);
- template.setType(2);
- template.setCpFailureRateInterval(
- DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 *
60));
- template.setCpMaxFailureInterval(application.getCpMaxFailureInterval());
- template.setTitle(String.format("Notify: %s checkpoint FAILED",
application.getJobName()));
- template.setSubject(
- String.format("StreamPark Alert: %s, checkPoint is Failed",
template.getJobName()));
- return template;
+ return new AlertTemplateBuilder()
+ .setDuration(application.getStartTime(), application.getEndTime())
+ .setJobName(application.getJobName())
+ .setLink(application.getExecutionModeEnum(), application.getAppId())
+ .setStartTime(application.getStartTime())
+ .setType(2)
+ .setCpFailureRateInterval(
+ DateUtils.toDuration(application.getCpFailureRateInterval() * 1000
* 60))
+ .setCpMaxFailureInterval(application.getCpMaxFailureInterval())
+ .setTitle(String.format("Notify: %s checkpoint FAILED",
application.getJobName()))
+ .setSubject(
+ String.format("StreamPark Alert: %s, checkPoint is Failed",
application.getJobName()))
+ .build();
+ }
+
+ public static AlertTemplate of(FlinkCluster cluster, ClusterState
clusterState) {
+ return new AlertTemplateBuilder()
+ .setDuration(cluster.getStartTime(), cluster.getEndTime())
+ .setJobName(cluster.getClusterName())
+ .setLink(ExecutionMode.YARN_SESSION, cluster.getClusterId())
+ .setStartTime(cluster.getStartTime())
+ .setEndTime(cluster.getEndTime())
+ .setType(3)
+ .setTitle(String.format("Notify: %s %s", cluster.getClusterName(),
clusterState.name()))
+ .setSubject(
+ String.format("StreamPark Alert: %s %s", cluster.getClusterName(),
clusterState))
+ .setStatus(clusterState.name())
+ .setAffectedJobs(cluster.getJobs())
+ .build();
+ }
+
+ private static class AlertTemplateBuilder {
+ private AlertTemplate alertTemplate = new AlertTemplate();
+
+ public AlertTemplateBuilder setTitle(String title) {
+ alertTemplate.setTitle(title);
+ return this;
+ }
+
+ public AlertTemplateBuilder setSubject(String subject) {
+ alertTemplate.setSubject(subject);
+ return this;
+ }
+
+ public AlertTemplateBuilder setJobName(String jobName) {
+ alertTemplate.setJobName(jobName);
+ return this;
+ }
+
+ public AlertTemplateBuilder setType(Integer type) {
+ alertTemplate.setType(type);
+ return this;
+ }
+
+ public AlertTemplateBuilder setStatus(String status) {
+ alertTemplate.setStatus(status);
+ return this;
+ }
+
+ public AlertTemplateBuilder setStartTime(Date startTime) {
+ alertTemplate.setStartTime(
+ DateUtils.format(startTime, DateUtils.fullFormat(),
TimeZone.getDefault()));
+ return this;
+ }
+
+ public AlertTemplateBuilder setEndTime(Date endTime) {
+ alertTemplate.setEndTime(
+ DateUtils.format(
+ endTime == null ? new Date() : endTime,
+ DateUtils.fullFormat(),
+ TimeZone.getDefault()));
+ return this;
+ }
+
+ public AlertTemplateBuilder setDuration(String duration) {
+ alertTemplate.setDuration(duration);
+ return this;
+ }
+
+ public AlertTemplateBuilder setDuration(Date start, Date end) {
+ long duration;
+ if (start == null && end == null) {
+ duration = 0L;
+ } else if (end == null) {
+ duration = System.currentTimeMillis() - start.getTime();
+ } else {
+ duration = end.getTime() - start.getTime();
+ }
+ alertTemplate.setDuration(DateUtils.toDuration(duration));
+ return this;
+ }
+
+ public AlertTemplateBuilder setLink(String link) {
+ alertTemplate.setLink(link);
+ return this;
+ }
+
+ public AlertTemplateBuilder setLink(ExecutionMode mode, String appId) {
+ if (ExecutionMode.isYarnMode(mode)) {
+ String format = "%s/proxy/%s/";
+ String url = String.format(format, YarnUtils.getRMWebAppURL(false),
appId);
+ alertTemplate.setLink(url);
+ } else {
+ alertTemplate.setLink(null);
+ }
+ return this;
+ }
+
+ public AlertTemplateBuilder setCpFailureRateInterval(String
cpFailureRateInterval) {
+ alertTemplate.setCpFailureRateInterval(cpFailureRateInterval);
+ return this;
+ }
+
+ public AlertTemplateBuilder setCpMaxFailureInterval(Integer
cpMaxFailureInterval) {
+ alertTemplate.setCpMaxFailureInterval(cpMaxFailureInterval);
+ return this;
+ }
+
+ public AlertTemplateBuilder setRestart(Boolean restart) {
+ alertTemplate.setRestart(restart);
+ return this;
+ }
+
+ public AlertTemplateBuilder setRestart(Boolean needRestartOnFailed,
Integer restartCount) {
+ boolean needRestart = needRestartOnFailed && restartCount > 0;
+ alertTemplate.setRestart(needRestart);
+ return this;
+ }
+
+ public AlertTemplateBuilder setRestartIndex(Integer restartIndex) {
+ if (alertTemplate.getRestart()) {
+ alertTemplate.setRestartIndex(restartIndex);
+ }
+ return this;
+ }
+
+ public AlertTemplateBuilder setTotalRestart(Integer totalRestart) {
+ if (alertTemplate.getRestart()) {
+ alertTemplate.setTotalRestart(totalRestart);
+ }
+ return this;
+ }
+
+ public AlertTemplateBuilder setAtAll(Boolean atAll) {
+ alertTemplate.setAtAll(atAll);
+ return this;
+ }
+
+ public AlertTemplateBuilder setAffectedJobs(Integer jobs) {
+ alertTemplate.setAffectedJobs(jobs);
+ return this;
+ }
+
+ public AlertTemplate build() {
+ return this.alertTemplate;
+ }
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 7bfa52e72..32feec747 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -107,6 +107,14 @@ public class FlinkCluster implements Serializable {
private Date createTime = new Date();
+ private Date startTime;
+
+ private Date endTime;
+
+ private Integer alertId;
+
+ private transient Integer jobs = 0;
+
@JsonIgnore
public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() {
return FlinkK8sRestExposedType.of(this.k8sRestExposedType);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index dc7c129b6..d1b59fa07 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -65,4 +65,6 @@ public interface ApplicationMapper extends
BaseMapper<Application> {
boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId);
boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
+
+ Integer getAffectedJobsByClusterId(@Param("clusterId") Long clusterId);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index f78dcf6f2..a64922afa 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -103,9 +103,11 @@ public interface ApplicationService extends
IService<Application> {
boolean existsRunningJobByClusterId(Long clusterId);
- boolean existsJobByClusterId(Long id);
+ boolean existsJobByClusterId(Long clusterId);
- boolean existsJobByFlinkEnvId(Long id);
+ Integer getAffectedJobsByClusterId(Long clusterId);
+
+ boolean existsJobByFlinkEnvId(Long flinkEnvId);
List<String> getRecentK8sNamespace();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
index 4627f0831..d47b1ab62 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
@@ -17,10 +17,12 @@
package org.apache.streampark.console.core.service.alert;
+import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.core.bean.AlertConfigWithParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.CheckPointStatus;
import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -30,5 +32,7 @@ public interface AlertService {
void alert(Application application, FlinkAppState appState);
+ void alert(FlinkCluster flinkCluster, ClusterState clusterState);
+
boolean alert(AlertConfigWithParams params, AlertTemplate alertTemplate)
throws AlertException;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
index 19a40249e..05b1b8294 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.alert.impl;
+import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.base.util.SpringContextUtils;
@@ -24,6 +25,7 @@ import
org.apache.streampark.console.core.bean.AlertConfigWithParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.AlertConfig;
import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.AlertType;
import org.apache.streampark.console.core.enums.CheckPointStatus;
import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -48,17 +50,22 @@ public class AlertServiceImpl implements AlertService {
@Override
public void alert(Application application, CheckPointStatus
checkPointStatus) {
AlertTemplate alertTemplate = AlertTemplate.of(application,
checkPointStatus);
- alert(application, alertTemplate);
+ alert(application.getAlertId(), alertTemplate);
}
@Override
public void alert(Application application, FlinkAppState appState) {
AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
- alert(application, alertTemplate);
+ alert(application.getAlertId(), alertTemplate);
}
- private void alert(Application application, AlertTemplate alertTemplate) {
- Integer alertId = application.getAlertId();
+ @Override
+ public void alert(FlinkCluster flinkCluster, ClusterState clusterState) {
+ AlertTemplate alertTemplate = AlertTemplate.of(flinkCluster, clusterState);
+ alert(flinkCluster.getAlertId(), alertTemplate);
+ }
+
+ private void alert(Integer alertId, AlertTemplate alertTemplate) {
if (alertId == null) {
return;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 73e76e6e9..98253bb9f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -544,6 +544,11 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
return baseMapper.existsJobByClusterId(clusterId);
}
+ @Override
+ public Integer getAffectedJobsByClusterId(Long clusterId) {
+ return baseMapper.getAffectedJobsByClusterId(clusterId);
+ }
+
@Override
public boolean existsJobByFlinkEnvId(Long flinkEnvId) {
LambdaQueryWrapper<Application> lambdaQueryWrapper =
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 31d4ead6f..769ddf626 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -142,6 +142,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setCreateTime(new Date());
if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+ flinkCluster.setStartTime(new Date());
+ flinkCluster.setEndTime(null);
} else {
flinkCluster.setClusterState(ClusterState.CREATED.getValue());
}
@@ -172,6 +174,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setClusterId(deployResponse.clusterId());
flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
flinkCluster.setException(null);
+ flinkCluster.setStartTime(new Date());
+ flinkCluster.setEndTime(null);
FlinkClusterWatcher.addFlinkCluster(flinkCluster);
updateById(flinkCluster);
} catch (Exception e) {
@@ -222,6 +226,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response
failed");
flinkCluster.setAddress(null);
flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+ flinkCluster.setEndTime(new Date());
FlinkClusterWatcher.removeFlinkCluster(flinkCluster);
updateById(flinkCluster);
} catch (Exception e) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 2ecb2c7de..29131f651 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -26,7 +26,9 @@ import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -43,6 +45,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -58,6 +61,10 @@ public class FlinkClusterWatcher {
@Autowired private FlinkClusterService flinkClusterService;
+ @Autowired private AlertService alertService;
+
+ @Autowired private ApplicationService applicationService;
+
private Long lastWatcheringTime = 0L;
// Track interval every 30 seconds
@@ -107,17 +114,54 @@ public class FlinkClusterWatcher {
EXECUTOR.execute(
() -> {
FlinkCluster flinkCluster = entry.getValue();
- Integer clusterExecutionMode = flinkCluster.getExecutionMode();
- if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
- ClusterState state = getClusterState(flinkCluster);
- handleClusterState(flinkCluster, state);
- } else {
- // TODO: K8s Session status monitoring
- }
+ updateClusterState(flinkCluster);
});
}
}
+ private ClusterState updateClusterState(FlinkCluster flinkCluster) {
+ Integer clusterExecutionMode = flinkCluster.getExecutionMode();
+ if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
+ ClusterState state = getClusterState(flinkCluster);
+ handleClusterState(flinkCluster, state);
+ return state;
+ } else {
+ // TODO: K8s Session status monitoring
+ return ClusterState.UNKNOWN;
+ }
+ }
+
+ public synchronized boolean verifyClusterValidByClusterId(Long clusterId) {
+ FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
+ ClusterState state = ClusterState.of(flinkCluster.getClusterState());
+ if (!ClusterState.isRunningState(state)) {
+ return false;
+ }
+ state = updateClusterState(flinkCluster);
+ if (!ClusterState.isRunningState(state)) {
+ return false;
+ }
+ return true;
+ }
+
+ public boolean checkAlert(Long clusterId) {
+ FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
+ if (flinkCluster.getAlertId() == null) {
+ return false;
+ }
+ return true;
+ }
+
+ private void alert(FlinkCluster cluster, ClusterState state) {
+ if (!checkAlert(cluster.getId())) {
+ return;
+ }
+
cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
+ cluster.setClusterState(state.getValue());
+ cluster.setEndTime(new Date());
+ alertService.alert(cluster, state);
+ }
+
/**
* cluster get state from flink or yarn api
*
@@ -171,7 +215,7 @@ public class FlinkClusterWatcher {
*/
private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) {
if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
- return ClusterState.STOPPED;
+ return ClusterState.LOST;
}
String clusterId = flinkCluster.getClusterId();
if (StringUtils.isEmpty(clusterId)) {
@@ -214,13 +258,15 @@ public class FlinkClusterWatcher {
{
updateWrapper
.set(FlinkCluster::getAddress, null)
- .set(FlinkCluster::getJobManagerUrl, null);
+ .set(FlinkCluster::getJobManagerUrl, null)
+ .set(FlinkCluster::getEndTime, new Date());
}
// fall through
case LOST:
case UNKNOWN:
{
removeFlinkCluster(flinkCluster);
+ alert(flinkCluster, state);
break;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index e9b9f2224..737cc1bc6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -80,6 +80,8 @@ public class FlinkRESTAPIWatcher {
@Autowired private SavePointService savePointService;
+ @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
// track interval every 5 seconds
private static final long WATCHING_INTERVAL = 1000L * 5;
// option interval within 10 seconds
@@ -228,7 +230,7 @@ public class FlinkRESTAPIWatcher {
if (StopFrom.NONE.equals(stopFrom)) {
savePointService.expire(application.getId());
application.setState(FlinkAppState.LOST.getValue());
- alertService.alert(application, FlinkAppState.LOST);
+ alert(application, FlinkAppState.LOST);
} else {
application.setState(FlinkAppState.CANCELED.getValue());
}
@@ -244,7 +246,7 @@ public class FlinkRESTAPIWatcher {
doPersistMetrics(application, true);
FlinkAppState appState =
FlinkAppState.of(application.getState());
if (appState.equals(FlinkAppState.FAILED) ||
appState.equals(FlinkAppState.LOST)) {
- alertService.alert(application,
FlinkAppState.of(application.getState()));
+ alert(application, FlinkAppState.of(application.getState()));
if (appState.equals(FlinkAppState.FAILED)) {
try {
applicationService.start(application, true);
@@ -456,7 +458,7 @@ public class FlinkRESTAPIWatcher {
savePointService.expire(application.getId());
}
stopCanceledJob(application.getId());
- alertService.alert(application, FlinkAppState.CANCELED);
+ alert(application, FlinkAppState.CANCELED);
}
STOP_FROM_MAP.remove(application.getId());
doPersistMetrics(application, true);
@@ -467,7 +469,7 @@ public class FlinkRESTAPIWatcher {
STOP_FROM_MAP.remove(application.getId());
application.setState(FlinkAppState.FAILED.getValue());
doPersistMetrics(application, true);
- alertService.alert(application, FlinkAppState.FAILED);
+ alert(application, FlinkAppState.FAILED);
applicationService.start(application, true);
break;
case RESTARTING:
@@ -544,7 +546,7 @@ public class FlinkRESTAPIWatcher {
|| flinkAppState.equals(FlinkAppState.LOST)
|| (flinkAppState.equals(FlinkAppState.CANCELED) &&
StopFrom.NONE.equals(stopFrom))
|| applicationService.checkAlter(application)) {
- alertService.alert(application, flinkAppState);
+ alert(application, flinkAppState);
stopCanceledJob(application.getId());
if (flinkAppState.equals(FlinkAppState.FAILED)) {
applicationService.start(application, true);
@@ -768,4 +770,30 @@ public class FlinkRESTAPIWatcher {
interface Callback<T, R> {
R call(T e) throws Exception;
}
+
+ /**
+ * Sends an application alert, suppressing it when the owning Flink cluster is itself at fault.
+ * Jobs in yarn per-job or yarn application mode always alert directly when they are abnormal.
+ * Jobs in yarn session or remote mode depend on the alert configuration of their Flink cluster:
+ * a. If flinkClusterWatcher.checkAlert(...) is false for the cluster (per the original note: no
+ * alert is configured on it), the job alerts directly.
+ * b. Otherwise the cluster is verified first: if it is not valid, the job alert is skipped and
+ * the cluster-level alert is expected to report the failure; if the cluster is valid, the
+ * abnormality belongs to the job itself and the job alert is sent.
+ */
+ private void alert(Application app, FlinkAppState appState) {
+ if (ExecutionMode.isYarnPerJobOrAppMode(app.getExecutionModeEnum())
+ || !flinkClusterWatcher.checkAlert(app.getFlinkClusterId())) {
+ alertService.alert(app, appState);
+ return;
+ }
+ boolean isValid =
flinkClusterWatcher.verifyClusterValidByClusterId(app.getFlinkClusterId());
+ if (isValid) {
+ log.info(
+ "application with id {} is yarn session or remote and flink cluster
with id {} is alive, application send alert",
+ app.getId(),
+ app.getFlinkClusterId());
+ alertService.alert(app, appState);
+ }
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
index 14e30170b..dd7331bac 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
@@ -5,7 +5,12 @@
### **Dear StreamPark user:**
> ** Oops! I'm sorry to inform you that something wrong with your app **
+<#if type == 1 || type == 2 >
- **Job Name:${jobName}**
+</#if>
+<#if type == 3 >
+- **Cluster Name:${jobName}**
+</#if>
<#if type == 1 >
- **Job Status:${status}**
- **Start Time:${startTime}**
@@ -22,6 +27,13 @@
- **Start Time:${startTime}**
- **Duration:${duration}**
</#if>
+<#if type == 3 >
+- **Cluster Status:${status}**
+- **Start Time:${startTime}**
+- **End Time:${endTime}**
+- **Duration:${duration}**
+- **Affected Jobs:${affectedJobs}**
+</#if>
> Best Wishes!
>
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
index 3cb0a9323..a3d622858 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
@@ -967,6 +967,60 @@
</tr>
</#if>
+ <#if
mail.type == 3 >
+ <tr>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px;
border-top-color: rgba(169,169,169,.5); padding: 1em">
+
Cluster Name
+ </td>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px;
border-right-color: rgba(169,169,169,.5); border-top-style:solid;
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+
${mail.jobName}
+ </td>
+ </tr>
+ <tr>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px;
border-top-color: rgba(169,169,169,.5); padding: 1em">
+
Cluster Status
+ </td>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px;
border-right-color: rgba(169,169,169,.5); border-top-style:solid;
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+
<span style="font-size: 1.25em;font-weight: bold;color:
RED;">${mail.status}</span>
+ </td>
+ </tr>
+ <tr>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px;
border-top-color: rgba(169,169,169,.5); padding: 1em">
+
Start Time
+ </td>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px;
border-right-color: rgba(169,169,169,.5); border-top-style:solid;
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+
${mail.startTime}
+ </td>
+ </tr>
+ <tr>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px;
border-top-color: rgba(169,169,169,.5); padding: 1em">
+
End Time
+ </td>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px;
border-right-color: rgba(169,169,169,.5); border-top-style:solid;
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+
${mail.endTime}
+ </td>
+ </tr>
+
+ <tr>
+ <td
style="border-bottom-style:solid; border-bottom-width: 1px;
border-bottom-color: rgba(169,169,169,.5); border-left-style:solid;
border-left-width: 1px; border-left-color: rgba(169,169,169,.5);
border-top-style:solid; border-top-width: 1px; border-top-color:
rgba(169,169,169,.5); padding: 1em">
+
Duration
+ </td>
+ <td
style="border: 1px solid rgba(169,169,169,.5); padding: 1em">
+
${mail.duration}
+ </td>
+ </tr>
+
+ <tr>
+ <td
style="border-bottom-style:solid; border-bottom-width: 1px;
border-bottom-color: rgba(169,169,169,.5); border-left-style:solid;
border-left-width: 1px; border-left-color: rgba(169,169,169,.5);
border-top-style:solid; border-top-width: 1px; border-top-color:
rgba(169,169,169,.5); padding: 1em">
+
Affected Jobs
+ </td>
+ <td
style="border: 1px solid rgba(169,169,169,.5); padding: 1em">
+
${mail.affectedJobs}
+ </td>
+ </tr>
+
+ </#if>
+
<tr>
<td
colspan="2" style="height: 3rem;">
<div
style="margin: 1.5em 0;">Best Wishes!
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
index 2f7171fd7..a33b5811d 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
@@ -27,6 +27,7 @@
},
{
"fields": [
+<#if type == 1 || type == 2>
{
"is_short": false,
"text": {
@@ -34,6 +35,16 @@
"tag": "lark_md"
}
},
+</#if>
+<#if type == 3>
+ {
+ "is_short": false,
+ "text": {
+ "content": "**Cluster Name:${jobName}**",
+ "tag": "lark_md"
+ }
+ },
+</#if>
<#if type == 1 >
{
"is_short": false,
@@ -109,6 +120,43 @@
"tag": "lark_md"
}
}
+</#if>
+<#if type == 3 >
+ {
+ "is_short": false,
+ "text": {
+ "content": "**Cluster Status:${status}**",
+ "tag": "lark_md"
+ }
+ },
+ {
+ "is_short": true,
+ "text": {
+ "content": "**Start Time:${startTime}**",
+ "tag": "lark_md"
+ }
+ },
+ {
+ "is_short": false,
+ "text": {
+ "content": "**End Time:${endTime}**",
+ "tag": "lark_md"
+ }
+ },
+ {
+ "is_short": true,
+ "text": {
+ "content": "**Duration:${duration}**",
+ "tag": "lark_md"
+ }
+ },
+ {
+ "is_short": false,
+ "text": {
+ "content": "**Affected Jobs:${affectedJobs}**",
+ "tag": "lark_md"
+ }
+ }
</#if>
],
"tag": "div"
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
index 7d863c15d..d72a46872 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
@@ -5,8 +5,12 @@
### **Dear StreamPark user:**
`Oops! I'm sorry to inform you that something wrong with your app`
-
+<#if type == 1 || type ==2 >
- **Job Name:${jobName}**
+</#if>
+<#if type == 3 >
+- **Cluster Name:${jobName}**
+</#if>
<#if type == 1 >
- **Job Status:${status}**
- **Start Time:${startTime}**
@@ -23,6 +27,13 @@
- **Start Time:${startTime}**
- **Duration:${duration}**
</#if>
+<#if type == 3 >
+- **Cluster Status:${status}**
+- **Start Time:${startTime}**
+- **End Time:${endTime}**
+- **Duration:${duration}**
+- **Affected Jobs:${affectedJobs}**
+</#if>
> Best Wishes!
> Apache StreamPark
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index fd625c21e..fcb05c25e 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -411,6 +411,9 @@ create table if not exists `t_flink_cluster` (
`exception` text comment 'exception information',
`cluster_state` tinyint default 0 comment 'cluster status (0: created but
not started, 1: started, 2: stopped)',
`create_time` datetime not null default current_timestamp comment 'create
time',
+ `start_time` datetime default null comment 'start time',
+ `end_time` datetime default null comment 'end time',
+ `alert_id` bigint default null comment 'alert id',
primary key(`id`,`cluster_name`),
unique (`cluster_id`,`address`,`execution_mode`)
);
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index d8565cc76..cc0173387 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -133,6 +133,14 @@
limit 1
</select>
+ <select id="getAffectedJobsByClusterId" resultType="java.lang.Integer"
parameterType="java.lang.Long">
+ select
+ count(1)
+ from t_flink_app
+ where flink_cluster_id = #{clusterId}
+ limit 1
+ </select>
+
<select id="getByProjectId"
resultType="org.apache.streampark.console.core.entity.Application"
parameterType="java.lang.Long">
select * from t_flink_app where project_id=#{projectId}
</select>
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index 91e791847..769fe521c 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -40,6 +40,9 @@
<result column="exception" jdbcType="LONGVARCHAR"
property="exception"/>
<result column="cluster_state" jdbcType="TINYINT"
property="clusterState"/>
<result column="create_time" jdbcType="DATE" property="createTime"/>
+ <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/> <!-- datetime column: TIMESTAMP keeps the time of day, DATE would map date-only -->
+ <result column="end_time" jdbcType="TIMESTAMP" property="endTime"/> <!-- datetime column: TIMESTAMP keeps the time of day, DATE would map date-only -->
+ <result column="alert_id" jdbcType="BIGINT" property="alertId"/>
</resultMap>
<select id="existsByClusterId" resultType="java.lang.Boolean"
parameterType="java.lang.String">