This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 72207f43d [ISSUE-2423][Feature][WIP] flink cluster failure 
alarm&failover (#2809)
72207f43d is described below

commit 72207f43d74bfd7eefc167effa26ebb33acb2726
Author: xujiangfeng001 <[email protected]>
AuthorDate: Sat Jul 1 17:42:36 2023 +0800

    [ISSUE-2423][Feature][WIP] flink cluster failure alarm&failover (#2809)
    
    * [Feature] flink cluster failure alarm&failover
---
 .../streampark/common/enums/ClusterState.java      |   4 +
 .../main/assembly/script/schema/mysql-schema.sql   |   3 +
 .../main/assembly/script/schema/pgsql-schema.sql   |   8 +-
 .../main/assembly/script/upgrade/mysql/2.2.0.sql   |   5 +-
 .../main/assembly/script/upgrade/pgsql/2.2.0.sql   |   5 +-
 .../console/core/bean/AlertTemplate.java           | 224 ++++++++++++++++-----
 .../console/core/entity/FlinkCluster.java          |   8 +
 .../console/core/mapper/ApplicationMapper.java     |   2 +
 .../console/core/service/ApplicationService.java   |   6 +-
 .../console/core/service/alert/AlertService.java   |   4 +
 .../core/service/alert/impl/AlertServiceImpl.java  |  15 +-
 .../core/service/impl/ApplicationServiceImpl.java  |   5 +
 .../core/service/impl/FlinkClusterServiceImpl.java |   5 +
 .../console/core/task/FlinkClusterWatcher.java     |  64 +++++-
 .../console/core/task/FlinkRESTAPIWatcher.java     |  38 +++-
 .../resources/alert-template/alert-dingTalk.ftl    |  12 ++
 .../main/resources/alert-template/alert-email.ftl  |  54 +++++
 .../main/resources/alert-template/alert-lark.ftl   |  48 +++++
 .../main/resources/alert-template/alert-weCom.ftl  |  13 +-
 .../src/main/resources/db/schema-h2.sql            |   3 +
 .../resources/mapper/core/ApplicationMapper.xml    |   8 +
 .../resources/mapper/core/FlinkClusterMapper.xml   |   3 +
 22 files changed, 462 insertions(+), 75 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index cefe347c7..decfc770a 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -75,4 +75,8 @@ public enum ClusterState implements Serializable {
   public static boolean isLostState(ClusterState state) {
     return LOST.equals(state);
   }
+
+  public static boolean isUnknownState(ClusterState state) {
+    return UNKNOWN.equals(state);
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 01d037191..7783e6c66 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -458,6 +458,9 @@ create table `t_flink_cluster` (
   `exception` text comment 'exception information',
   `cluster_state` tinyint default 0 comment 'cluster status (0: created but 
not started, 1: started, 2: stopped)',
   `create_time` datetime not null default current_timestamp comment 'create 
time',
+  `start_time` datetime default null comment 'start time',
+  `end_time` datetime default null comment 'end time',
+  `alert_id` bigint default null comment 'alert id',
   primary key (`id`,`cluster_name`),
   unique key `id` (`cluster_id`,`address`,`execution_mode`)
 ) engine=innodb auto_increment=100000 default charset=utf8mb4 
collate=utf8mb4_general_ci;
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index 1e68b967b..b1b8d1b1d 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -290,7 +290,10 @@ create table "public"."t_flink_cluster" (
   "resolve_order" int4,
   "exception" text collate "pg_catalog"."default",
   "cluster_state" int2 default 0,
-  "create_time" timestamp(6) not null default timezone('UTC-8'::text, 
(now())::timestamp(0) without time zone)
+  "create_time" timestamp(6) not null default timezone('UTC-8'::text, 
(now())::timestamp(0) without time zone),
+  "start_time" timestamp(6),
+  "end_time" timestamp(6),
+  "alert_id" int8
 )
 ;
 comment on column "public"."t_flink_cluster"."address" is 'url address of 
cluster';
@@ -309,6 +312,9 @@ comment on column 
"public"."t_flink_cluster"."k8s_rest_exposed_type" is 'k8s exp
 comment on column "public"."t_flink_cluster"."k8s_conf" is 'the path where the 
k 8 s configuration file is located';
 comment on column "public"."t_flink_cluster"."exception" is 'exception 
information';
 comment on column "public"."t_flink_cluster"."cluster_state" is 'cluster 
status (0: create not started, 1: started, 2: stopped)';
+comment on column "public"."t_flink_cluster"."start_time" is 'cluster start 
time';
+comment on column "public"."t_flink_cluster"."end_time" is 'cluster end time';
+comment on column "public"."t_flink_cluster"."alert_id" is 'alert id';
 alter table "public"."t_flink_cluster" add constraint "t_flink_cluster_pkey" 
primary key ("id", "cluster_name");
 create index "id" on "public"."t_flink_cluster" using btree (
   "cluster_id" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc 
nulls last,
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 87406e9e8..81db6ac9b 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -44,7 +44,10 @@ alter table `t_flink_sql`
     add column `team_resource` varchar(64) default null;
 
 alter table `t_flink_cluster`
-    add column `job_manager_url` varchar(150) default null comment 'url 
address of jobmanager' after `address`;
+    add column `job_manager_url` varchar(150) default null comment 'url 
address of jobmanager' after `address`,
+    add column `start_time` datetime default null comment 'start time',
+    add column `end_time` datetime default null comment 'end time',
+    add column `alert_id` bigint default null comment 'alert id';
 
 -- menu level 2
 insert into `t_menu` values (120400, 120000, 'menu.resource', 
'/flink/resource', 'flink/resource/View', null, 'apartment', '0', 1, 3, now(), 
now());
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index f2720b43f..9352cd563 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -57,7 +57,10 @@ alter table "public"."t_flink_sql"
     add column "team_resource" varchar(64) default null;
 
 alter table "public"."t_flink_cluster"
-    add column "job_manager_url" varchar(150) collate "pg_catalog"."default";
+    add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
+    add column "start_time" timestamp(6) collate "pg_catalog"."default",
+    add column "end_time" timestamp(6) collate "pg_catalog"."default",
+    add column "alert_id" int8 collate "pg_catalog"."default";
 
 insert into "public"."t_menu" values (120400, 120000, 'menu.resource', 
'/flink/resource', 'flink/resource/View', null, 'apartment', '0', '1', 3, 
now(), now());
 insert into "public"."t_menu" values (110401, 110400, 'add', null, null, 
'token:add', null, '1', '1', null, now(), now());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index fa6487707..b9a148349 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -17,10 +17,12 @@
 
 package org.apache.streampark.console.core.bean;
 
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.DateUtils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.enums.CheckPointStatus;
 import org.apache.streampark.console.core.enums.FlinkAppState;
 
@@ -47,61 +49,181 @@ public class AlertTemplate implements Serializable {
   private Integer restartIndex;
   private Integer totalRestart;
   private boolean atAll = false;
-
-  private static AlertTemplate of(Application application) {
-    long duration;
-    if (application.getEndTime() == null) {
-      duration = System.currentTimeMillis() - 
application.getStartTime().getTime();
-    } else {
-      duration = application.getEndTime().getTime() - 
application.getStartTime().getTime();
-    }
-    AlertTemplate template = new AlertTemplate();
-    template.setJobName(application.getJobName());
-
-    if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-      String format = "%s/proxy/%s/";
-      String url = String.format(format, YarnUtils.getRMWebAppURL(false), 
application.getAppId());
-      template.setLink(url);
-    } else {
-      template.setLink(null);
-    }
-
-    template.setStartTime(
-        DateUtils.format(
-            application.getStartTime(), DateUtils.fullFormat(), 
TimeZone.getDefault()));
-    template.setEndTime(
-        DateUtils.format(
-            application.getEndTime() == null ? new Date() : 
application.getEndTime(),
-            DateUtils.fullFormat(),
-            TimeZone.getDefault()));
-    template.setDuration(DateUtils.toDuration(duration));
-    boolean needRestart = application.isNeedRestartOnFailed() && 
application.getRestartCount() > 0;
-    template.setRestart(needRestart);
-    if (needRestart) {
-      template.setRestartIndex(application.getRestartCount());
-      template.setTotalRestart(application.getRestartSize());
-    }
-    return template;
-  }
+  private Integer affectedJobs;
 
   public static AlertTemplate of(Application application, FlinkAppState 
appState) {
-    AlertTemplate template = of(application);
-    template.setType(1);
-    template.setTitle(String.format("Notify: %s %s", application.getJobName(), 
appState.name()));
-    template.setSubject(String.format("StreamPark Alert: %s %s", 
template.getJobName(), appState));
-    template.setStatus(appState.name());
-    return template;
+    return new AlertTemplateBuilder()
+        .setDuration(application.getStartTime(), application.getEndTime())
+        .setJobName(application.getJobName())
+        .setLink(application.getExecutionModeEnum(), application.getAppId())
+        .setStartTime(application.getStartTime())
+        .setEndTime(application.getEndTime())
+        .setRestart(application.isNeedRestartOnFailed(), 
application.getRestartCount())
+        .setRestartIndex(application.getRestartCount())
+        .setTotalRestart(application.getRestartSize())
+        .setType(1)
+        .setTitle(String.format("Notify: %s %s", application.getJobName(), 
appState.name()))
+        .setSubject(String.format("StreamPark Alert: %s %s", 
application.getJobName(), appState))
+        .setStatus(appState.name())
+        .build();
   }
 
   public static AlertTemplate of(Application application, CheckPointStatus 
checkPointStatus) {
-    AlertTemplate template = of(application);
-    template.setType(2);
-    template.setCpFailureRateInterval(
-        DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 
60));
-    template.setCpMaxFailureInterval(application.getCpMaxFailureInterval());
-    template.setTitle(String.format("Notify: %s checkpoint FAILED", 
application.getJobName()));
-    template.setSubject(
-        String.format("StreamPark Alert: %s, checkPoint is Failed", 
template.getJobName()));
-    return template;
+    return new AlertTemplateBuilder()
+        .setDuration(application.getStartTime(), application.getEndTime())
+        .setJobName(application.getJobName())
+        .setLink(application.getExecutionModeEnum(), application.getAppId())
+        .setStartTime(application.getStartTime())
+        .setType(2)
+        .setCpFailureRateInterval(
+            DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 
* 60))
+        .setCpMaxFailureInterval(application.getCpMaxFailureInterval())
+        .setTitle(String.format("Notify: %s checkpoint FAILED", 
application.getJobName()))
+        .setSubject(
+            String.format("StreamPark Alert: %s, checkPoint is Failed", 
application.getJobName()))
+        .build();
+  }
+
+  public static AlertTemplate of(FlinkCluster cluster, ClusterState 
clusterState) {
+    return new AlertTemplateBuilder()
+        .setDuration(cluster.getStartTime(), cluster.getEndTime())
+        .setJobName(cluster.getClusterName())
+        .setLink(ExecutionMode.YARN_SESSION, cluster.getClusterId())
+        .setStartTime(cluster.getStartTime())
+        .setEndTime(cluster.getEndTime())
+        .setType(3)
+        .setTitle(String.format("Notify: %s %s", cluster.getClusterName(), 
clusterState.name()))
+        .setSubject(
+            String.format("StreamPark Alert: %s %s", cluster.getClusterName(), 
clusterState))
+        .setStatus(clusterState.name())
+        .setAffectedJobs(cluster.getJobs())
+        .build();
+  }
+
+  private static class AlertTemplateBuilder {
+    private AlertTemplate alertTemplate = new AlertTemplate();
+
+    public AlertTemplateBuilder setTitle(String title) {
+      alertTemplate.setTitle(title);
+      return this;
+    }
+
+    public AlertTemplateBuilder setSubject(String subject) {
+      alertTemplate.setSubject(subject);
+      return this;
+    }
+
+    public AlertTemplateBuilder setJobName(String jobName) {
+      alertTemplate.setJobName(jobName);
+      return this;
+    }
+
+    public AlertTemplateBuilder setType(Integer type) {
+      alertTemplate.setType(type);
+      return this;
+    }
+
+    public AlertTemplateBuilder setStatus(String status) {
+      alertTemplate.setStatus(status);
+      return this;
+    }
+
+    public AlertTemplateBuilder setStartTime(Date startTime) {
+      alertTemplate.setStartTime(
+          DateUtils.format(startTime, DateUtils.fullFormat(), 
TimeZone.getDefault()));
+      return this;
+    }
+
+    public AlertTemplateBuilder setEndTime(Date endTime) {
+      alertTemplate.setEndTime(
+          DateUtils.format(
+              endTime == null ? new Date() : endTime,
+              DateUtils.fullFormat(),
+              TimeZone.getDefault()));
+      return this;
+    }
+
+    public AlertTemplateBuilder setDuration(String duration) {
+      alertTemplate.setDuration(duration);
+      return this;
+    }
+
+    public AlertTemplateBuilder setDuration(Date start, Date end) {
+      long duration;
+      if (start == null && end == null) {
+        duration = 0L;
+      } else if (end == null) {
+        duration = System.currentTimeMillis() - start.getTime();
+      } else {
+        duration = end.getTime() - start.getTime();
+      }
+      alertTemplate.setDuration(DateUtils.toDuration(duration));
+      return this;
+    }
+
+    public AlertTemplateBuilder setLink(String link) {
+      alertTemplate.setLink(link);
+      return this;
+    }
+
+    public AlertTemplateBuilder setLink(ExecutionMode mode, String appId) {
+      if (ExecutionMode.isYarnMode(mode)) {
+        String format = "%s/proxy/%s/";
+        String url = String.format(format, YarnUtils.getRMWebAppURL(false), 
appId);
+        alertTemplate.setLink(url);
+      } else {
+        alertTemplate.setLink(null);
+      }
+      return this;
+    }
+
+    public AlertTemplateBuilder setCpFailureRateInterval(String 
cpFailureRateInterval) {
+      alertTemplate.setCpFailureRateInterval(cpFailureRateInterval);
+      return this;
+    }
+
+    public AlertTemplateBuilder setCpMaxFailureInterval(Integer 
cpMaxFailureInterval) {
+      alertTemplate.setCpMaxFailureInterval(cpMaxFailureInterval);
+      return this;
+    }
+
+    public AlertTemplateBuilder setRestart(Boolean restart) {
+      alertTemplate.setRestart(restart);
+      return this;
+    }
+
+    public AlertTemplateBuilder setRestart(Boolean needRestartOnFailed, 
Integer restartCount) {
+      boolean needRestart = needRestartOnFailed && restartCount > 0;
+      alertTemplate.setRestart(needRestart);
+      return this;
+    }
+
+    public AlertTemplateBuilder setRestartIndex(Integer restartIndex) {
+      if (alertTemplate.getRestart()) {
+        alertTemplate.setRestartIndex(restartIndex);
+      }
+      return this;
+    }
+
+    public AlertTemplateBuilder setTotalRestart(Integer totalRestart) {
+      if (alertTemplate.getRestart()) {
+        alertTemplate.setTotalRestart(totalRestart);
+      }
+      return this;
+    }
+
+    public AlertTemplateBuilder setAtAll(Boolean atAll) {
+      alertTemplate.setAtAll(atAll);
+      return this;
+    }
+
+    public AlertTemplateBuilder setAffectedJobs(Integer jobs) {
+      alertTemplate.setAffectedJobs(jobs);
+      return this;
+    }
+
+    public AlertTemplate build() {
+      return this.alertTemplate;
+    }
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 7bfa52e72..32feec747 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -107,6 +107,14 @@ public class FlinkCluster implements Serializable {
 
   private Date createTime = new Date();
 
+  private Date startTime;
+
+  private Date endTime;
+
+  private Integer alertId;
+
+  private transient Integer jobs = 0;
+
   @JsonIgnore
   public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() {
     return FlinkK8sRestExposedType.of(this.k8sRestExposedType);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index dc7c129b6..d1b59fa07 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -65,4 +65,6 @@ public interface ApplicationMapper extends 
BaseMapper<Application> {
   boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId);
 
   boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
+
+  Integer getAffectedJobsByClusterId(@Param("clusterId") Long clusterId);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index f78dcf6f2..a64922afa 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -103,9 +103,11 @@ public interface ApplicationService extends 
IService<Application> {
 
   boolean existsRunningJobByClusterId(Long clusterId);
 
-  boolean existsJobByClusterId(Long id);
+  boolean existsJobByClusterId(Long clusterId);
 
-  boolean existsJobByFlinkEnvId(Long id);
+  Integer getAffectedJobsByClusterId(Long clusterId);
+
+  boolean existsJobByFlinkEnvId(Long flinkEnvId);
 
   List<String> getRecentK8sNamespace();
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
index 4627f0831..d47b1ab62 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertService.java
@@ -17,10 +17,12 @@
 
 package org.apache.streampark.console.core.service.alert;
 
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.console.base.exception.AlertException;
 import org.apache.streampark.console.core.bean.AlertConfigWithParams;
 import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.enums.CheckPointStatus;
 import org.apache.streampark.console.core.enums.FlinkAppState;
 
@@ -30,5 +32,7 @@ public interface AlertService {
 
   void alert(Application application, FlinkAppState appState);
 
+  void alert(FlinkCluster flinkCluster, ClusterState clusterState);
+
   boolean alert(AlertConfigWithParams params, AlertTemplate alertTemplate) 
throws AlertException;
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
index 19a40249e..05b1b8294 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.alert.impl;
 
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.AlertException;
 import org.apache.streampark.console.base.util.SpringContextUtils;
@@ -24,6 +25,7 @@ import 
org.apache.streampark.console.core.bean.AlertConfigWithParams;
 import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.AlertConfig;
 import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.enums.AlertType;
 import org.apache.streampark.console.core.enums.CheckPointStatus;
 import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -48,17 +50,22 @@ public class AlertServiceImpl implements AlertService {
   @Override
   public void alert(Application application, CheckPointStatus 
checkPointStatus) {
     AlertTemplate alertTemplate = AlertTemplate.of(application, 
checkPointStatus);
-    alert(application, alertTemplate);
+    alert(application.getAlertId(), alertTemplate);
   }
 
   @Override
   public void alert(Application application, FlinkAppState appState) {
     AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
-    alert(application, alertTemplate);
+    alert(application.getAlertId(), alertTemplate);
   }
 
-  private void alert(Application application, AlertTemplate alertTemplate) {
-    Integer alertId = application.getAlertId();
+  @Override
+  public void alert(FlinkCluster flinkCluster, ClusterState clusterState) {
+    AlertTemplate alertTemplate = AlertTemplate.of(flinkCluster, clusterState);
+    alert(flinkCluster.getAlertId(), alertTemplate);
+  }
+
+  private void alert(Integer alertId, AlertTemplate alertTemplate) {
     if (alertId == null) {
       return;
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 73e76e6e9..98253bb9f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -544,6 +544,11 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     return baseMapper.existsJobByClusterId(clusterId);
   }
 
+  @Override
+  public Integer getAffectedJobsByClusterId(Long clusterId) {
+    return baseMapper.getAffectedJobsByClusterId(clusterId);
+  }
+
   @Override
   public boolean existsJobByFlinkEnvId(Long flinkEnvId) {
     LambdaQueryWrapper<Application> lambdaQueryWrapper =
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 31d4ead6f..769ddf626 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -142,6 +142,8 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     flinkCluster.setCreateTime(new Date());
     if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
       flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+      flinkCluster.setStartTime(new Date());
+      flinkCluster.setEndTime(null);
     } else {
       flinkCluster.setClusterState(ClusterState.CREATED.getValue());
     }
@@ -172,6 +174,8 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setClusterId(deployResponse.clusterId());
       flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
       flinkCluster.setException(null);
+      flinkCluster.setStartTime(new Date());
+      flinkCluster.setEndTime(null);
       FlinkClusterWatcher.addFlinkCluster(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
@@ -222,6 +226,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
       ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response 
failed");
       flinkCluster.setAddress(null);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+      flinkCluster.setEndTime(new Date());
       FlinkClusterWatcher.removeFlinkCluster(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 2ecb2c7de..29131f651 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -26,7 +26,9 @@ import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.metrics.flink.Overview;
 import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.alert.AlertService;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -43,6 +45,7 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
 import java.time.Duration;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -58,6 +61,10 @@ public class FlinkClusterWatcher {
 
   @Autowired private FlinkClusterService flinkClusterService;
 
+  @Autowired private AlertService alertService;
+
+  @Autowired private ApplicationService applicationService;
+
   private Long lastWatcheringTime = 0L;
 
   // Track interval  every 30 seconds
@@ -107,17 +114,54 @@ public class FlinkClusterWatcher {
       EXECUTOR.execute(
           () -> {
             FlinkCluster flinkCluster = entry.getValue();
-            Integer clusterExecutionMode = flinkCluster.getExecutionMode();
-            if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
-              ClusterState state = getClusterState(flinkCluster);
-              handleClusterState(flinkCluster, state);
-            } else {
-              // TODO: K8s Session status monitoring
-            }
+            updateClusterState(flinkCluster);
           });
     }
   }
 
+  private ClusterState updateClusterState(FlinkCluster flinkCluster) {
+    Integer clusterExecutionMode = flinkCluster.getExecutionMode();
+    if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
+      ClusterState state = getClusterState(flinkCluster);
+      handleClusterState(flinkCluster, state);
+      return state;
+    } else {
+      // TODO: K8s Session status monitoring
+      return ClusterState.UNKNOWN;
+    }
+  }
+
+  public synchronized boolean verifyClusterValidByClusterId(Long clusterId) {
+    FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
+    ClusterState state = ClusterState.of(flinkCluster.getClusterState());
+    if (!ClusterState.isRunningState(state)) {
+      return false;
+    }
+    state = updateClusterState(flinkCluster);
+    if (!ClusterState.isRunningState(state)) {
+      return false;
+    }
+    return true;
+  }
+
+  public boolean checkAlert(Long clusterId) {
+    FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
+    if (flinkCluster.getAlertId() == null) {
+      return false;
+    }
+    return true;
+  }
+
+  private void alert(FlinkCluster cluster, ClusterState state) {
+    if (!checkAlert(cluster.getId())) {
+      return;
+    }
+    
cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
+    cluster.setClusterState(state.getValue());
+    cluster.setEndTime(new Date());
+    alertService.alert(cluster, state);
+  }
+
   /**
    * cluster get state from flink or yarn api
    *
@@ -171,7 +215,7 @@ public class FlinkClusterWatcher {
    */
   private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) {
     if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
-      return ClusterState.STOPPED;
+      return ClusterState.LOST;
     }
     String clusterId = flinkCluster.getClusterId();
     if (StringUtils.isEmpty(clusterId)) {
@@ -214,13 +258,15 @@ public class FlinkClusterWatcher {
         {
           updateWrapper
               .set(FlinkCluster::getAddress, null)
-              .set(FlinkCluster::getJobManagerUrl, null);
+              .set(FlinkCluster::getJobManagerUrl, null)
+              .set(FlinkCluster::getEndTime, new Date());
         }
         // fall through
       case LOST:
       case UNKNOWN:
         {
           removeFlinkCluster(flinkCluster);
+          alert(flinkCluster, state);
           break;
         }
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index e9b9f2224..737cc1bc6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -80,6 +80,8 @@ public class FlinkRESTAPIWatcher {
 
   @Autowired private SavePointService savePointService;
 
+  @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
   // track interval  every 5 seconds
   private static final long WATCHING_INTERVAL = 1000L * 5;
   // option interval within 10 seconds
@@ -228,7 +230,7 @@ public class FlinkRESTAPIWatcher {
                   if (StopFrom.NONE.equals(stopFrom)) {
                     savePointService.expire(application.getId());
                     application.setState(FlinkAppState.LOST.getValue());
-                    alertService.alert(application, FlinkAppState.LOST);
+                    alert(application, FlinkAppState.LOST);
                   } else {
                     application.setState(FlinkAppState.CANCELED.getValue());
                   }
@@ -244,7 +246,7 @@ public class FlinkRESTAPIWatcher {
                 doPersistMetrics(application, true);
                 FlinkAppState appState = 
FlinkAppState.of(application.getState());
                 if (appState.equals(FlinkAppState.FAILED) || 
appState.equals(FlinkAppState.LOST)) {
-                  alertService.alert(application, 
FlinkAppState.of(application.getState()));
+                  alert(application, FlinkAppState.of(application.getState()));
                   if (appState.equals(FlinkAppState.FAILED)) {
                     try {
                       applicationService.start(application, true);
@@ -456,7 +458,7 @@ public class FlinkRESTAPIWatcher {
             savePointService.expire(application.getId());
           }
           stopCanceledJob(application.getId());
-          alertService.alert(application, FlinkAppState.CANCELED);
+          alert(application, FlinkAppState.CANCELED);
         }
         STOP_FROM_MAP.remove(application.getId());
         doPersistMetrics(application, true);
@@ -467,7 +469,7 @@ public class FlinkRESTAPIWatcher {
         STOP_FROM_MAP.remove(application.getId());
         application.setState(FlinkAppState.FAILED.getValue());
         doPersistMetrics(application, true);
-        alertService.alert(application, FlinkAppState.FAILED);
+        alert(application, FlinkAppState.FAILED);
         applicationService.start(application, true);
         break;
       case RESTARTING:
@@ -544,7 +546,7 @@ public class FlinkRESTAPIWatcher {
               || flinkAppState.equals(FlinkAppState.LOST)
               || (flinkAppState.equals(FlinkAppState.CANCELED) && 
StopFrom.NONE.equals(stopFrom))
               || applicationService.checkAlter(application)) {
-            alertService.alert(application, flinkAppState);
+            alert(application, flinkAppState);
             stopCanceledJob(application.getId());
             if (flinkAppState.equals(FlinkAppState.FAILED)) {
               applicationService.start(application, true);
@@ -768,4 +770,30 @@ public class FlinkRESTAPIWatcher {
   interface Callback<T, R> {
     R call(T e) throws Exception;
   }
+
+  /**
+   * The situation of abnormal operation alarm is as follows: When the job 
running mode is yarn per
+   * job or yarn application, when the job is abnormal, an alarm will be 
triggered directly; The job
+   * running mode is yarn session or reome: a. If the flink cluster is not 
configured with an alarm
+   * information, it will directly alarm when the job is abnormal. b. If the 
flink cluster is
+   * configured with alarm information: if the abnormal behavior of the job is 
caused by an
+   * abnormality in the flink cluster, block the alarm of the job and wait for 
the flink cluster
+   * alarm; If the abnormal behavior of the job is caused by itself and the 
flink cluster is running
+   * normally, the job will an alarm
+   */
+  private void alert(Application app, FlinkAppState appState) {
+    if (ExecutionMode.isYarnPerJobOrAppMode(app.getExecutionModeEnum())
+        || !flinkClusterWatcher.checkAlert(app.getFlinkClusterId())) {
+      alertService.alert(app, appState);
+      return;
+    }
+    boolean isValid = 
flinkClusterWatcher.verifyClusterValidByClusterId(app.getFlinkClusterId());
+    if (isValid) {
+      log.info(
+          "application with id {} is yarn session or remote and flink cluster 
with id {} is alive, application send alert",
+          app.getId(),
+          app.getFlinkClusterId());
+      alertService.alert(app, appState);
+    }
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
index 14e30170b..dd7331bac 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
+++ 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
@@ -5,7 +5,12 @@
 ### **Dear StreamPark user:**
 
 > ** Oops! I'm sorry to inform you that something wrong with your app **
+<#if  type == 1 || type == 2 >
 -   **Job Name:${jobName}**
+</#if>
+<#if  type == 3 >
+-   **Cluster Name:${jobName}**
+</#if>
 <#if  type == 1 >
 -   **Job Status:${status}**
 -   **Start Time:${startTime}**
@@ -22,6 +27,13 @@
 -   **Start Time:${startTime}**
 -   **Duration:${duration}**
 </#if>
+<#if  type == 3 >
+-   **Cluster Status:${status}**
+-   **Start Time:${startTime}**
+-   **End Time:${endTime}**
+-   **Duration:${duration}**
+-   **Affected Jobs:${affectedJobs}**
+</#if>
 
 > Best Wishes!
 >
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
index 3cb0a9323..a3d622858 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
+++ 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
@@ -967,6 +967,60 @@
                                                                     </tr>
                                                                 </#if>
 
+                                                                <#if  
mail.type == 3 >
+                                                                    <tr>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px; 
border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
Cluster Name
+                                                                        </td>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px; 
border-right-color: rgba(169,169,169,.5); border-top-style:solid; 
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
${mail.jobName}
+                                                                        </td>
+                                                                    </tr>
+                                                                    <tr>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px; 
border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
Cluster Status
+                                                                        </td>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px; 
border-right-color: rgba(169,169,169,.5); border-top-style:solid; 
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
<span style="font-size: 1.25em;font-weight: bold;color: 
RED;">${mail.status}</span>
+                                                                        </td>
+                                                                    </tr>
+                                                                    <tr>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px; 
border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
Start Time
+                                                                        </td>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px; 
border-right-color: rgba(169,169,169,.5); border-top-style:solid; 
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
${mail.startTime}
+                                                                        </td>
+                                                                    </tr>
+                                                                    <tr>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px; 
border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
End Time
+                                                                        </td>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px; 
border-right-color: rgba(169,169,169,.5); border-top-style:solid; 
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
${mail.endTime}
+                                                                        </td>
+                                                                    </tr>
+
+                                                                    <tr>
+                                                                        <td 
style="border-bottom-style:solid; border-bottom-width: 1px; 
border-bottom-color: rgba(169,169,169,.5); border-left-style:solid; 
border-left-width: 1px; border-left-color: rgba(169,169,169,.5); 
border-top-style:solid; border-top-width: 1px; border-top-color: 
rgba(169,169,169,.5); padding: 1em">
+                                                                            
Duration
+                                                                        </td>
+                                                                        <td 
style="border: 1px solid rgba(169,169,169,.5); padding: 1em">
+                                                                            
${mail.duration}
+                                                                        </td>
+                                                                    </tr>
+
+                                                                    <tr>
+                                                                        <td 
style="border-bottom-style:solid; border-bottom-width: 1px; 
border-bottom-color: rgba(169,169,169,.5); border-left-style:solid; 
border-left-width: 1px; border-left-color: rgba(169,169,169,.5); 
border-top-style:solid; border-top-width: 1px; border-top-color: 
rgba(169,169,169,.5); padding: 1em">
+                                                                            
Affected Jobs
+                                                                        </td>
+                                                                        <td 
style="border: 1px solid rgba(169,169,169,.5); padding: 1em">
+                                                                            
${mail.affectedJobs}
+                                                                        </td>
+                                                                    </tr>
+
+                                                                </#if>
+
                                                                 <tr>
                                                                     <td 
colspan="2" style="height: 3rem;">
                                                                         <div 
style="margin: 1.5em 0;">Best Wishes!
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
index 2f7171fd7..a33b5811d 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
+++ 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
@@ -27,6 +27,7 @@
     },
     {
       "fields": [
+<#if type == 1 || type == 2>
         {
           "is_short": false,
           "text": {
@@ -34,6 +35,16 @@
             "tag": "lark_md"
           }
         },
+</#if>
+<#if type == 3>
+        {
+          "is_short": false,
+          "text": {
+            "content": "**Cluster Name:${jobName}**",
+            "tag": "lark_md"
+          }
+        },
+</#if>
 <#if  type == 1 >
         {
           "is_short": false,
@@ -109,6 +120,43 @@
             "tag": "lark_md"
           }
         }
+</#if>
+<#if  type == 3 >
+       {
+          "is_short": false,
+          "text": {
+            "content": "**Cluster Status:${status}**",
+            "tag": "lark_md"
+          }
+       },
+       {
+          "is_short": true,
+          "text": {
+            "content": "**Start Time:${startTime}**",
+            "tag": "lark_md"
+          }
+       },
+       {
+          "is_short": false,
+          "text": {
+            "content": "**End Time:${endTime}**",
+            "tag": "lark_md"
+          }
+       },
+       {
+          "is_short": true,
+          "text": {
+            "content": "**Duration:${duration}**",
+            "tag": "lark_md"
+          }
+       },
+       {
+          "is_short": false,
+          "text": {
+            "content": "**Affected Jobs:${affectedJobs}**",
+            "tag": "lark_md"
+          }
+       }
 </#if>
       ],
       "tag": "div"
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
index 7d863c15d..d72a46872 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
+++ 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
@@ -5,8 +5,12 @@
 ### **Dear StreamPark user:**
 
 `Oops! I'm sorry to inform you that something wrong with your app`
-
+<#if  type == 1 || type ==2 >
 -   **Job Name:${jobName}**
+</#if>
+<#if  type == 3 >
+-   **Cluster Name:${jobName}**
+</#if>
 <#if  type == 1 >
 -   **Job Status:${status}**
 -   **Start Time:${startTime}**
@@ -23,6 +27,13 @@
 -   **Start Time:${startTime}**
 -   **Duration:${duration}**
 </#if>
+<#if  type == 3 >
+-   **Cluster Status:${status}**
+-   **Start Time:${startTime}**
+-   **End Time:${endTime}**
+-   **Duration:${duration}**
+-   **Affected Jobs:${affectedJobs}**
+</#if>
 
 > Best Wishes!
 > Apache StreamPark
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index fd625c21e..fcb05c25e 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -411,6 +411,9 @@ create table if not exists `t_flink_cluster` (
   `exception` text comment 'exception information',
   `cluster_state` tinyint default 0 comment 'cluster status (0: created but 
not started, 1: started, 2: stopped)',
   `create_time` datetime not null default current_timestamp comment 'create 
time',
+  `start_time` datetime default null comment 'start time',
+  `end_time` datetime default null comment 'end time',
+  `alert_id` bigint default null comment 'alert id',
   primary key(`id`,`cluster_name`),
   unique (`cluster_id`,`address`,`execution_mode`)
 );
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index d8565cc76..cc0173387 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -133,6 +133,14 @@
              limit 1
     </select>
 
+    <select id="getAffectedJobsByClusterId" resultType="java.lang.Integer" 
parameterType="java.lang.Long">
+        select
+            count(1)
+        from t_flink_app
+        where flink_cluster_id = #{clusterId}
+            limit 1
+    </select>
+
     <select id="getByProjectId" 
resultType="org.apache.streampark.console.core.entity.Application" 
parameterType="java.lang.Long">
         select * from t_flink_app where project_id=#{projectId}
     </select>
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index 91e791847..769fe521c 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -40,6 +40,9 @@
         <result column="exception" jdbcType="LONGVARCHAR" 
property="exception"/>
         <result column="cluster_state" jdbcType="TINYINT" 
property="clusterState"/>
         <result column="create_time" jdbcType="DATE" property="createTime"/>
+        <result column="start_time" jdbcType="DATE" property="startTime"/>
+        <result column="end_time" jdbcType="DATE" property="endTime"/>
+        <result column="alert_id" jdbcType="BIGINT" property="alertId"/>
     </resultMap>
 
     <select id="existsByClusterId" resultType="java.lang.Boolean" 
parameterType="java.lang.String">

Reply via email to