This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 809920f42 [ISSUE-2498][Feature] [SubTask] The cluster supports remote
and yarn session heartbeat monitoring (#2675)
809920f42 is described below
commit 809920f4260132fb13cdebbd8b6ff10effbe7fa9
Author: xujiangfeng001 <[email protected]>
AuthorDate: Wed May 31 12:43:13 2023 +0800
[ISSUE-2498][Feature] [SubTask] The cluster supports remote and yarn
session heartbeat monitoring (#2675)
* [ISSUE-2498][Feature] [SubTask] The cluster supports remote and yarn
session heartbeat monitoring
---
.../streampark/common/enums/ClusterState.java | 34 ++-
.../main/assembly/script/schema/mysql-schema.sql | 3 +-
.../main/assembly/script/schema/pgsql-schema.sql | 4 +-
.../main/assembly/script/upgrade/mysql/2.1.0.sql | 1 -
.../main/assembly/script/upgrade/mysql/2.2.0.sql | 3 +
.../main/assembly/script/upgrade/pgsql/2.2.0.sql | 3 +
.../console/core/entity/FlinkCluster.java | 3 +
.../core/service/impl/FlinkClusterServiceImpl.java | 23 +-
.../console/core/task/FlinkClusterWatcher.java | 285 +++++++++++++++++++++
.../src/main/resources/db/schema-h2.sql | 3 +-
.../resources/mapper/core/FlinkClusterMapper.xml | 1 +
11 files changed, 348 insertions(+), 15 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 9dbc3aa4b..cefe347c7 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -24,12 +24,13 @@ public enum ClusterState implements Serializable {
/** The cluster was just created but not started */
CREATED(0),
/** cluster started */
- STARTED(1),
+ RUNNING(1),
/** cluster stopped */
STOPPED(2),
-
/** cluster lost */
- LOST(3);
+ LOST(3),
+ /** cluster unknown */
+ UNKNOWN(4);
private final Integer value;
@@ -43,14 +44,35 @@ public enum ClusterState implements Serializable {
return clusterState;
}
}
- return null;
+ return ClusterState.UNKNOWN;
+ }
+
+ public static ClusterState of(String value) {
+ for (ClusterState clusterState : values()) {
+ if (clusterState.name().equals(value)) {
+ return clusterState;
+ }
+ }
+ return ClusterState.UNKNOWN;
}
public Integer getValue() {
return value;
}
- public static boolean isStarted(ClusterState state) {
- return STARTED.equals(state);
+ public static boolean isCreateState(ClusterState state) {
+ return CREATED.equals(state);
+ }
+
+ public static boolean isRunningState(ClusterState state) {
+ return RUNNING.equals(state);
+ }
+
+ public static boolean isStoppedState(ClusterState state) {
+ return STOPPED.equals(state);
+ }
+
+ public static boolean isLostState(ClusterState state) {
+ return LOST.equals(state);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 45e2294eb..3a8f9ce4f 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -437,7 +437,8 @@ create table `t_app_build_pipe`(
drop table if exists `t_flink_cluster`;
create table `t_flink_cluster` (
`id` bigint not null auto_increment,
- `address` varchar(150) default null comment 'url address of jobmanager',
+ `address` varchar(150) default null comment 'url address of cluster',
+ `job_manager_url` varchar(150) default null comment 'url address of
jobmanager',
`cluster_id` varchar(45) default null comment 'clusterid of session
mode(yarn-session:application-id,k8s-session:cluster-id)',
`cluster_name` varchar(128) not null comment 'cluster name',
`options` text comment 'json form of parameter collection ',
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index b613b852b..a8706ebe5 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -271,6 +271,7 @@ create sequence "public"."streampark_t_flink_cluster_id_seq"
create table "public"."t_flink_cluster" (
"id" int8 not null default
nextval('streampark_t_flink_cluster_id_seq'::regclass),
"address" varchar(150) collate "pg_catalog"."default",
+ "job_manager_url" varchar(150) collate "pg_catalog"."default",
"cluster_id" varchar(45) collate "pg_catalog"."default",
"cluster_name" varchar(128) collate "pg_catalog"."default" not null,
"options" text collate "pg_catalog"."default",
@@ -292,7 +293,8 @@ create table "public"."t_flink_cluster" (
"create_time" timestamp(6) not null default timezone('UTC-8'::text,
(now())::timestamp(0) without time zone)
)
;
-comment on column "public"."t_flink_cluster"."address" is 'url address of
jobmanager';
+comment on column "public"."t_flink_cluster"."address" is 'url address of
cluster';
+comment on column "public"."t_flink_cluster"."job_manager_url" is 'url address
of jobmanager';
comment on column "public"."t_flink_cluster"."cluster_id" is 'clusterid of
session mode(yarn-session:application-id,k8s-session:cluster-id)';
comment on column "public"."t_flink_cluster"."cluster_name" is 'cluster name';
comment on column "public"."t_flink_cluster"."options" is 'parameter
collection json form';
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
index c744b30eb..042b09a53 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
@@ -121,7 +121,6 @@ alter table `t_flink_cluster`
alter table `t_access_token`
modify column `description` varchar(255) character set utf8mb4 collate
utf8mb4_general_ci default null comment 'description';
-
-- menu script
delete from `t_menu`;
-- menu level 1
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 38081fb95..581f6c145 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -43,6 +43,9 @@ create table `t_resource` (
alter table `t_flink_sql`
add column `team_resource` varchar(64) default null;
+alter table `t_flink_cluster`
+ add column `job_manager_url` varchar(150) default null comment 'url
address of jobmanager' after `address`;
+
-- menu level 2
insert into `t_menu` values (120400, 120000, 'menu.resource',
'/flink/resource', 'flink/resource/View', null, 'apartment', '0', 1, 3, now(),
now());
-- menu level 3
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index f60672c61..6178798fd 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -56,6 +56,9 @@ create index "un_team_dname_inx" on "public"."t_resource"
using btree (
alter table "public"."t_flink_sql"
add column "team_resource" varchar(64) default null;
+alter table "public"."t_flink_cluster"
+ add column "job_manager_url" varchar(150) collate "pg_catalog"."default";
+
insert into "public"."t_menu" values (120400, 120000, 'menu.resource',
'/flink/resource', 'flink/resource/View', null, 'apartment', '0', '1', 3,
now(), now());
insert into "public"."t_menu" values (110401, 110400, 'add', null, null,
'token:add', null, '1', '1', null, now(), now());
insert into "public"."t_menu" values (110402, 110400, 'delete', null, null,
'token:delete', null, '1', '1', null, now(), now());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 57672218b..62cedea94 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -64,6 +64,9 @@ public class FlinkCluster implements Serializable {
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String address;
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
+ private String jobManagerUrl;
+
private String clusterId;
private String clusterName;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index a1ed45c75..80e527494 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -31,6 +31,7 @@ import
org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkClusterWatcher;
import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.DeployRequest;
@@ -141,11 +142,15 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
successful, String.format(ERROR_CLUSTER_QUEUE_HINT,
flinkCluster.getYarnQueue()));
flinkCluster.setCreateTime(new Date());
if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
- flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+ flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
} else {
flinkCluster.setClusterState(ClusterState.CREATED.getValue());
}
- return save(flinkCluster);
+ boolean ret = save(flinkCluster);
+ if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
+ FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+ }
+ return ret;
}
@Override
@@ -160,16 +165,20 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
if
(ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
String address = YarnUtils.getRMWebAppURL() + "/proxy/" +
deployResponse.clusterId() + "/";
flinkCluster.setAddress(address);
+ flinkCluster.setJobManagerUrl(deployResponse.address());
} else {
flinkCluster.setAddress(deployResponse.address());
}
flinkCluster.setClusterId(deployResponse.clusterId());
- flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+ flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
flinkCluster.setException(null);
+ FlinkClusterWatcher.addFlinkCluster(flinkCluster);
updateById(flinkCluster);
FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
+ flinkCluster.setAddress(null);
+ flinkCluster.setJobManagerUrl(null);
flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
flinkCluster.setException(e.toString());
updateById(flinkCluster);
@@ -214,6 +223,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response
failed");
flinkCluster.setAddress(null);
flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+ FlinkClusterWatcher.removeFlinkCluster(flinkCluster);
updateById(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
@@ -262,7 +272,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())
||
ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode())) {
ApiAlertException.throwIfTrue(
- ClusterState.isStarted(flinkCluster.getClusterStateEnum()),
+ ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
"Flink cluster is running, cannot be delete, please check.");
}
@@ -353,10 +363,11 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
ApiAlertException.throwIfFalse(
- ClusterState.isStarted(flinkCluster.getClusterStateEnum()),
+ ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
"Current cluster is not active, please check!");
if (!flinkCluster.verifyClusterConnection()) {
flinkCluster.setAddress(null);
+ flinkCluster.setJobManagerUrl(null);
flinkCluster.setClusterState(ClusterState.LOST.getValue());
updateById(flinkCluster);
throw new ApiAlertException("Current cluster is not active, please
check!");
@@ -369,8 +380,10 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setDescription(cluster.getDescription());
if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
flinkCluster.setAddress(cluster.getAddress());
+ flinkCluster.setJobManagerUrl(cluster.getAddress());
} else {
flinkCluster.setAddress(null);
+ flinkCluster.setJobManagerUrl(null);
flinkCluster.setClusterId(cluster.getClusterId());
flinkCluster.setVersionId(cluster.getVersionId());
flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
new file mode 100644
index 000000000..2ecb2c7de
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hc.client5.http.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracking clusters in yarn, remote and K8s modes. */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+ @Autowired private FlinkClusterService flinkClusterService;
+
+ private Long lastWatcheringTime = 0L;
+
+ // Track interval every 30 seconds
+ private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);
+
+  /** Watched cluster list */
+ private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new
ConcurrentHashMap<>(8);
+
+ /** Thread pool for processing status monitoring for each cluster */
+ private static final ExecutorService EXECUTOR =
+ new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors() * 5,
+ Runtime.getRuntime().availableProcessors() * 10,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(1024),
+ ThreadUtils.threadFactory("flink-cluster-watching-executor"));
+
+ /** Initialize cluster cache */
+ @PostConstruct
+ private void init() {
+ WATCHER_CLUSTERS.clear();
+ List<FlinkCluster> flinkClusters =
+ flinkClusterService.list(
+ new LambdaQueryWrapper<FlinkCluster>()
+ .eq(FlinkCluster::getClusterState,
ClusterState.RUNNING.getValue())
+ .notIn(FlinkCluster::getExecutionMode,
ExecutionMode.getKubernetesMode()));
+ flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(),
cluster));
+ }
+
+  /** Persist the watched flink cluster cache on shutdown (not yet implemented). */
+ @PreDestroy
+ private void stop() {
+ // TODO: flinkcluster persistent
+ }
+
+ @Scheduled(fixedDelay = 1000)
+ private void start() {
+ if (System.currentTimeMillis() - lastWatcheringTime >=
WATCHER_INTERVAL.toMillis()) {
+ lastWatcheringTime = System.currentTimeMillis();
+ watcher();
+ }
+ }
+
+ private void watcher() {
+ for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
+ EXECUTOR.execute(
+ () -> {
+ FlinkCluster flinkCluster = entry.getValue();
+ Integer clusterExecutionMode = flinkCluster.getExecutionMode();
+ if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
+ ClusterState state = getClusterState(flinkCluster);
+ handleClusterState(flinkCluster, state);
+ } else {
+ // TODO: K8s Session status monitoring
+ }
+ });
+ }
+ }
+
+ /**
+   * Get the cluster state from the Flink or YARN REST API.
+ *
+ * @param flinkCluster
+ * @return
+ */
+ private ClusterState getClusterState(FlinkCluster flinkCluster) {
+ ClusterState state = getClusterStateFromFlinkAPI(flinkCluster);
+ if (ClusterState.isRunningState(state)) {
+ return state;
+ } else {
+ return getClusterStateFromYarnAPI(flinkCluster);
+ }
+ }
+
+ /**
+   * Get the cluster state from the Flink REST API.
+ *
+ * @param flinkCluster
+ * @return
+ */
+ private ClusterState getClusterStateFromFlinkAPI(FlinkCluster flinkCluster) {
+ final String address = flinkCluster.getAddress();
+ final String jobManagerUrl = flinkCluster.getJobManagerUrl();
+ if (StringUtils.isEmpty(address)) {
+ return ClusterState.STOPPED;
+ }
+ final String flinkUrl =
+ StringUtils.isEmpty(jobManagerUrl)
+ ? address.concat("/overview")
+ : jobManagerUrl.concat("/overview");
+ try {
+ String res =
+ HttpClientUtils.httpGetRequest(
+ flinkUrl,
+ RequestConfig.custom().setConnectTimeout(5000,
TimeUnit.MILLISECONDS).build());
+
+ JacksonUtils.read(res, Overview.class);
+ return ClusterState.RUNNING;
+ } catch (Exception ignored) {
+ log.error("cluster id:{} get state from flink api failed",
flinkCluster.getId());
+ }
+ return ClusterState.UNKNOWN;
+ }
+
+ /**
+   * Get the cluster state from the YARN REST API.
+ *
+ * @param flinkCluster
+ * @return
+ */
+ private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) {
+ if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
+ return ClusterState.STOPPED;
+ }
+ String clusterId = flinkCluster.getClusterId();
+ if (StringUtils.isEmpty(clusterId)) {
+ return ClusterState.STOPPED;
+ }
+ String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId());
+ try {
+ String result = YarnUtils.restRequest(yarnUrl);
+ if (null == result) {
+ return ClusterState.UNKNOWN;
+ }
+ YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class);
+ FinalApplicationStatus status =
+
stringConvertFinalApplicationStatus(yarnAppInfo.getApp().getFinalStatus());
+ if (status == null) {
+ log.error(
+ "cluster id:{} final application status convert failed, invalid
string ",
+ flinkCluster.getId());
+ return ClusterState.UNKNOWN;
+ }
+ return finalApplicationStatusConvertClusterState(status);
+ } catch (Exception e) {
+ return ClusterState.LOST;
+ }
+ }
+
+ /**
+   * Process the resolved cluster state: persist it and update the watcher cache.
+ *
+ * @param flinkCluster
+ * @param state
+ */
+ private void handleClusterState(FlinkCluster flinkCluster, ClusterState
state) {
+ LambdaUpdateWrapper<FlinkCluster> updateWrapper =
+ new LambdaUpdateWrapper<FlinkCluster>()
+ .eq(FlinkCluster::getId, flinkCluster.getId())
+ .set(FlinkCluster::getClusterState, state.getValue());
+ switch (state) {
+ case STOPPED:
+ {
+ updateWrapper
+ .set(FlinkCluster::getAddress, null)
+ .set(FlinkCluster::getJobManagerUrl, null);
+ }
+ // fall through
+ case LOST:
+ case UNKNOWN:
+ {
+ removeFlinkCluster(flinkCluster);
+ break;
+ }
+ }
+ flinkClusterService.update(updateWrapper);
+ }
+
+ /**
+ * Add a cluster to cache
+ *
+ * @param flinkCluster
+ */
+ public static void addFlinkCluster(FlinkCluster flinkCluster) {
+ if (WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+ return;
+ }
+ log.info("add the cluster with id:{} to watcher cluster cache",
flinkCluster.getId());
+ WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
+ }
+
+ /**
+ * Remove a cluster from cache
+ *
+ * @param flinkCluster
+ */
+ public static void removeFlinkCluster(FlinkCluster flinkCluster) {
+ if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+ return;
+ }
+ log.info("remove the cluster with id:{} from watcher cluster cache",
flinkCluster.getId());
+ WATCHER_CLUSTERS.remove(flinkCluster.getId());
+ }
+
+ /**
+   * Convert a string value to the matching FinalApplicationStatus, or null if none matches.
+ *
+ * @param value
+ * @return
+ */
+ private FinalApplicationStatus stringConvertFinalApplicationStatus(String
value) {
+ for (FinalApplicationStatus status : FinalApplicationStatus.values()) {
+ if (status.name().equals(value)) {
+ return status;
+ }
+ }
+ return null;
+ }
+
+ /**
+   * Convert a FinalApplicationStatus to the corresponding ClusterState.
+ *
+ * @param status
+ * @return
+ */
+ private ClusterState
finalApplicationStatusConvertClusterState(FinalApplicationStatus status) {
+ switch (status) {
+ case UNDEFINED:
+ return ClusterState.RUNNING;
+ default:
+ return ClusterState.STOPPED;
+ }
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 3e9f1c454..f2f22da5e 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -390,7 +390,8 @@ create table if not exists `t_app_build_pipe` (
-- ----------------------------
create table if not exists `t_flink_cluster` (
`id` bigint generated by default as identity not null,
- `address` varchar(150) default null comment 'url address of jobmanager',
+ `address` varchar(150) default null comment 'url address of cluster',
+ `job_manager_url` varchar(150) default null comment 'url address of
jobmanager',
`cluster_id` varchar(45) default null comment 'clusterId of session
mode(yarn-session:application-id,k8s-session:cluster-id)',
`cluster_name` varchar(128) not null comment 'cluster name',
`options` text comment 'json form of parameter collection ',
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index b869326dd..91e791847 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -20,6 +20,7 @@
<resultMap id="BaseResultMap"
type="org.apache.streampark.console.core.entity.FlinkCluster">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="address" jdbcType="VARCHAR" property="address"/>
+ <result column="job_manager_url" jdbcType="VARCHAR"
property="jobManagerUrl"/>
<result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
<result column="cluster_name" jdbcType="VARCHAR"
property="clusterName"/>
<result column="options" jdbcType="LONGVARCHAR" property="options"/>