This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 332c6756b [Feature] StreamPark Autonomous Health Probing (#3036)
332c6756b is described below

commit 332c6756be4d2942929a1e6e5f15fe4ae8e3098e
Author: xujiangfeng001 <[email protected]>
AuthorDate: Sat Sep 16 23:15:28 2023 +0800

    [Feature] StreamPark Autonomous Health Probing (#3036)
    
    * [Feature] StreamPark Autonomous Health Probing
---
 .../main/assembly/script/schema/mysql-schema.sql   |   1 +
 .../main/assembly/script/schema/pgsql-schema.sql   |   3 +-
 .../main/assembly/script/upgrade/mysql/2.2.0.sql   |   3 +
 .../main/assembly/script/upgrade/pgsql/2.2.0.sql   |   3 +
 .../console/core/bean/AlertProbeMsg.java           |  56 ++++++
 .../console/core/bean/AlertTemplate.java           |  43 +++++
 .../console/core/entity/Application.java           |   3 +-
 .../console/core/enums/FlinkAppState.java          |   8 +
 .../console/core/mapper/ApplicationMapper.java     |   2 +
 .../application/ApplicationManageService.java      |   7 +
 .../impl/ApplicationManageServiceImpl.java         |   4 +
 .../console/core/task/AutoHealthProbingTask.java   | 189 +++++++++++++++++++++
 .../console/core/task/FlinkHttpWatcher.java        |   5 +
 .../core/task/FlinkK8sChangeEventListener.java     |  11 +-
 .../console/core/task/FlinkK8sWatcherWrapper.java  |   2 +
 .../resources/alert-template/alert-dingTalk.ftl    |  16 +-
 .../main/resources/alert-template/alert-email.ftl  |  76 +++++++--
 .../main/resources/alert-template/alert-lark.ftl   |  47 +++++
 .../main/resources/alert-template/alert-weCom.ftl  |  16 +-
 .../src/main/resources/db/schema-h2.sql            |   1 +
 .../resources/mapper/core/ApplicationMapper.xml    |  16 ++
 .../src/enums/flinkEnum.ts                         |   2 +
 .../src/views/flink/app/components/State.less      |  16 ++
 .../src/views/flink/app/components/State.tsx       |   5 +
 24 files changed, 517 insertions(+), 18 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 08a1693a3..8fd8c1831 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -102,6 +102,7 @@ create table `t_flink_app` (
   `ingress_template` text collate utf8mb4_general_ci,
   `default_mode_ingress` text collate utf8mb4_general_ci,
   `tags` varchar(500) default null,
+  `probing` tinyint default 0,
   primary key (`id`) using btree,
   key `inx_job_type` (`job_type`) using btree,
   key `inx_track` (`tracking`) using btree,
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index eef89390c..81dc67f4f 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -249,7 +249,8 @@ create table "public"."t_flink_app" (
   "flink_cluster_id" int8,
   "ingress_template" text collate "pg_catalog"."default",
   "default_mode_ingress" text collate "pg_catalog"."default",
-  "tags" varchar(500) collate "pg_catalog"."default"
+  "tags" varchar(500) collate "pg_catalog"."default",
+  "probing" boolean default false
 )
 ;
 alter table "public"."t_flink_app" add constraint "t_flink_app_pkey" primary 
key ("id");
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index f428f9216..79a890def 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -43,6 +43,9 @@ unique key `un_team_vcode_inx` (`team_id`,`resource_name`) 
using btree
 alter table `t_flink_sql`
     add column `team_resource` varchar(64) default null;
 
+alter table `t_flink_app`
+    add column `probing` tinyint default 0;
+
 alter table `t_flink_cluster`
     add column `job_manager_url` varchar(150) default null comment 'url 
address of jobmanager' after `address`,
     add column `start_time` datetime default null comment 'start time',
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index 4e7a43b2f..508c8cc07 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -59,6 +59,9 @@ create index "un_team_dname_inx" on "public"."t_resource" 
using btree (
 alter table "public"."t_flink_sql"
     add column "team_resource" varchar(64) default null;
 
+alter table "public"."t_flink_app"
+    add column "probing" boolean default false;
+
 alter table "public"."t_flink_cluster"
     add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
     add column "start_time" timestamp(6) collate "pg_catalog"."default",
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
new file mode 100644
index 000000000..d42537746
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Set;
+
+@NoArgsConstructor
+@Data
+public class AlertProbeMsg { // summary of one auto health-probe round for a group of applications
+
+  private Set<Long> alertId; // alert ids the summary is sent to (one alert call per id)
+
+  private String user; // user name rendered in the probe alert
+
+  private Integer probeJobs = 0; // number of applications counted in this summary
+
+  private Integer failedJobs = 0; // applications found in FAILED state
+
+  private Integer lostJobs = 0; // applications found in LOST state
+
+  private Integer cancelledJobs = 0; // applications found in CANCELED state
+
+  public void incrementProbeJobs() { // adds one to probeJobs
+    this.probeJobs++;
+  }
+
+  public void incrementFailedJobs() { // adds one to failedJobs
+    this.failedJobs++;
+  }
+
+  public void incrementLostJobs() { // adds one to lostJobs
+    this.lostJobs++;
+  }
+
+  public void incrementCancelledJobs() { // adds one to cancelledJobs
+    this.cancelledJobs++;
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index 733fa020c..be407078a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -51,6 +51,11 @@ public class AlertTemplate implements Serializable {
   private boolean atAll = false;
   private Integer allJobs;
   private Integer affectedJobs;
+  private String user;
+  private Integer probeJobs;
+  private Integer failedJobs;
+  private Integer lostJobs;
+  private Integer cancelledJobs;
 
   public static AlertTemplate of(Application application, FlinkAppState 
appState) {
     return new AlertTemplateBuilder()
@@ -102,6 +107,19 @@ public class AlertTemplate implements Serializable {
         .build();
   }
 
+  public static AlertTemplate of(AlertProbeMsg alertProbeMsg) { // builds the probe-result alert from a probe summary
+    return new AlertTemplateBuilder()
+        .setType(4) // the alert templates render their probe-result sections when type == 4
+        .setUser(alertProbeMsg.getUser())
+        .setProbeJobs(alertProbeMsg.getProbeJobs())
+        .setFailedJobs(alertProbeMsg.getFailedJobs())
+        .setLostJobs(alertProbeMsg.getLostJobs())
+        .setCancelledJobs(alertProbeMsg.getCancelledJobs())
+        .setSubject("StreamPark Alert: PROBE")
+        .setTitle("PROBE")
+        .build();
+  }
+
   private static class AlertTemplateBuilder {
     private final AlertTemplate alertTemplate = new AlertTemplate();
 
@@ -229,6 +247,31 @@ public class AlertTemplate implements Serializable {
       return this;
     }
 
+    public AlertTemplateBuilder setUser(String user) { // stores the user on the template being built; returns this builder for chaining
+      alertTemplate.setUser(user);
+      return this;
+    }
+
+    public AlertTemplateBuilder setProbeJobs(Integer probeJobs) { // stores the probed-job count; returns this builder for chaining
+      alertTemplate.setProbeJobs(probeJobs);
+      return this;
+    }
+
+    public AlertTemplateBuilder setFailedJobs(Integer failedJobs) { // stores the failed-job count; returns this builder for chaining
+      alertTemplate.setFailedJobs(failedJobs);
+      return this;
+    }
+
+    public AlertTemplateBuilder setLostJobs(Integer lostJobs) { // stores the lost-job count; returns this builder for chaining
+      alertTemplate.setLostJobs(lostJobs);
+      return this;
+    }
+
+    public AlertTemplateBuilder setCancelledJobs(Integer cancelledJobs) { // stores the cancelled-job count; returns this builder for chaining
+      alertTemplate.setCancelledJobs(cancelledJobs);
+      return this;
+    }
+
     public AlertTemplate build() {
       return this.alertTemplate;
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 049af6f13..f0b283be1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -211,6 +211,8 @@ public class Application implements Serializable {
 
   private String tags;
 
+  private Boolean probing = false;
+
   /** running job */
   private transient JobsOverview.Task overview;
 
@@ -308,7 +310,6 @@ public class Application implements Serializable {
       case CANCELED:
       case TERMINATED:
       case POS_TERMINATED:
-      case LOST:
         return 0;
       default:
         return 1;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
index 84bf447f3..59dc4c6f5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
@@ -99,6 +99,9 @@ public enum FlinkAppState {
   /** Job SUCCEEDED on yarn. */
   SUCCEEDED(20),
 
+  /** Job is undergoing automatic health probing. */
+  PROBING(21),
+
   /** Has killed in Yarn. */
   KILLED(-9);
 
@@ -137,6 +140,11 @@ public enum FlinkAppState {
         || FlinkAppState.TERMINATED == flinkAppState;
   }
 
+  public static boolean isLost(Integer appState) { // true when the numeric state value resolves to LOST
+    FlinkAppState flinkAppState = FlinkAppState.of(appState);
+    return FlinkAppState.LOST == flinkAppState;
+  }
+
   /**
    * Type conversion bridging Deprecated, see {@link
    * org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 3744efa84..7076b5786 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -37,6 +37,8 @@ public interface ApplicationMapper extends 
BaseMapper<Application> {
 
   List<Application> getByTeamId(@Param("teamId") Long teamId);
 
+  List<Application> getProbeApps(); // NOTE(review): query presumably lives in ApplicationMapper.xml (not shown here) — confirm its selection criteria
+
   boolean mapping(@Param("application") Application appParam);
 
   List<String> getRecentK8sNamespace(@Param("limitSize") Integer limit);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
index d55564111..c472b0cc3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
@@ -149,4 +149,11 @@ public interface ApplicationManageService extends 
IService<Application> {
    */
   List<Application> getByTeamIdAndExecutionModes(
       Long teamId, Collection<ExecutionMode> executionModes);
+
+  /**
+   * Retrieves the applications that are currently being probed or that need to be probed.
+   *
+   * @return the applications that are being probed or need to be probed.
+   */
+  List<Application> getProbeApps();
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 8f8ada3ac..e89b16117 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -686,6 +686,10 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
                         .collect(Collectors.toSet())));
   }
 
+  public List<Application> getProbeApps() { // NOTE(review): looks like the ApplicationManageService#getProbeApps implementation but has no @Override — confirm
+    return this.baseMapper.getProbeApps(); // delegates directly to the mapper query
+  }
+
   @Override
   public boolean checkBuildAndUpdate(Application appParam) {
     boolean build = appParam.getBuild();
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
new file mode 100644
index 000000000..d19865f34
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.console.core.bean.AlertProbeMsg;
+import org.apache.streampark.console.core.bean.AlertTemplate;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.enums.FlinkAppState;
+import org.apache.streampark.console.core.service.alert.AlertService;
+import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
+import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
+import static 
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp;
+
+/** Scheduled task that re-probes applications in LOST state (YARN, remote and K8s modes) and alerts on the outcome. */
+@Slf4j
+@Component
+public class AutoHealthProbingTask {
+
+  @Autowired private ApplicationManageService applicationManageService;
+
+  @Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
+
+  @Autowired private AlertService alertService;
+
+  /** Minimum time between the start of two probe rounds: 30 seconds. */
+  private static final Duration PROBE_INTERVAL = Duration.ofSeconds(30);
+
+  /** While a round is in progress, its results are checked at most once every 5 seconds. */
+  private static final Duration PROBE_WAIT_INTERVAL = Duration.ofSeconds(5);
+
+  /** Maximum number of re-probes per round while some application is still LOST. */
+  private static final Short PROBE_RETRY_COUNT = 10;
+
+  private long lastWatchTime = 0L; // epoch millis of the last probe start or result check
+
+  private Boolean isProbing = false; // set in probe() once there is something to probe; cleared in reset()
+
+  private Short retryAttempts = PROBE_RETRY_COUNT; // retries left in the current round; restored in reset()
+
+  @Scheduled(fixedDelay = 1000) // fixedDelay: next run is 1000 ms after the previous one finishes
+  private void schedule() { // tick: check results while a round is running, otherwise start a new round when due
+    long timeMillis = System.currentTimeMillis();
+    if (isProbing) {
+      if (timeMillis - lastWatchTime >= PROBE_WAIT_INTERVAL.toMillis()) {
+        handleProbeResults();
+        lastWatchTime = timeMillis;
+      }
+    } else {
+      if (timeMillis - lastWatchTime >= PROBE_INTERVAL.toMillis()) {
+        lastWatchTime = timeMillis;
+        probe(Collections.emptyList()); // empty list makes probe() load its candidates from the service
+      }
+    }
+  }
+
+  public void probe(List<Application> applications) { // probes the given apps, or getProbeApps() when the list is empty
+    List<Application> probeApplication =
+        applications.isEmpty() ? applicationManageService.getProbeApps() : 
applications;
+    if (probeApplication.isEmpty()) {
+      log.info("there is no application that needs to be probe");
+      return;
+    }
+    isProbing = true;
+    probeApplication = // keep only the applications currently in LOST state
+        probeApplication.stream()
+            .filter(application -> 
FlinkAppState.isLost(application.getState()))
+            .collect(Collectors.toList());
+    updateProbingState(probeApplication);
+    probeApplication.stream().forEach(this::monitorApplication); // hand each one to its K8s or HTTP watcher
+  }
+
+  private void updateProbingState(List<Application> applications) { // marks LOST apps as PROBING, then batch-updates the list
+    applications.stream()
+        .filter(application -> FlinkAppState.isLost(application.getState())) // same LOST check that probe() already applied
+        .forEach(
+            application -> {
+              application.setState(FlinkAppState.PROBING.getValue());
+              application.setProbing(true);
+            });
+    applicationManageService.updateBatchById(applications);
+  }
+
+  private void handleProbeResults() { // re-probe while apps are still LOST and retries remain; otherwise alert and reset
+    List<Application> probeApps = applicationManageService.getProbeApps();
+    if (shouldRetry(probeApps)) {
+      probe(probeApps);
+    } else {
+      List<AlertProbeMsg> alertProbeMsgs = generateProbeResults(probeApps);
+      alertProbeMsgs.stream().forEach(this::alert);
+      reset(probeApps);
+    }
+  }
+
+  private void reset(List<Application> applications) { // ends the round: clears flags on the apps and restores the retry budget
+    applications.forEach(
+        application -> {
+          application.setProbing(false);
+          application.setTracking(0);
+        });
+    applicationManageService.updateBatchById(applications);
+    retryAttempts = PROBE_RETRY_COUNT;
+    isProbing = false;
+  }
+
+  private void alert(AlertProbeMsg alertProbeMsg) { // sends the summary once for every alert id it carries
+    alertProbeMsg.getAlertId().stream()
+        .forEach((alterId) -> alertService.alert(alterId, 
AlertTemplate.of(alertProbeMsg)));
+  }
+
+  /** Builds one AlertProbeMsg per user id, counting that user's probed, LOST, FAILED and CANCELED applications. */
+  private List<AlertProbeMsg> generateProbeResults(List<Application> 
applications) {
+    if (applications == null || applications.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return new ArrayList<>(
+        applications.stream()
+            .collect(
+                Collectors.groupingBy(
+                    Application::getUserId, // NOTE(review): groupingBy rejects null keys — confirm userId is never null
+                    Collectors.collectingAndThen(
+                        Collectors.toList(),
+                        apps -> {
+                          Set<Long> alertIds = new HashSet<>();
+                          AlertProbeMsg alertProbeMsg = new AlertProbeMsg();
+                          apps.forEach(
+                              app -> {
+                                alertProbeMsg.setUser(app.getUserName()); // overwritten for every app in the group
+                                alertProbeMsg.incrementProbeJobs();
+                                if (app.getState() == 
FlinkAppState.LOST.getValue()) {
+                                  alertProbeMsg.incrementLostJobs();
+                                } else if (app.getState() == 
FlinkAppState.FAILED.getValue()) {
+                                  alertProbeMsg.incrementFailedJobs();
+                                } else if (app.getState() == 
FlinkAppState.CANCELED.getValue()) {
+                                  alertProbeMsg.incrementCancelledJobs();
+                                }
+                                alertIds.add(app.getAlertId()); // NOTE(review): a null alert id is added as-is — confirm alertService.alert tolerates it
+                              });
+
+                          alertProbeMsg.setAlertId(alertIds);
+                          return alertProbeMsg;
+                        })))
+            .values());
+  }
+
+  private void monitorApplication(Application application) { // Kubernetes apps go to the K8s watcher, all others to FlinkHttpWatcher
+    if (isKubernetesApp(application)) {
+      k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+    } else {
+      FlinkHttpWatcher.doWatching(application);
+    }
+  }
+
+  private Boolean shouldRetry(List<Application> applications) { // true if any app is still LOST and retries remain; a retry is consumed only then
+    return applications.stream()
+            .anyMatch(
+                application -> FlinkAppState.LOST.getValue() == 
application.getState().intValue())
+        && (retryAttempts-- > 0);
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index 59eb3f542..d61018564 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -170,6 +170,7 @@ public class FlinkHttpWatcher {
         applicationManageService.list(
             new LambdaQueryWrapper<Application>()
                 .eq(Application::getTracking, 1)
+                .ne(Application::getState, FlinkAppState.LOST.getValue())
                 .notIn(Application::getExecutionMode, 
ExecutionMode.getKubernetesMode()));
     applications.forEach(
         (app) -> {
@@ -801,6 +802,10 @@ public class FlinkHttpWatcher {
    * @param appState application state
    */
   private void doAlert(Application app, FlinkAppState appState) {
+    if (app.getProbing()) {
+      log.info("application with id {} is probing, don't send alert", 
app.getId());
+      return;
+    }
     switch (app.getExecutionModeEnum()) {
       case YARN_APPLICATION:
       case YARN_PER_JOB:
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 550a9a41e..3006a1b6b 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -39,6 +39,7 @@ import 
org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher;
 
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
@@ -62,6 +63,7 @@ import static 
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8
  * @link org.apache.streampark.console.core.task.FlinkK8sChangeListenerV2
  */
 @Deprecated
+@Slf4j
 @Component
 public class FlinkK8sChangeEventListener {
 
@@ -107,7 +109,14 @@ public class FlinkK8sChangeEventListener {
         || FlinkAppState.LOST.equals(state)
         || FlinkAppState.RESTARTING.equals(state)
         || FlinkAppState.FINISHED.equals(state)) {
-      executor.execute(() -> alertService.alert(app.getAlertId(), 
AlertTemplate.of(app, state)));
+      executor.execute(
+          () -> {
+            if (app.getProbing()) {
+              log.info("application with id {} is probing, don't send alert", 
app.getId());
+              return;
+            }
+            alertService.alert(app.getAlertId(), AlertTemplate.of(app, state));
+          });
     }
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 00901367e..7840d06e6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.task;
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.enums.FlinkAppState;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
@@ -98,6 +99,7 @@ public class FlinkK8sWatcherWrapper {
     final LambdaQueryWrapper<Application> queryWrapper = new 
LambdaQueryWrapper<>();
     queryWrapper
         .eq(Application::getTracking, 1)
+        .ne(Application::getState, FlinkAppState.LOST.getValue())
         .in(Application::getExecutionMode, ExecutionMode.getKubernetesMode());
 
     List<Application> k8sApplication = 
applicationManageService.list(queryWrapper);
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
index 799923c7c..403e3dc17 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
+++ 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
@@ -2,9 +2,17 @@
 
 # ${subject}
 
-### **Dear StreamPark user:**
+
+<#if  type == 1 || type == 2 || type == 3>
+### **Dear StreamPark User:**
 
 > ** Oops! I'm sorry to inform you that something wrong with your app **
+</#if>
+<#if  type == 4 >
+### **Dear StreamPark User: ${user}**
+
+> ** This is the latest auto probe result **
+</#if>
 <#if  type == 1 || type == 2 >
 -   **Job Name:${jobName}**
 </#if>
@@ -35,6 +43,12 @@
 -   **All Jobs:${allJobs}**
 -   **About Affected Jobs:${affectedJobs}**
 </#if>
+<#if  type == 4 >
+-   **Probe Jobs:${probeJobs}**
+-   **Failed Jobs:${failedJobs}**
+-   **Lost Jobs:${lostJobs}**
+-   **Cancelled Jobs:${cancelledJobs}**
+</#if>
 
 > Best Wishes!
 >
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
index 17a44f140..51649c165 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
+++ 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
@@ -836,18 +836,35 @@
                                                                     
border="0px"
                                                                     
width="550px">
                                                                 <tbody>
-                                                                <tr>
-                                                                    <td 
colspan="2">
-                                                                        <br>
-                                                                        <p 
style="margin: 0 0 1em;">
-                                                                            
Dear StreamPark user:
-                                                                        </p>
-                                                                        <p 
style="margin: 0 1em 1em;">
-                                                                            
Oops! I'm sorry to inform you that something wrong with your app
-                                                                        </p>
-                                                                        <br>
-                                                                    </td>
-                                                                </tr>
+                                                                <#if mail.type 
== 1 || mail.type == 2 || mail.type == 3>
+                                                                    <tr>
+                                                                        <td 
colspan="2">
+                                                                            
<br>
+                                                                            <p 
style="margin: 0 0 1em;">
+                                                                               
 Dear StreamPark user:
+                                                                            
</p>
+                                                                            <p 
style="margin: 0 1em 1em;">
+                                                                               
 Oops! I'm sorry to inform you that something wrong with your app
+                                                                            
</p>
+                                                                            
<br>
+                                                                        </td>
+                                                                    </tr>
+                                                                </#if>
+
+                                                                <#if mail.type 
== 4>
+                                                                    <tr>
+                                                                        <td 
colspan="2">
+                                                                            
<br>
+                                                                            <p 
style="margin: 0 0 1em;">
+                                                                               
 Dear StreamPark user: ${mail.user}
+                                                                            
</p>
+                                                                            <p 
style="margin: 0 1em 1em;">
+                                                                               
 This is the latest auto probe result
+                                                                            
</p>
+                                                                            
<br>
+                                                                        </td>
+                                                                    </tr>
+                                                                </#if>
 
                                                                 <#if  
mail.type == 1 >
                                                                     <tr>
@@ -1030,6 +1047,41 @@
 
                                                                 </#if>
 
+                                                                <#if  
mail.type == 4 >
+                                                                    <tr>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px; 
border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
Probe Jobs
+                                                                        </td>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px; 
border-right-color: rgba(169,169,169,.5); border-top-style:solid; 
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
${mail.probeJobs}
+                                                                        </td>
+                                                                    </tr>
+                                                                    <tr>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px; 
border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
Failed Jobs
+                                                                        </td>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px; 
border-right-color: rgba(169,169,169,.5); border-top-style:solid; 
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
<span style="font-size: 1.25em;font-weight: bold;color: 
RED;">${mail.failedJobs}</span>
+                                                                        </td>
+                                                                    </tr>
+                                                                    <tr>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px; 
border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
Lost Jobs
+                                                                        </td>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px; 
border-right-color: rgba(169,169,169,.5); border-top-style:solid; 
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
${mail.lostJobs}
+                                                                        </td>
+                                                                    </tr>
+                                                                    <tr>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px; 
border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
Cancelled Jobs
+                                                                        </td>
+                                                                        <td 
style="border-left-style:solid; border-left-width: 1px; border-left-color: 
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px; 
border-right-color: rgba(169,169,169,.5); border-top-style:solid; 
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+                                                                            
${mail.cancelledJobs}
+                                                                        </td>
+                                                                    </tr>
+                                                                </#if>
+
                                                                 <tr>
                                                                     <td 
colspan="2" style="height: 3rem;">
                                                                         <div 
style="margin: 1.5em 0;">Best Wishes!
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
index fb45eda6f..f9f42e427 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
+++ 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
@@ -12,6 +12,7 @@
         }
       ]
     },
+<#if type == 1 || type == 2 || type == 3>
     {
       "tag": "markdown",
       "content": "**Dear StreamPark user:**"
@@ -25,6 +26,22 @@
         }
       ]
     },
+</#if>
+<#if type == 4>
+    {
+      "tag": "markdown",
+      "content": "**Dear StreamPark user: ${user}**"
+    },
+    {
+      "tag": "note",
+      "elements": [
+        {
+          "tag": "plain_text",
+          "content": "This is the latest auto probe result"
+        }
+      ]
+    },
+</#if>
     {
       "fields": [
 <#if type == 1 || type == 2>
@@ -164,6 +181,36 @@
             "tag": "lark_md"
           }
        }
+</#if>
+<#if  type == 4 >
+       {
+         "is_short": false,
+         "text": {
+           "content": "**Probe Jobs:${probeJobs}**",
+           "tag": "lark_md"
+         }
+       },
+       {
+         "is_short": false,
+         "text": {
+           "content": "**Failed Jobs:${failedJobs}**",
+           "tag": "lark_md"
+         }
+       },
+       {
+         "is_short": false,
+         "text": {
+           "content": "**Lost Jobs:${lostJobs}**",
+           "tag": "lark_md"
+         }
+       },
+       {
+         "is_short": false,
+         "text": {
+           "content": "**Cancelled Jobs:${cancelledJobs}**",
+           "tag": "lark_md"
+         }
+       }
 </#if>
       ],
       "tag": "div"
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
index 731d1adaa..143aa4ea6 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
+++ 
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
@@ -1,11 +1,17 @@
 > Apache StreamPark , Make stream processing easier!
 
 # ${subject}
-
+<#if  type == 1 || type == 2 || type == 3>
 ### **Dear StreamPark user:**
 
 `Oops! I'm sorry to inform you that something wrong with your app`
-<#if  type == 1 || type ==2 >
+</#if>
+<#if  type == 4>
+### **Dear StreamPark user: ${user}**
+
+`This is the latest auto probe result`
+</#if>
+<#if  type == 1 || type == 2 >
 -   **Job Name:${jobName}**
 </#if>
 <#if  type == 3 >
@@ -35,6 +41,12 @@
 -   **All Jobs:${allJobs}**
 -   **About Affected Jobs:${affectedJobs}**
 </#if>
+<#if  type == 4 >
+-   **Probe Jobs:${probeJobs}**
+-   **Failed Jobs:${failedJobs}**
+-   **Lost Jobs:${lostJobs}**
+-   **Cancelled Jobs:${cancelledJobs}**
+</#if>
 
 > Best Wishes!
 > Apache StreamPark
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index f6b88d4b4..a509afa5e 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -95,6 +95,7 @@ create table if not exists `t_flink_app` (
   `ingress_template` text ,
   `default_mode_ingress` text ,
   `tags` varchar(500) default null,
+  `probing` tinyint default 0,
   primary key(`id`)
 );
 
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 045a8f850..c25a504a4 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -76,6 +76,7 @@
         <result column="rest_url" jdbcType="VARCHAR" property="restUrl"/>
         <result column="rest_port" jdbcType="INTEGER" property="restPort"/>
         <result column="tags" jdbcType="VARCHAR" property="tags"/>
+        <result column="probing" jdbcType="INTEGER" property="probing"/>
     </resultMap>
 
     <update id="resetOptionState">
@@ -254,6 +255,21 @@
         where t.team_id=#{teamId}
     </select>
 
+    <select id="getProbeApps" 
resultType="org.apache.streampark.console.core.entity.Application">
+        select
+            t.*,
+            u.username,
+            case
+                when trim(u.nick_name) = ''
+                    then u.username
+                else u.nick_name
+                end as nick_name
+        from t_flink_app t
+                 inner join t_user u
+                            on t.user_id = u.user_id
+        where (t.tracking = 1 and t.state = 13) or t.probing = 1
+    </select>
+
     <update id="mapping" 
parameterType="org.apache.streampark.console.core.entity.Application">
         update t_flink_app
         <set>
diff --git 
a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts 
b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index 6301878d8..49e7b8093 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -133,6 +133,8 @@ export enum AppStateEnum {
   /** job SUCCEEDED on yarn */
   SUCCEEDED = 20,
+  /** Job auto Health probe */
+  PROBING = 21,
   /** has killed in Yarn */
   KILLED = -9,
 }
 
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.less
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.less
index 1ba9175bb..74c6995d3 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.less
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.less
@@ -47,6 +47,10 @@
   animation: reconciling-color 800ms ease-out infinite alternate;
 }
 
+.status-processing-probing {
+  animation: probing-color 800ms ease-out infinite alternate;
+}
+
 @keyframes deploying-color {
   0% {
     border-color: #1abbdc;
@@ -142,3 +146,15 @@
     box-shadow: 0 0 10px #eb2f96, inset 0 0 5px #eb2f96;
   }
 }
+
+@keyframes probing-color {
+  0% {
+    border-color: #2febc9;
+    box-shadow: 0 0 1px #2febc9, inset 0 0 2px #2febc9;
+  }
+
+  100% {
+    border-color: #2febc9;
+    box-shadow: 0 0 10px #2febc9, inset 0 0 5px #2febc9;
+  }
+}
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
index 7e4f34aa4..c6d9224de 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
@@ -71,6 +71,11 @@ const stateMap = {
     class: 'status-processing-initializing',
   },
   [AppStateEnum.TERMINATED]: { color: '#8E50FF', title: 'TERMINATED' },
+  [AppStateEnum.PROBING]: {
+    color: '#2febc9',
+    title: 'PROBING',
+    class: 'status-processing-probing',
+  },
 };
 /*  option state map*/
 const optionStateMap = {


Reply via email to