This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 58709cd53 [improvement] improvement flink cluster alert (#2875)
58709cd53 is described below
commit 58709cd53b9d2db2f130ad8556230beebd6dd2d0
Author: xujiangfeng001 <[email protected]>
AuthorDate: Thu Aug 17 08:58:55 2023 +0800
[improvement] improvement flink cluster alert (#2875)
* [improvement] improvement flink cluster alert
---
.../streampark/common/enums/ClusterState.java | 2 +-
.../streampark/common/conf/CommonConfig.scala | 7 ++
.../console/core/bean/AlertTemplate.java | 13 +++-
.../core/controller/FlinkClusterController.java | 7 +-
.../console/core/entity/FlinkCluster.java | 5 +-
.../console/core/mapper/ApplicationMapper.java | 3 +
.../console/core/service/ApplicationService.java | 2 +
.../console/core/service/FlinkClusterService.java | 2 +
.../core/service/impl/ApplicationServiceImpl.java | 5 ++
.../core/service/impl/FlinkClusterServiceImpl.java | 36 +++++-----
.../console/core/task/FlinkClusterWatcher.java | 7 +-
.../resources/alert-template/alert-dingTalk.ftl | 3 +-
.../main/resources/alert-template/alert-email.ftl | 11 +++-
.../main/resources/alert-template/alert-lark.ftl | 9 ++-
.../main/resources/alert-template/alert-weCom.ftl | 3 +-
.../resources/mapper/core/ApplicationMapper.xml | 18 +++++
.../src/enums/flinkEnum.ts | 8 ++-
.../src/locales/lang/en/setting/flinkCluster.ts | 1 +
.../src/locales/lang/zh-CN/setting/flinkCluster.ts | 1 +
.../src/views/flink/app/Add.vue | 2 +-
.../src/views/flink/app/EditStreamPark.vue | 2 +-
.../flink/app/hooks/useCreateAndEditSchema.ts | 4 +-
.../src/views/setting/FlinkCluster/State.less | 64 ++++++++++++++++++
.../src/views/setting/FlinkCluster/State.tsx | 77 ++++++++++++++++++++++
.../src/views/setting/FlinkCluster/index.vue | 22 ++++++-
25 files changed, 279 insertions(+), 35 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 2aef409c1..cd16e15a3 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -32,7 +32,7 @@ public enum ClusterState implements Serializable {
/** cluster unknown */
UNKNOWN(4),
STARTING(5),
- CANCELING(6),
+ CANCELLING(6),
FAILED(7),
KILLED(8);
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
index 18d53a35f..e9178bc19 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
@@ -111,4 +111,11 @@ object CommonConfig {
classType = classOf[String],
description = "The maximum size of the default read log")
+ val SPRING_PROFILES_ACTIVE: InternalOption = InternalOption(
+ key = "spring.profiles.active",
+ defaultValue = "h2",
+ classType = classOf[String],
+ description = "Use the database type"
+ )
+
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index b9a148349..cff27da10 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -49,6 +49,7 @@ public class AlertTemplate implements Serializable {
private Integer restartIndex;
private Integer totalRestart;
private boolean atAll = false;
+ private Integer allJobs;
private Integer affectedJobs;
public static AlertTemplate of(Application application, FlinkAppState
appState) {
@@ -96,7 +97,8 @@ public class AlertTemplate implements Serializable {
.setSubject(
String.format("StreamPark Alert: %s %s", cluster.getClusterName(),
clusterState))
.setStatus(clusterState.name())
- .setAffectedJobs(cluster.getJobs())
+ .setAllJobs(cluster.getAllJobs())
+ .setAffectedJobs(cluster.getAffectedJobs())
.build();
}
@@ -217,8 +219,13 @@ public class AlertTemplate implements Serializable {
return this;
}
- public AlertTemplateBuilder setAffectedJobs(Integer jobs) {
- alertTemplate.setAffectedJobs(jobs);
+ public AlertTemplateBuilder setAllJobs(Integer allJobs) {
+ alertTemplate.setAllJobs(allJobs);
+ return this;
+ }
+
+ public AlertTemplateBuilder setAffectedJobs(Integer affectedJobs) {
+ alertTemplate.setAffectedJobs(affectedJobs);
return this;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 148cd554b..a2b0048c2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.controller;
+import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.bean.ResponseResult;
@@ -92,6 +93,7 @@ public class FlinkClusterController {
@Operation(summary = "Start flink cluster")
@PostMapping("start")
public RestResponse start(FlinkCluster cluster) {
+ flinkClusterService.updateClusterState(cluster.getId(),
ClusterState.STARTING);
flinkClusterService.start(cluster);
return RestResponse.success();
}
@@ -99,7 +101,10 @@ public class FlinkClusterController {
@Operation(summary = "Shutdown flink cluster")
@PostMapping("shutdown")
public RestResponse shutdown(FlinkCluster cluster) {
- flinkClusterService.shutdown(cluster);
+ if (flinkClusterService.allowShutdownCluster(cluster)) {
+ flinkClusterService.updateClusterState(cluster.getId(),
ClusterState.CANCELLING);
+ flinkClusterService.shutdown(cluster);
+ }
return RestResponse.success();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index d48c17861..23a7ec726 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -106,12 +106,15 @@ public class FlinkCluster implements Serializable {
private Date startTime;
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
private Date endTime;
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer alertId;
- private transient Integer jobs = 0;
+ private transient Integer allJobs = 0;
+
+ private transient Integer affectedJobs = 0;
@JsonIgnore
public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 55c2ee72f..033f7f106 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -67,4 +67,7 @@ public interface ApplicationMapper extends
BaseMapper<Application> {
boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
Integer countJobsByClusterId(@Param("clusterId") Long clusterId);
+
+ Integer countAffectedJobsByClusterId(
+ @Param("clusterId") Long clusterId, @Param("dbType") String dbType);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 8e51c155f..97f9b0bcc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -104,6 +104,8 @@ public interface ApplicationService extends
IService<Application> {
Integer countJobsByClusterId(Long clusterId);
+ Integer countAffectedJobsByClusterId(Long clusterId, String dbType);
+
boolean existsJobByFlinkEnvId(Long flinkEnvId);
List<String> getRecentK8sNamespace();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 5f47c60a5..18393545a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -41,6 +41,8 @@ public interface FlinkClusterService extends
IService<FlinkCluster> {
void shutdown(FlinkCluster flinkCluster);
+ Boolean allowShutdownCluster(FlinkCluster flinkCluster);
+
Boolean existsByClusterId(String clusterId, Long id);
Boolean existsByClusterName(String clusterName, Long id);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index ee2d996d5..65bee297b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -532,6 +532,11 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
return baseMapper.countJobsByClusterId(clusterId);
}
+ @Override
+ public Integer countAffectedJobsByClusterId(Long clusterId, String dbType) {
+ return baseMapper.countAffectedJobsByClusterId(clusterId, dbType);
+ }
+
@Override
public boolean existsJobByFlinkEnvId(Long flinkEnvId) {
LambdaQueryWrapper<Application> lambdaQueryWrapper =
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 7111e2758..f5780124f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -161,7 +161,6 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Transactional(rollbackFor = {Exception.class})
public void start(FlinkCluster cluster) {
FlinkCluster flinkCluster = getById(cluster.getId());
- updateClusterState(cluster.getId(), ClusterState.STARTING);
try {
DeployResponse deployResponse = deployInternal(flinkCluster);
ApiAlertException.throwIfNull(
@@ -227,23 +226,10 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Override
public void shutdown(FlinkCluster cluster) {
FlinkCluster flinkCluster = this.getById(cluster.getId());
- // 1) check mode
- String clusterId = flinkCluster.getClusterId();
- ApiAlertException.throwIfTrue(
- StringUtils.isBlank(clusterId), "The clusterId can not be empty!");
-
- // 2) check cluster is active
- checkActiveIfNeeded(flinkCluster);
- // 3) check job if running on cluster
- boolean existsRunningJob =
applicationService.existsRunningJobByClusterId(flinkCluster.getId());
- ApiAlertException.throwIfTrue(
- existsRunningJob, "Some app is running on this cluster, the cluster
cannot be shutdown");
-
- updateClusterState(flinkCluster.getId(), ClusterState.CANCELING);
try {
- // 4) shutdown
- ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster,
clusterId);
+ ShutDownResponse shutDownResponse =
+ shutdownInternal(flinkCluster, flinkCluster.getClusterId());
ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response
failed");
flinkCluster.setClusterState(ClusterState.CANCELED.getValue());
flinkCluster.setEndTime(new Date());
@@ -258,6 +244,24 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
}
+ /**
+  * Checks whether the given cluster may be shut down.
+  *
+  * <p>The cluster is reloaded by id and the checks run in order: the record exists, its
+  * clusterId is not blank, {@code checkActiveIfNeeded} accepts it, and
+  * {@code existsRunningJobByClusterId} reports no running job. A failed check is signalled by
+  * throwing rather than by returning {@code false} (the {@code ApiAlertException.throwIf*}
+  * helpers are expected to throw an {@code ApiAlertException}).
+  *
+  * @param cluster carrier of the id of the cluster to check
+  * @return {@code true} when every check passes
+  */
+ @Override
+ public Boolean allowShutdownCluster(FlinkCluster cluster) {
+   FlinkCluster flinkCluster = this.getById(cluster.getId());
+   // Report an unknown id as an alert instead of failing later with a NullPointerException.
+   ApiAlertException.throwIfNull(
+       flinkCluster, "The cluster does not exist, id: " + cluster.getId());
+
+   // 1) check mode
+   String clusterId = flinkCluster.getClusterId();
+   ApiAlertException.throwIfTrue(
+       StringUtils.isBlank(clusterId), "The clusterId can not be empty!");
+
+   // 2) check cluster is active
+   checkActiveIfNeeded(flinkCluster);
+
+   // 3) check job if running on cluster
+   boolean existsRunningJob =
+       applicationService.existsRunningJobByClusterId(flinkCluster.getId());
+   ApiAlertException.throwIfTrue(
+       existsRunningJob, "Some app is running on this cluster, the cluster cannot be shutdown");
+
+   return true;
+ }
+
@Override
public Boolean existsByClusterId(String clusterId, Long id) {
return this.baseMapper.existsByClusterId(clusterId, id);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 3762acd37..c4480aadf 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -17,6 +17,8 @@
package org.apache.streampark.console.core.task;
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.HadoopUtils;
@@ -130,7 +132,10 @@ public class FlinkClusterWatcher {
private void alert(FlinkCluster cluster, ClusterState state) {
if (cluster.getAlertId() != null) {
-
cluster.setJobs(applicationService.countJobsByClusterId(cluster.getId()));
+
cluster.setAllJobs(applicationService.countJobsByClusterId(cluster.getId()));
+ cluster.setAffectedJobs(
+ applicationService.countAffectedJobsByClusterId(
+ cluster.getId(),
InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
cluster.setClusterState(state.getValue());
cluster.setEndTime(new Date());
alertService.alert(cluster, state);
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
index dd7331bac..799923c7c 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-dingTalk.ftl
@@ -32,7 +32,8 @@
- **Start Time:${startTime}**
- **End Time:${endTime}**
- **Duration:${duration}**
-- **Affected Jobs:${affectedJobs}**
+- **All Jobs:${allJobs}**
+- **About Affected Jobs:${affectedJobs}**
</#if>
> Best Wishes!
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
index a3d622858..17a44f140 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-email.ftl
@@ -1012,7 +1012,16 @@
<tr>
<td
style="border-bottom-style:solid; border-bottom-width: 1px;
border-bottom-color: rgba(169,169,169,.5); border-left-style:solid;
border-left-width: 1px; border-left-color: rgba(169,169,169,.5);
border-top-style:solid; border-top-width: 1px; border-top-color:
rgba(169,169,169,.5); padding: 1em">
-
Affected Jobs
+
All Jobs
+ </td>
+ <td
style="border: 1px solid rgba(169,169,169,.5); padding: 1em">
+
${mail.allJobs}
+ </td>
+ </tr>
+
+ <tr>
+ <td
style="border-bottom-style:solid; border-bottom-width: 1px;
border-bottom-color: rgba(169,169,169,.5); border-left-style:solid;
border-left-width: 1px; border-left-color: rgba(169,169,169,.5);
border-top-style:solid; border-top-width: 1px; border-top-color:
rgba(169,169,169,.5); padding: 1em">
+
About Affected Jobs
</td>
<td
style="border: 1px solid rgba(169,169,169,.5); padding: 1em">
${mail.affectedJobs}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
index a33b5811d..fb45eda6f 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-lark.ftl
@@ -153,7 +153,14 @@
{
"is_short": false,
"text": {
- "content": "**Affected Jobs:${affectedJobs}**",
+ "content": "**All Jobs:${allJobs}**",
+ "tag": "lark_md"
+ }
+ },
+ {
+ "is_short": false,
+ "text": {
+ "content": "**About Affected Jobs:${affectedJobs}**",
"tag": "lark_md"
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
index d72a46872..731d1adaa 100644
---
a/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
+++
b/streampark-console/streampark-console-service/src/main/resources/alert-template/alert-weCom.ftl
@@ -32,7 +32,8 @@
- **Start Time:${startTime}**
- **End Time:${endTime}**
- **Duration:${duration}**
-- **Affected Jobs:${affectedJobs}**
+- **All Jobs:${allJobs}**
+- **About Affected Jobs:${affectedJobs}**
</#if>
> Best Wishes!
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 04005c724..d8fa3f7dc 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -141,6 +141,24 @@
limit 1
</select>
+ <!--
+   Counts the rows of t_flink_app that belong to the given cluster, are in state 5, 7 or 9,
+   and whose end_time is NULL or at most 2 seconds in the past.
+   The elapsed-seconds expression depends on the SQL dialect, so it is selected by dbType;
+   for any dbType other than pgsql, mysql or h2 no end_time condition is added.
+   The less-than-or-equal operator is entity-escaped so the mapper stays well-formed XML.
+ -->
+ <select id="countAffectedJobsByClusterId" resultType="java.lang.Integer" parameterType="java.lang.Long">
+   select
+     count(1)
+   from t_flink_app
+   where
+     flink_cluster_id = #{clusterId}
+     and state in (5, 7, 9)
+   <choose>
+     <when test="dbType == 'pgsql'">
+       and (end_time IS NULL or EXTRACT(EPOCH FROM (NOW() - end_time)) &lt;= 2)
+     </when>
+     <when test="dbType == 'mysql' or dbType == 'h2'">
+       and (end_time IS NULL or TIMESTAMPDIFF(SECOND, end_time, NOW()) &lt;= 2)
+     </when>
+   </choose>
+   limit 1
+ </select>
+
<select id="getByProjectId"
resultType="org.apache.streampark.console.core.entity.Application"
parameterType="java.lang.Long">
select * from t_flink_app where project_id=#{projectId}
</select>
diff --git
a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index 2c485c592..2ed2b1eb6 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -140,11 +140,17 @@ export enum ClusterStateEnum {
/** The cluster was just created but not started */
CREATED = 0,
/** cluster started */
- STARTED = 1,
+ RUNNING = 1,
/** cluster canceled */
CANCELED = 2,
/** cluster lost */
LOST = 3,
+ /** cluster unknown */
+ UNKNOWN = 4,
+ STARTING = 5,
+ CANCELLING = 6,
+ FAILED = 7,
+ KILLED = 8,
}
export enum AppTypeEnum {
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
index 2e7c48e91..eec73f51d 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
@@ -24,6 +24,7 @@ export default {
form: {
clusterName: 'Cluster Name',
address: 'Cluster URL',
+ runState: 'Run State',
internal: 'internal cluster',
executionMode: 'Execution Mode',
versionId: 'Flink Version',
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
index 4d9974736..4e926d749 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
@@ -24,6 +24,7 @@ export default {
form: {
clusterName: '集群名称',
address: '集群URL',
+ runState: '运行状态',
executionMode: '执行模式',
versionId: 'Flink版本',
addType: '添加类型',
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 7b1c8b197..727c9b5d2 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -143,7 +143,7 @@
const cluster =
unref(flinkClusters).filter((c) => {
if (flinkClusterId) {
- return c.id == flinkClusterId && c.clusterState ===
ClusterStateEnum.STARTED;
+ return c.id == flinkClusterId && c.clusterState ===
ClusterStateEnum.RUNNING;
}
})[0] || null;
if (cluster) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index 63b517f79..e559a8d37 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -254,7 +254,7 @@
if (params.executionMode == ExecModeEnum.KUBERNETES_SESSION) {
const cluster =
unref(flinkClusters).filter((c) => {
- return c.id == params.flinkClusterId && c.clusterState ===
ClusterStateEnum.STARTED;
+ return c.id == params.flinkClusterId && c.clusterState ===
ClusterStateEnum.RUNNING;
})[0] || null;
if (cluster) {
Object.assign(params, { clusterId: cluster.clusterId });
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 597e1c890..e47222d0f 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -82,7 +82,7 @@ export const useCreateAndEditSchema = (
const [registerConfDrawer, { openDrawer: openConfDrawer }] = useDrawer();
- /*
+ /*
!The original item is also unassigned
*/
function getConfigSchemas() {
@@ -98,7 +98,7 @@ export const useCreateAndEditSchema = (
.filter((o) => {
// Edit mode has one more filter condition
if (edit?.mode) {
- return o.executionMode == executionMode && o.clusterState ===
ClusterStateEnum.STARTED;
+ return o.executionMode == executionMode && o.clusterState ===
ClusterStateEnum.RUNNING;
} else {
return o.executionMode == executionMode;
}
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/State.less
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/State.less
new file mode 100644
index 000000000..9051b9a43
--- /dev/null
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/State.less
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+.status-processing-starting {
+ animation: starting-color 800ms ease-out infinite alternate;
+}
+
+.status-processing-running {
+ animation: running-color 800ms ease-out infinite alternate;
+}
+
+.status-processing-cancelling {
+ animation: cancelling-color 800ms ease-out infinite alternate;
+}
+
+@keyframes starting-color {
+ 0% {
+ border-color: #1ab58e;
+ box-shadow: 0 0 1px #1ab58e, inset 0 0 2px #1ab58e;
+ }
+
+ 100% {
+ border-color: #1ab58e;
+ box-shadow: 0 0 10px #1ab58e, inset 0 0 5px #1ab58e;
+ }
+}
+
+@keyframes running-color {
+ 0% {
+ border-color: #52c41a;
+ box-shadow: 0 0 1px #52c41a, inset 0 0 2px #52c41a;
+ }
+
+ 100% {
+ border-color: #52c41a;
+ box-shadow: 0 0 10px #52c41a, inset 0 0 5px #52c41a;
+ }
+}
+
+@keyframes cancelling-color {
+ 0% {
+ border-color: #faad14;
+ box-shadow: 0 0 1px #faad14, inset 0 0 2px #faad14;
+ }
+
+ 100% {
+ border-color: #faad14;
+ box-shadow: 0 0 10px #faad14, inset 0 0 5px #faad14;
+ }
+}
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/State.tsx
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/State.tsx
new file mode 100644
index 000000000..e45cf2262
--- /dev/null
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/State.tsx
@@ -0,0 +1,77 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+import { defineComponent, toRefs, unref } from 'vue';
+import { Tag } from 'ant-design-vue';
+import './State.less';
+import { ClusterStateEnum } from '/@/enums/flinkEnum';
+
+/* state map*/
+const stateMap = {
+ [ClusterStateEnum.CREATED]: { color: '#2f54eb', title: 'CREATED' },
+ [ClusterStateEnum.STARTING]: {
+ color: '#1AB58E',
+ title: 'STARTING',
+ class: 'status-processing-starting',
+ },
+ [ClusterStateEnum.RUNNING]: {
+ color: '#52c41a',
+ title: 'RUNNING',
+ class: 'status-processing-running',
+ },
+ [ClusterStateEnum.FAILED]: { color: '#f5222d', title: 'FAILED' },
+ [ClusterStateEnum.CANCELLING]: {
+ color: '#faad14',
+ title: 'CANCELLING',
+ class: 'status-processing-cancelling',
+ },
+ [ClusterStateEnum.CANCELED]: { color: '#fa8c16', title: 'CANCELED' },
+ [ClusterStateEnum.KILLED]: { color: '#fa8c16', title: 'KILLED' },
+ [ClusterStateEnum.LOST]: { color: '#99A3A4', title: 'LOST' },
+ [ClusterStateEnum.UNKNOWN]: { color: '#000000', title: 'UNKNOWN' },
+};
+
+export default defineComponent({
+ name: 'State',
+ props: {
+ option: {
+ type: String,
+ default: 'state',
+ },
+ data: {
+ type: Object as PropType<Recordable>,
+ default: () => ({}),
+ },
+ },
+ setup(props) {
+ const { data } = toRefs(props);
+ const renderTag = (map: Recordable, key: number) => {
+ if (!Reflect.has(map, key)) {
+ return;
+ }
+ return <Tag {...map[key]}>{map[key].title}</Tag>;
+ };
+
+ const renderState = () => {
+ return <div class="bold-tag">{renderTag(stateMap,
unref(data).clusterState)}</div>;
+ };
+
+ return () => {
+ return <span>{renderState()}</span>;
+ };
+ },
+});
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
index 8cd7b8dfa..73345d065 100644
---
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
@@ -48,6 +48,7 @@
import { useI18n } from '/@/hooks/web/useI18n';
import { PageWrapper } from '/@/components/Page';
import { BasicTitle } from '/@/components/Basic';
+ import State from './State.tsx'
const ListItem = List.Item;
const ListItemMeta = ListItem.Meta;
@@ -58,7 +59,7 @@
const clusters = ref<FlinkCluster[]>([]);
const loading = ref(false);
function handleIsStart(item) {
- return item.clusterState === ClusterStateEnum.STARTED;
+ return item.clusterState === ClusterStateEnum.RUNNING;
}
/* Go to edit cluster */
@@ -155,7 +156,7 @@
<SvgIcon class="avatar p-15px" name="flink" size="60" />
</template>
</ListItemMeta>
- <div class="list-content" style="width: 20%">
+ <div class="list-content" style="width: 10%">
<div class="list-content-item">
<span>{{ t('setting.flinkCluster.form.executionMode') }}</span>
<p style="margin-top: 10px">
@@ -165,7 +166,7 @@
</div>
<div
class="list-content"
- style="width: 30%"
+ style="width: 40%"
v-if="
item.executionMode === ExecModeEnum.REMOTE ||
item.executionMode === ExecModeEnum.YARN_SESSION
@@ -180,6 +181,21 @@
</p>
</div>
</div>
+ <div
+ class="list-content"
+ style="width: 10%"
+ v-if="
+ item.executionMode === ExecModeEnum.REMOTE ||
+ item.executionMode === ExecModeEnum.YARN_SESSION
+ "
+ >
+ <div class="list-content-item">
+ <span>{{ t('setting.flinkCluster.form.runState') }}</span>
+ <p style="margin-top: 10px">
+ <State :data="{ clusterState: item.clusterState }" />
+ </p>
+ </div>
+ </div>
<template #actions>
<Tooltip :title="t('setting.flinkCluster.edit')">
<a-button