This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 332c6756b [Feature] StreamPark Autonomous Health Probing (#3036)
332c6756b is described below
commit 332c6756be4d2942929a1e6e5f15fe4ae8e3098e
Author: xujiangfeng001 <[email protected]>
AuthorDate: Sat Sep 16 23:15:28 2023 +0800
[Feature] StreamPark Autonomous Health Probing (#3036)
* [Feature] StreamPark Autonomous Health Probing
---
.../main/assembly/script/schema/mysql-schema.sql | 1 +
.../main/assembly/script/schema/pgsql-schema.sql | 3 +-
.../main/assembly/script/upgrade/mysql/2.2.0.sql | 3 +
.../main/assembly/script/upgrade/pgsql/2.2.0.sql | 3 +
.../console/core/bean/AlertProbeMsg.java | 56 ++++++
.../console/core/bean/AlertTemplate.java | 43 +++++
.../console/core/entity/Application.java | 3 +-
.../console/core/enums/FlinkAppState.java | 8 +
.../console/core/mapper/ApplicationMapper.java | 2 +
.../application/ApplicationManageService.java | 7 +
.../impl/ApplicationManageServiceImpl.java | 4 +
.../console/core/task/AutoHealthProbingTask.java | 189 +++++++++++++++++++++
.../console/core/task/FlinkHttpWatcher.java | 5 +
.../core/task/FlinkK8sChangeEventListener.java | 11 +-
.../console/core/task/FlinkK8sWatcherWrapper.java | 2 +
.../resources/alert-template/alert-dingTalk.ftl | 16 +-
.../main/resources/alert-template/alert-email.ftl | 76 +++++++--
.../main/resources/alert-template/alert-lark.ftl | 47 +++++
.../main/resources/alert-template/alert-weCom.ftl | 16 +-
.../src/main/resources/db/schema-h2.sql | 1 +
.../resources/mapper/core/ApplicationMapper.xml | 16 ++
.../src/enums/flinkEnum.ts | 2 +
.../src/views/flink/app/components/State.less | 16 ++
.../src/views/flink/app/components/State.tsx | 5 +
24 files changed, 517 insertions(+), 18 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 08a1693a3..8fd8c1831 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -102,6 +102,7 @@ create table `t_flink_app` (
`ingress_template` text collate utf8mb4_general_ci,
`default_mode_ingress` text collate utf8mb4_general_ci,
`tags` varchar(500) default null,
+ `probing` tinyint default 0,
primary key (`id`) using btree,
key `inx_job_type` (`job_type`) using btree,
key `inx_track` (`tracking`) using btree,
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index eef89390c..81dc67f4f 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -249,7 +249,8 @@ create table "public"."t_flink_app" (
"flink_cluster_id" int8,
"ingress_template" text collate "pg_catalog"."default",
"default_mode_ingress" text collate "pg_catalog"."default",
- "tags" varchar(500) collate "pg_catalog"."default"
+ "tags" varchar(500) collate "pg_catalog"."default",
+ "probing" boolean default false
)
;
alter table "public"."t_flink_app" add constraint "t_flink_app_pkey" primary
key ("id");
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index f428f9216..79a890def 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -43,6 +43,9 @@ unique key `un_team_vcode_inx` (`team_id`,`resource_name`)
using btree
alter table `t_flink_sql`
add column `team_resource` varchar(64) default null;
+alter table `t_flink_app`
+ add column `probing` tinyint default 0;
+
alter table `t_flink_cluster`
add column `job_manager_url` varchar(150) default null comment 'url
address of jobmanager' after `address`,
add column `start_time` datetime default null comment 'start time',
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index 4e7a43b2f..508c8cc07 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -59,6 +59,9 @@ create index "un_team_dname_inx" on "public"."t_resource"
using btree (
alter table "public"."t_flink_sql"
add column "team_resource" varchar(64) default null;
+alter table "public"."t_flink_app"
+ add column "probing" boolean default false;
+
alter table "public"."t_flink_cluster"
add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
add column "start_time" timestamp(6) collate "pg_catalog"."default",
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
new file mode 100644
index 000000000..d42537746
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Set;
+
+@NoArgsConstructor
+@Data
+public class AlertProbeMsg {
+
+ private Set<Long> alertId;
+
+ private String user;
+
+ private Integer probeJobs = 0;
+
+ private Integer failedJobs = 0;
+
+ private Integer lostJobs = 0;
+
+ private Integer cancelledJobs = 0;
+
+ public void incrementProbeJobs() {
+ this.probeJobs++;
+ }
+
+ public void incrementFailedJobs() {
+ this.failedJobs++;
+ }
+
+ public void incrementLostJobs() {
+ this.lostJobs++;
+ }
+
+ public void incrementCancelledJobs() {
+ this.cancelledJobs++;
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index 733fa020c..be407078a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -51,6 +51,11 @@ public class AlertTemplate implements Serializable {
private boolean atAll = false;
private Integer allJobs;
private Integer affectedJobs;
+ private String user;
+ private Integer probeJobs;
+ private Integer failedJobs;
+ private Integer lostJobs;
+ private Integer cancelledJobs;
public static AlertTemplate of(Application application, FlinkAppState
appState) {
return new AlertTemplateBuilder()
@@ -102,6 +107,19 @@ public class AlertTemplate implements Serializable {
.build();
}
+ public static AlertTemplate of(AlertProbeMsg alertProbeMsg) {
+ return new AlertTemplateBuilder()
+ .setType(4)
+ .setUser(alertProbeMsg.getUser())
+ .setProbeJobs(alertProbeMsg.getProbeJobs())
+ .setFailedJobs(alertProbeMsg.getFailedJobs())
+ .setLostJobs(alertProbeMsg.getLostJobs())
+ .setCancelledJobs(alertProbeMsg.getCancelledJobs())
+ .setSubject("StreamPark Alert: PROBE")
+ .setTitle("PROBE")
+ .build();
+ }
+
private static class AlertTemplateBuilder {
private final AlertTemplate alertTemplate = new AlertTemplate();
@@ -229,6 +247,31 @@ public class AlertTemplate implements Serializable {
return this;
}
+ public AlertTemplateBuilder setUser(String user) {
+ alertTemplate.setUser(user);
+ return this;
+ }
+
+ public AlertTemplateBuilder setProbeJobs(Integer probeJobs) {
+ alertTemplate.setProbeJobs(probeJobs);
+ return this;
+ }
+
+ public AlertTemplateBuilder setFailedJobs(Integer failedJobs) {
+ alertTemplate.setFailedJobs(failedJobs);
+ return this;
+ }
+
+ public AlertTemplateBuilder setLostJobs(Integer lostJobs) {
+ alertTemplate.setLostJobs(lostJobs);
+ return this;
+ }
+
+ public AlertTemplateBuilder setCancelledJobs(Integer cancelledJobs) {
+ alertTemplate.setCancelledJobs(cancelledJobs);
+ return this;
+ }
+
public AlertTemplate build() {
return this.alertTemplate;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 049af6f13..f0b283be1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -211,6 +211,8 @@ public class Application implements Serializable {
private String tags;
+ private Boolean probing = false;
+
/** running job */
private transient JobsOverview.Task overview;
@@ -308,7 +310,6 @@ public class Application implements Serializable {
case CANCELED:
case TERMINATED:
case POS_TERMINATED:
- case LOST:
return 0;
default:
return 1;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
index 84bf447f3..59dc4c6f5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
@@ -99,6 +99,9 @@ public enum FlinkAppState {
/** Job SUCCEEDED on yarn. */
SUCCEEDED(20),
+ /** Job is undergoing automatic health probing. */
+ PROBING(21),
+
/** Has killed in Yarn. */
KILLED(-9);
@@ -137,6 +140,11 @@ public enum FlinkAppState {
|| FlinkAppState.TERMINATED == flinkAppState;
}
+ public static boolean isLost(Integer appState) {
+ FlinkAppState flinkAppState = FlinkAppState.of(appState);
+ return FlinkAppState.LOST == flinkAppState;
+ }
+
/**
* Type conversion bridging Deprecated, see {@link
* org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 3744efa84..7076b5786 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -37,6 +37,8 @@ public interface ApplicationMapper extends
BaseMapper<Application> {
List<Application> getByTeamId(@Param("teamId") Long teamId);
+ List<Application> getProbeApps();
+
boolean mapping(@Param("application") Application appParam);
List<String> getRecentK8sNamespace(@Param("limitSize") Integer limit);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
index d55564111..c472b0cc3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
@@ -149,4 +149,11 @@ public interface ApplicationManageService extends
IService<Application> {
*/
List<Application> getByTeamIdAndExecutionModes(
Long teamId, Collection<ExecutionMode> executionModes);
+
+ /**
+ * Retrieves the applications that are being probed or need to be probed.
+ *
+ * @return a list of applications that are being probed or need to be probed.
+ */
+ List<Application> getProbeApps();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 8f8ada3ac..e89b16117 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -686,6 +686,10 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
.collect(Collectors.toSet())));
}
+ public List<Application> getProbeApps() {
+ return this.baseMapper.getProbeApps();
+ }
+
@Override
public boolean checkBuildAndUpdate(Application appParam) {
boolean build = appParam.getBuild();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
new file mode 100644
index 000000000..d19865f34
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.console.core.bean.AlertProbeMsg;
+import org.apache.streampark.console.core.bean.AlertTemplate;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.enums.FlinkAppState;
+import org.apache.streampark.console.core.service.alert.AlertService;
+import
org.apache.streampark.console.core.service.application.ApplicationManageService;
+import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
+import static
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp;
+
+/** This implementation is currently used to probe applications in YARN, remote and K8s modes. */
+@Slf4j
+@Component
+public class AutoHealthProbingTask {
+
+ @Autowired private ApplicationManageService applicationManageService;
+
+ @Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
+
+ @Autowired private AlertService alertService;
+
+ /** Interval between two probe rounds: 30 seconds. */
+ private static final Duration PROBE_INTERVAL = Duration.ofSeconds(30);
+
+ /** Interval between checks of the probe results while a probe is in progress: 5 seconds. */
+ private static final Duration PROBE_WAIT_INTERVAL = Duration.ofSeconds(5);
+
+ /** Maximum number of retries while any probed application is still LOST. */
+ private static final Short PROBE_RETRY_COUNT = 10;
+
+ private long lastWatchTime = 0L;
+
+ private Boolean isProbing = false;
+
+ private Short retryAttempts = PROBE_RETRY_COUNT;
+
+ @Scheduled(fixedDelay = 1000)
+ private void schedule() {
+ long timeMillis = System.currentTimeMillis();
+ if (isProbing) {
+ if (timeMillis - lastWatchTime >= PROBE_WAIT_INTERVAL.toMillis()) {
+ handleProbeResults();
+ lastWatchTime = timeMillis;
+ }
+ } else {
+ if (timeMillis - lastWatchTime >= PROBE_INTERVAL.toMillis()) {
+ lastWatchTime = timeMillis;
+ probe(Collections.emptyList());
+ }
+ }
+ }
+
+ public void probe(List<Application> applications) {
+ List<Application> probeApplication =
+ applications.isEmpty() ? applicationManageService.getProbeApps() :
applications;
+ if (probeApplication.isEmpty()) {
+ log.info("there is no application that needs to be probe");
+ return;
+ }
+ isProbing = true;
+ probeApplication =
+ probeApplication.stream()
+ .filter(application ->
FlinkAppState.isLost(application.getState()))
+ .collect(Collectors.toList());
+ updateProbingState(probeApplication);
+ probeApplication.stream().forEach(this::monitorApplication);
+ }
+
+ private void updateProbingState(List<Application> applications) {
+ applications.stream()
+ .filter(application -> FlinkAppState.isLost(application.getState()))
+ .forEach(
+ application -> {
+ application.setState(FlinkAppState.PROBING.getValue());
+ application.setProbing(true);
+ });
+ applicationManageService.updateBatchById(applications);
+ }
+
+ private void handleProbeResults() {
+ List<Application> probeApps = applicationManageService.getProbeApps();
+ if (shouldRetry(probeApps)) {
+ probe(probeApps);
+ } else {
+ List<AlertProbeMsg> alertProbeMsgs = generateProbeResults(probeApps);
+ alertProbeMsgs.stream().forEach(this::alert);
+ reset(probeApps);
+ }
+ }
+
+ private void reset(List<Application> applications) {
+ applications.forEach(
+ application -> {
+ application.setProbing(false);
+ application.setTracking(0);
+ });
+ applicationManageService.updateBatchById(applications);
+ retryAttempts = PROBE_RETRY_COUNT;
+ isProbing = false;
+ }
+
+ private void alert(AlertProbeMsg alertProbeMsg) {
+ alertProbeMsg.getAlertId().stream()
+ .forEach((alterId) -> alertService.alert(alterId,
AlertTemplate.of(alertProbeMsg)));
+ }
+
+ /** Aggregates the probe results into one alert message per user. */
+ private List<AlertProbeMsg> generateProbeResults(List<Application>
applications) {
+ if (applications == null || applications.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return new ArrayList<>(
+ applications.stream()
+ .collect(
+ Collectors.groupingBy(
+ Application::getUserId,
+ Collectors.collectingAndThen(
+ Collectors.toList(),
+ apps -> {
+ Set<Long> alertIds = new HashSet<>();
+ AlertProbeMsg alertProbeMsg = new AlertProbeMsg();
+ apps.forEach(
+ app -> {
+ alertProbeMsg.setUser(app.getUserName());
+ alertProbeMsg.incrementProbeJobs();
+ if (app.getState() ==
FlinkAppState.LOST.getValue()) {
+ alertProbeMsg.incrementLostJobs();
+ } else if (app.getState() ==
FlinkAppState.FAILED.getValue()) {
+ alertProbeMsg.incrementFailedJobs();
+ } else if (app.getState() ==
FlinkAppState.CANCELED.getValue()) {
+ alertProbeMsg.incrementCancelledJobs();
+ }
+ alertIds.add(app.getAlertId());
+ });
+
+ alertProbeMsg.setAlertId(alertIds);
+ return alertProbeMsg;
+ })))
+ .values());
+ }
+
+ private void monitorApplication(Application application) {
+ if (isKubernetesApp(application)) {
+ k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+ } else {
+ FlinkHttpWatcher.doWatching(application);
+ }
+ }
+
+ private Boolean shouldRetry(List<Application> applications) {
+ return applications.stream()
+ .anyMatch(
+ application -> FlinkAppState.LOST.getValue() ==
application.getState().intValue())
+ && (retryAttempts-- > 0);
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index 59eb3f542..d61018564 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -170,6 +170,7 @@ public class FlinkHttpWatcher {
applicationManageService.list(
new LambdaQueryWrapper<Application>()
.eq(Application::getTracking, 1)
+ .ne(Application::getState, FlinkAppState.LOST.getValue())
.notIn(Application::getExecutionMode,
ExecutionMode.getKubernetesMode()));
applications.forEach(
(app) -> {
@@ -801,6 +802,10 @@ public class FlinkHttpWatcher {
* @param appState application state
*/
private void doAlert(Application app, FlinkAppState appState) {
+ if (app.getProbing()) {
+ log.info("application with id {} is probing, don't send alert",
app.getId());
+ return;
+ }
switch (app.getExecutionModeEnum()) {
case YARN_APPLICATION:
case YARN_PER_JOB:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 550a9a41e..3006a1b6b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -39,6 +39,7 @@ import
org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@@ -62,6 +63,7 @@ import static
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8
* @link org.apache.streampark.console.core.task.FlinkK8sChangeListenerV2
*/
@Deprecated
+@Slf4j
@Component
public class FlinkK8sChangeEventListener {
@@ -107,7 +109,14 @@ public class FlinkK8sChangeEventListener {
|| FlinkAppState.LOST.equals(state)
|| FlinkAppState.RESTARTING.equals(state)
|| FlinkAppState.FINISHED.equals(state)) {
- executor.execute(() -> alertService.alert(app.getAlertId(),
AlertTemplate.of(app, state)));
+ executor.execute(
+ () -> {
+ if (app.getProbing()) {
+ log.info("application with id {} is probing, don't send alert",
app.getId());
+ return;
+ }
+ alertService.alert(app.getAlertId(), AlertTemplate.of(app, state));
+ });
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 00901367e..7840d06e6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.task;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.enums.FlinkAppState;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
@@ -98,6 +99,7 @@ public class FlinkK8sWatcherWrapper {
final LambdaQueryWrapper<Application> queryWrapper = new
LambdaQueryWrapper<>();
queryWrapper
.eq(Application::getTracking, 1)
+ .ne(Application::getState, FlinkAppState.LOST.getValue())
.in(Application::getExecutionMode, ExecutionMode.getKubernetesMode());
List<Application> k8sApplication =
applicationManageService.list(queryWrapper);
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
index 799923c7c..403e3dc17 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
@@ -2,9 +2,17 @@
# ${subject}
-### **Dear StreamPark user:**
+
+<#if type == 1 || type == 2 || type == 3>
+### **Dear StreamPark User:**
> ** Oops! I'm sorry to inform you that something wrong with your app **
+</#if>
+<#if type == 4 >
+### **Dear StreamPark User: ${user}**
+
+> ** This is the latest auto probe result **
+</#if>
<#if type == 1 || type == 2 >
- **Job Name:${jobName}**
</#if>
@@ -35,6 +43,12 @@
- **All Jobs:${allJobs}**
- **About Affected Jobs:${affectedJobs}**
</#if>
+<#if type == 4 >
+- **Probe Jobs:${probeJobs}**
+- **Failed Jobs:${failedJobs}**
+- **Lost Jobs:${lostJobs}**
+- **Cancelled Jobs:${cancelledJobs}**
+</#if>
> Best Wishes!
>
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
index 17a44f140..51649c165 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
@@ -836,18 +836,35 @@
border="0px"
width="550px">
<tbody>
- <tr>
- <td
colspan="2">
- <br>
- <p
style="margin: 0 0 1em;">
-
Dear StreamPark user:
- </p>
- <p
style="margin: 0 1em 1em;">
-
Oops! I'm sorry to inform you that something wrong with your app
- </p>
- <br>
- </td>
- </tr>
+ <#if mail.type
== 1 || mail.type == 2 || mail.type == 3>
+ <tr>
+ <td
colspan="2">
+
<br>
+ <p
style="margin: 0 0 1em;">
+
Dear StreamPark user:
+
</p>
+ <p
style="margin: 0 1em 1em;">
+
Oops! I'm sorry to inform you that something wrong with your app
+
</p>
+
<br>
+ </td>
+ </tr>
+ </#if>
+
+ <#if mail.type
== 4>
+ <tr>
+ <td
colspan="2">
+
<br>
+ <p
style="margin: 0 0 1em;">
+
Dear StreamPark user: ${mail.user}
+
</p>
+ <p
style="margin: 0 1em 1em;">
+
This is the latest auto probe result
+
</p>
+
<br>
+ </td>
+ </tr>
+ </#if>
<#if
mail.type == 1 >
<tr>
@@ -1030,6 +1047,41 @@
</#if>
+ <#if
mail.type == 4 >
+ <tr>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px;
border-top-color: rgba(169,169,169,.5); padding: 1em">
+
Probe Jobs
+ </td>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px;
border-right-color: rgba(169,169,169,.5); border-top-style:solid;
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+
${mail.probeJobs}
+ </td>
+ </tr>
+ <tr>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px;
border-top-color: rgba(169,169,169,.5); padding: 1em">
+
Failed Jobs
+ </td>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px;
border-right-color: rgba(169,169,169,.5); border-top-style:solid;
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+
<span style="font-size: 1.25em;font-weight: bold;color:
RED;">${mail.failedJobs}</span>
+ </td>
+ </tr>
+ <tr>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px;
border-top-color: rgba(169,169,169,.5); padding: 1em">
+
Lost Jobs
+ </td>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px;
border-right-color: rgba(169,169,169,.5); border-top-style:solid;
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+
${mail.lostJobs}
+ </td>
+ </tr>
+ <tr>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5); border-top-style:solid; border-top-width: 1px;
border-top-color: rgba(169,169,169,.5); padding: 1em">
+
Cancelled Jobs
+ </td>
+ <td
style="border-left-style:solid; border-left-width: 1px; border-left-color:
rgba(169,169,169,.5);border-right-style:solid; border-right-width: 1px;
border-right-color: rgba(169,169,169,.5); border-top-style:solid;
border-top-width: 1px; border-top-color: rgba(169,169,169,.5); padding: 1em">
+
${mail.cancelledJobs}
+ </td>
+ </tr>
+ </#if>
+
<tr>
<td
colspan="2" style="height: 3rem;">
<div
style="margin: 1.5em 0;">Best Wishes!
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
index fb45eda6f..f9f42e427 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
@@ -12,6 +12,7 @@
}
]
},
+<#if type == 1 || type == 2 || type == 3>
{
"tag": "markdown",
"content": "**Dear StreamPark user:**"
@@ -25,6 +26,22 @@
}
]
},
+</#if>
+<#if type == 4>
+ {
+ "tag": "markdown",
+ "content": "**Dear StreamPark user: ${user}**"
+ },
+ {
+ "tag": "note",
+ "elements": [
+ {
+ "tag": "plain_text",
+ "content": "This is the latest auto probe result"
+ }
+ ]
+ },
+</#if>
{
"fields": [
<#if type == 1 || type == 2>
@@ -164,6 +181,36 @@
"tag": "lark_md"
}
}
+</#if>
+<#if type == 4 >
+ {
+ "is_short": false,
+ "text": {
+ "content": "**Probe Jobs:${probeJobs}**",
+ "tag": "lark_md"
+ }
+ },
+ {
+ "is_short": false,
+ "text": {
+ "content": "**Failed Jobs:${failedJobs}**",
+ "tag": "lark_md"
+ }
+ },
+ {
+ "is_short": false,
+ "text": {
+ "content": "**Lost Jobs:${lostJobs}**",
+ "tag": "lark_md"
+ }
+ },
+ {
+ "is_short": false,
+ "text": {
+ "content": "**Cancelled Jobs:${cancelledJobs}**",
+ "tag": "lark_md"
+ }
+ }
</#if>
],
"tag": "div"
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
index 731d1adaa..143aa4ea6 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
@@ -1,11 +1,17 @@
> Apache StreamPark , Make stream processing easier!
# ${subject}
-
+<#if type == 1 || type == 2 || type == 3>
### **Dear StreamPark user:**
`Oops! I'm sorry to inform you that something wrong with your app`
-<#if type == 1 || type ==2 >
+</#if>
+<#if type == 4>
+### **Dear StreamPark user: ${user}**
+
+`This is the latest auto probe result`
+</#if>
+<#if type == 1 || type == 2 >
- **Job Name:${jobName}**
</#if>
<#if type == 3 >
@@ -35,6 +41,12 @@
- **All Jobs:${allJobs}**
- **About Affected Jobs:${affectedJobs}**
</#if>
+<#if type == 4 >
+- **Probe Jobs:${probeJobs}**
+- **Failed Jobs:${failedJobs}**
+- **Lost Jobs:${lostJobs}**
+- **Cancelled Jobs:${cancelledJobs}**
+</#if>
> Best Wishes!
> Apache StreamPark
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index f6b88d4b4..a509afa5e 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -95,6 +95,7 @@ create table if not exists `t_flink_app` (
`ingress_template` text ,
`default_mode_ingress` text ,
`tags` varchar(500) default null,
+ `probing` tinyint default 0,
primary key(`id`)
);
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 045a8f850..c25a504a4 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -76,6 +76,7 @@
<result column="rest_url" jdbcType="VARCHAR" property="restUrl"/>
<result column="rest_port" jdbcType="INTEGER" property="restPort"/>
<result column="tags" jdbcType="VARCHAR" property="tags"/>
+ <result column="probing" jdbcType="INTEGER" property="probing"/>
</resultMap>
<update id="resetOptionState">
@@ -254,6 +255,21 @@
where t.team_id=#{teamId}
</select>
+ <select id="getProbeApps"
resultType="org.apache.streampark.console.core.entity.Application">
+ select
+ t.*,
+ u.username,
+ case
+ when trim(u.nick_name) = ''
+ then u.username
+ else u.nick_name
+ end as nick_name
+ from t_flink_app t
+ inner join t_user u
+ on t.user_id = u.user_id
+ where (t.tracking = 1 and t.state = 13) or t.probing = 1
+ </select>
+
<update id="mapping"
parameterType="org.apache.streampark.console.core.entity.Application">
update t_flink_app
<set>
diff --git
a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index 6301878d8..49e7b8093 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -133,6 +133,8 @@ export enum AppStateEnum {
/** job SUCCEEDED on yarn */
SUCCEEDED = 20,
+ /** Job auto Health probe */
+ PROBING = 21,
/** has killed in Yarn */
KILLED = -9,
}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.less
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.less
index 1ba9175bb..74c6995d3 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.less
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.less
@@ -47,6 +47,10 @@
animation: reconciling-color 800ms ease-out infinite alternate;
}
+.status-processing-probing {
+ animation: probing-color 800ms ease-out infinite alternate;
+}
+
@keyframes deploying-color {
0% {
border-color: #1abbdc;
@@ -142,3 +146,15 @@
box-shadow: 0 0 10px #eb2f96, inset 0 0 5px #eb2f96;
}
}
+
+@keyframes probing-color {
+ 0% {
+ border-color: #2febc9;
+ box-shadow: 0 0 1px #2febc9, inset 0 0 2px #2febc9;
+ }
+
+ 100% {
+ border-color: #2febc9;
+ box-shadow: 0 0 10px #2febc9, inset 0 0 5px #2febc9;
+ }
+}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
index 7e4f34aa4..c6d9224de 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
@@ -71,6 +71,11 @@ const stateMap = {
class: 'status-processing-initializing',
},
[AppStateEnum.TERMINATED]: { color: '#8E50FF', title: 'TERMINATED' },
+ [AppStateEnum.PROBING]: {
+ color: '#2febc9',
+ title: 'PROBING',
+ class: 'status-processing-probing',
+ },
};
/* option state map*/
const optionStateMap = {