This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 809920f42 [ISSUE-2498][Feature] [SubTask] The cluster supports remote 
and yarn session heartbeat monitoring (#2675)
809920f42 is described below

commit 809920f4260132fb13cdebbd8b6ff10effbe7fa9
Author: xujiangfeng001 <[email protected]>
AuthorDate: Wed May 31 12:43:13 2023 +0800

    [ISSUE-2498][Feature] [SubTask] The cluster supports remote and yarn 
session heartbeat monitoring (#2675)
    
    * [ISSUE-2498][Feature] [SubTask] The cluster supports remote and yarn 
session heartbeat monitoring
---
 .../streampark/common/enums/ClusterState.java      |  34 ++-
 .../main/assembly/script/schema/mysql-schema.sql   |   3 +-
 .../main/assembly/script/schema/pgsql-schema.sql   |   4 +-
 .../main/assembly/script/upgrade/mysql/2.1.0.sql   |   1 -
 .../main/assembly/script/upgrade/mysql/2.2.0.sql   |   3 +
 .../main/assembly/script/upgrade/pgsql/2.2.0.sql   |   3 +
 .../console/core/entity/FlinkCluster.java          |   3 +
 .../core/service/impl/FlinkClusterServiceImpl.java |  23 +-
 .../console/core/task/FlinkClusterWatcher.java     | 285 +++++++++++++++++++++
 .../src/main/resources/db/schema-h2.sql            |   3 +-
 .../resources/mapper/core/FlinkClusterMapper.xml   |   1 +
 11 files changed, 348 insertions(+), 15 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 9dbc3aa4b..cefe347c7 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -24,12 +24,13 @@ public enum ClusterState implements Serializable {
   /** The cluster was just created but not started */
   CREATED(0),
   /** cluster started */
-  STARTED(1),
+  RUNNING(1),
   /** cluster stopped */
   STOPPED(2),
-
   /** cluster lost */
-  LOST(3);
+  LOST(3),
+  /** cluster unknown */
+  UNKNOWN(4);
 
   private final Integer value;
 
@@ -43,14 +44,35 @@ public enum ClusterState implements Serializable {
         return clusterState;
       }
     }
-    return null;
+    return ClusterState.UNKNOWN;
+  }
+
+  public static ClusterState of(String value) {
+    for (ClusterState clusterState : values()) {
+      if (clusterState.name().equals(value)) {
+        return clusterState;
+      }
+    }
+    return ClusterState.UNKNOWN;
   }
 
   public Integer getValue() {
     return value;
   }
 
-  public static boolean isStarted(ClusterState state) {
-    return STARTED.equals(state);
+  public static boolean isCreateState(ClusterState state) {
+    return CREATED.equals(state);
+  }
+
+  public static boolean isRunningState(ClusterState state) {
+    return RUNNING.equals(state);
+  }
+
+  public static boolean isStoppedState(ClusterState state) {
+    return STOPPED.equals(state);
+  }
+
+  public static boolean isLostState(ClusterState state) {
+    return LOST.equals(state);
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 45e2294eb..3a8f9ce4f 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -437,7 +437,8 @@ create table `t_app_build_pipe`(
 drop table if exists `t_flink_cluster`;
 create table `t_flink_cluster` (
   `id` bigint not null auto_increment,
-  `address` varchar(150) default null comment 'url address of jobmanager',
+  `address` varchar(150) default null comment 'url address of cluster',
+  `job_manager_url` varchar(150) default null comment 'url address of 
jobmanager',
   `cluster_id` varchar(45) default null comment 'clusterid of session 
mode(yarn-session:application-id,k8s-session:cluster-id)',
   `cluster_name` varchar(128) not null comment 'cluster name',
   `options` text comment 'json form of parameter collection ',
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index b613b852b..a8706ebe5 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -271,6 +271,7 @@ create sequence "public"."streampark_t_flink_cluster_id_seq"
 create table "public"."t_flink_cluster" (
   "id" int8 not null default 
nextval('streampark_t_flink_cluster_id_seq'::regclass),
   "address" varchar(150) collate "pg_catalog"."default",
+  "job_manager_url" varchar(150) collate "pg_catalog"."default",
   "cluster_id" varchar(45) collate "pg_catalog"."default",
   "cluster_name" varchar(128) collate "pg_catalog"."default" not null,
   "options" text collate "pg_catalog"."default",
@@ -292,7 +293,8 @@ create table "public"."t_flink_cluster" (
   "create_time" timestamp(6) not null default timezone('UTC-8'::text, 
(now())::timestamp(0) without time zone)
 )
 ;
-comment on column "public"."t_flink_cluster"."address" is 'url address of 
jobmanager';
+comment on column "public"."t_flink_cluster"."address" is 'url address of 
cluster';
+comment on column "public"."t_flink_cluster"."job_manager_url" is 'url address 
of jobmanager';
 comment on column "public"."t_flink_cluster"."cluster_id" is 'clusterid of 
session mode(yarn-session:application-id,k8s-session:cluster-id)';
 comment on column "public"."t_flink_cluster"."cluster_name" is 'cluster name';
 comment on column "public"."t_flink_cluster"."options" is 'parameter 
collection json form';
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
index c744b30eb..042b09a53 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
@@ -121,7 +121,6 @@ alter table `t_flink_cluster`
 alter table `t_access_token`
     modify column `description` varchar(255) character set utf8mb4 collate 
utf8mb4_general_ci default null comment 'description';
 
-
 -- menu script
 delete from `t_menu`;
 -- menu level 1
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 38081fb95..581f6c145 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -43,6 +43,9 @@ create table `t_resource` (
 alter table `t_flink_sql`
     add column `team_resource` varchar(64) default null;
 
+alter table `t_flink_cluster`
+    add column `job_manager_url` varchar(150) default null comment 'url 
address of jobmanager' after `address`;
+
 -- menu level 2
 insert into `t_menu` values (120400, 120000, 'menu.resource', 
'/flink/resource', 'flink/resource/View', null, 'apartment', '0', 1, 3, now(), 
now());
 -- menu level 3
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index f60672c61..6178798fd 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -56,6 +56,9 @@ create index "un_team_dname_inx" on "public"."t_resource" 
using btree (
 alter table "public"."t_flink_sql"
     add column "team_resource" varchar(64) default null;
 
+alter table "public"."t_flink_cluster"
+    add column "job_manager_url" varchar(150) collate "pg_catalog"."default";
+
 insert into "public"."t_menu" values (120400, 120000, 'menu.resource', 
'/flink/resource', 'flink/resource/View', null, 'apartment', '0', '1', 3, 
now(), now());
 insert into "public"."t_menu" values (110401, 110400, 'add', null, null, 
'token:add', null, '1', '1', null, now(), now());
 insert into "public"."t_menu" values (110402, 110400, 'delete', null, null, 
'token:delete', null, '1', '1', null, now(), now());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 57672218b..62cedea94 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -64,6 +64,9 @@ public class FlinkCluster implements Serializable {
   @TableField(updateStrategy = FieldStrategy.IGNORED)
   private String address;
 
+  @TableField(updateStrategy = FieldStrategy.IGNORED)
+  private String jobManagerUrl;
+
   private String clusterId;
 
   private String clusterName;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index a1ed45c75..80e527494 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -31,6 +31,7 @@ import 
org.apache.streampark.console.core.service.CommonService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkClusterWatcher;
 import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.DeployRequest;
@@ -141,11 +142,15 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
         successful, String.format(ERROR_CLUSTER_QUEUE_HINT, 
flinkCluster.getYarnQueue()));
     flinkCluster.setCreateTime(new Date());
     if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
-      flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+      flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
     } else {
       flinkCluster.setClusterState(ClusterState.CREATED.getValue());
     }
-    return save(flinkCluster);
+    boolean ret = save(flinkCluster);
+    if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
+      FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+    }
+    return ret;
   }
 
   @Override
@@ -160,16 +165,20 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
       if 
(ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
         String address = YarnUtils.getRMWebAppURL() + "/proxy/" + 
deployResponse.clusterId() + "/";
         flinkCluster.setAddress(address);
+        flinkCluster.setJobManagerUrl(deployResponse.address());
       } else {
         flinkCluster.setAddress(deployResponse.address());
       }
       flinkCluster.setClusterId(deployResponse.clusterId());
-      flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+      flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
       flinkCluster.setException(null);
+      FlinkClusterWatcher.addFlinkCluster(flinkCluster);
       updateById(flinkCluster);
       FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
+      flinkCluster.setAddress(null);
+      flinkCluster.setJobManagerUrl(null);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
       flinkCluster.setException(e.toString());
       updateById(flinkCluster);
@@ -214,6 +223,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
       ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response 
failed");
       flinkCluster.setAddress(null);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+      FlinkClusterWatcher.removeFlinkCluster(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
@@ -262,7 +272,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())
         || 
ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode())) {
       ApiAlertException.throwIfTrue(
-          ClusterState.isStarted(flinkCluster.getClusterStateEnum()),
+          ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
           "Flink cluster is running, cannot be delete, please check.");
     }
 
@@ -353,10 +363,11 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
     if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
       ApiAlertException.throwIfFalse(
-          ClusterState.isStarted(flinkCluster.getClusterStateEnum()),
+          ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
           "Current cluster is not active, please check!");
       if (!flinkCluster.verifyClusterConnection()) {
         flinkCluster.setAddress(null);
+        flinkCluster.setJobManagerUrl(null);
         flinkCluster.setClusterState(ClusterState.LOST.getValue());
         updateById(flinkCluster);
         throw new ApiAlertException("Current cluster is not active, please 
check!");
@@ -369,8 +380,10 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     flinkCluster.setDescription(cluster.getDescription());
     if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
       flinkCluster.setAddress(cluster.getAddress());
+      flinkCluster.setJobManagerUrl(cluster.getAddress());
     } else {
       flinkCluster.setAddress(null);
+      flinkCluster.setJobManagerUrl(null);
       flinkCluster.setClusterId(cluster.getClusterId());
       flinkCluster.setVersionId(cluster.getVersionId());
       flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
new file mode 100644
index 000000000..2ecb2c7de
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hc.client5.http.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracing clusters in yarn session and 
remote mode; K8s session monitoring is not yet implemented */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  private Long lastWatcheringTime = 0L;
+
+  // Tracking interval: every 30 seconds
+  private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);
+
+  /** Watcher cluster lists */
+  private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new 
ConcurrentHashMap<>(8);
+
+  /** Thread pool for processing status monitoring for each cluster */
+  private static final ExecutorService EXECUTOR =
+      new ThreadPoolExecutor(
+          Runtime.getRuntime().availableProcessors() * 5,
+          Runtime.getRuntime().availableProcessors() * 10,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(1024),
+          ThreadUtils.threadFactory("flink-cluster-watching-executor"));
+
+  /** Initialize cluster cache */
+  @PostConstruct
+  private void init() {
+    WATCHER_CLUSTERS.clear();
+    List<FlinkCluster> flinkClusters =
+        flinkClusterService.list(
+            new LambdaQueryWrapper<FlinkCluster>()
+                .eq(FlinkCluster::getClusterState, 
ClusterState.RUNNING.getValue())
+                .notIn(FlinkCluster::getExecutionMode, 
ExecutionMode.getKubernetesMode()));
+    flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), 
cluster));
+  }
+
+  /** Persist flink cluster state (not yet implemented) */
+  @PreDestroy
+  private void stop() {
+    // TODO: flinkcluster persistent
+  }
+
+  @Scheduled(fixedDelay = 1000)
+  private void start() {
+    if (System.currentTimeMillis() - lastWatcheringTime >= 
WATCHER_INTERVAL.toMillis()) {
+      lastWatcheringTime = System.currentTimeMillis();
+      watcher();
+    }
+  }
+
+  private void watcher() {
+    for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
+      EXECUTOR.execute(
+          () -> {
+            FlinkCluster flinkCluster = entry.getValue();
+            Integer clusterExecutionMode = flinkCluster.getExecutionMode();
+            if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
+              ClusterState state = getClusterState(flinkCluster);
+              handleClusterState(flinkCluster, state);
+            } else {
+              // TODO: K8s Session status monitoring
+            }
+          });
+    }
+  }
+
+  /**
+   * Get the cluster state from the flink rest api, falling back to the yarn rest api
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState getClusterState(FlinkCluster flinkCluster) {
+    ClusterState state = getClusterStateFromFlinkAPI(flinkCluster);
+    if (ClusterState.isRunningState(state)) {
+      return state;
+    } else {
+      return getClusterStateFromYarnAPI(flinkCluster);
+    }
+  }
+
+  /**
+   * Get the cluster state from the flink rest api
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState getClusterStateFromFlinkAPI(FlinkCluster flinkCluster) {
+    final String address = flinkCluster.getAddress();
+    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
+    if (StringUtils.isEmpty(address)) {
+      return ClusterState.STOPPED;
+    }
+    final String flinkUrl =
+        StringUtils.isEmpty(jobManagerUrl)
+            ? address.concat("/overview")
+            : jobManagerUrl.concat("/overview");
+    try {
+      String res =
+          HttpClientUtils.httpGetRequest(
+              flinkUrl,
+              RequestConfig.custom().setConnectTimeout(5000, 
TimeUnit.MILLISECONDS).build());
+
+      JacksonUtils.read(res, Overview.class);
+      return ClusterState.RUNNING;
+    } catch (Exception ignored) {
+      log.error("cluster id:{} get state from flink api failed", 
flinkCluster.getId());
+    }
+    return ClusterState.UNKNOWN;
+  }
+
+  /**
+   * Get the cluster state from the yarn rest api
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) {
+    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
+      return ClusterState.STOPPED;
+    }
+    String clusterId = flinkCluster.getClusterId();
+    if (StringUtils.isEmpty(clusterId)) {
+      return ClusterState.STOPPED;
+    }
+    String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId());
+    try {
+      String result = YarnUtils.restRequest(yarnUrl);
+      if (null == result) {
+        return ClusterState.UNKNOWN;
+      }
+      YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class);
+      FinalApplicationStatus status =
+          
stringConvertFinalApplicationStatus(yarnAppInfo.getApp().getFinalStatus());
+      if (status == null) {
+        log.error(
+            "cluster id:{} final application status convert failed, invalid 
string ",
+            flinkCluster.getId());
+        return ClusterState.UNKNOWN;
+      }
+      return finalApplicationStatusConvertClusterState(status);
+    } catch (Exception e) {
+      return ClusterState.LOST;
+    }
+  }
+
+  /**
+   * process cluster state
+   *
+   * @param flinkCluster
+   * @param state
+   */
+  private void handleClusterState(FlinkCluster flinkCluster, ClusterState 
state) {
+    LambdaUpdateWrapper<FlinkCluster> updateWrapper =
+        new LambdaUpdateWrapper<FlinkCluster>()
+            .eq(FlinkCluster::getId, flinkCluster.getId())
+            .set(FlinkCluster::getClusterState, state.getValue());
+    switch (state) {
+      case STOPPED:
+        {
+          updateWrapper
+              .set(FlinkCluster::getAddress, null)
+              .set(FlinkCluster::getJobManagerUrl, null);
+        }
+        // fall through
+      case LOST:
+      case UNKNOWN:
+        {
+          removeFlinkCluster(flinkCluster);
+          break;
+        }
+    }
+    flinkClusterService.update(updateWrapper);
+  }
+
+  /**
+   * Add a cluster to cache
+   *
+   * @param flinkCluster
+   */
+  public static void addFlinkCluster(FlinkCluster flinkCluster) {
+    if (WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+      return;
+    }
+    log.info("add the cluster with id:{} to watcher cluster cache", 
flinkCluster.getId());
+    WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
+  }
+
+  /**
+   * Remove a cluster from cache
+   *
+   * @param flinkCluster
+   */
+  public static void removeFlinkCluster(FlinkCluster flinkCluster) {
+    if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+      return;
+    }
+    log.info("remove the cluster with id:{} from watcher cluster cache", 
flinkCluster.getId());
+    WATCHER_CLUSTERS.remove(flinkCluster.getId());
+  }
+
+  /**
+   * Convert a string to a final application status, or null if it matches none
+   *
+   * @param value
+   * @return
+   */
+  private FinalApplicationStatus stringConvertFinalApplicationStatus(String 
value) {
+    for (FinalApplicationStatus status : FinalApplicationStatus.values()) {
+      if (status.name().equals(value)) {
+        return status;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Convert a final application status to a cluster state
+   *
+   * @param status
+   * @return
+   */
+  private ClusterState 
finalApplicationStatusConvertClusterState(FinalApplicationStatus status) {
+    switch (status) {
+      case UNDEFINED:
+        return ClusterState.RUNNING;
+      default:
+        return ClusterState.STOPPED;
+    }
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 3e9f1c454..f2f22da5e 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -390,7 +390,8 @@ create table if not exists `t_app_build_pipe` (
 -- ----------------------------
 create table if not exists `t_flink_cluster` (
   `id` bigint  generated by default as identity not null,
-  `address` varchar(150) default null comment 'url address of jobmanager',
+  `address` varchar(150) default null comment 'url address of cluster',
+  `job_manager_url` varchar(150) default null comment 'url address of 
jobmanager',
   `cluster_id` varchar(45) default null comment 'clusterId of session 
mode(yarn-session:application-id,k8s-session:cluster-id)',
   `cluster_name` varchar(128) not null comment 'cluster name',
   `options` text comment 'json form of parameter collection ',
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index b869326dd..91e791847 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -20,6 +20,7 @@
     <resultMap id="BaseResultMap" 
type="org.apache.streampark.console.core.entity.FlinkCluster">
         <id column="id" jdbcType="BIGINT" property="id"/>
         <result column="address" jdbcType="VARCHAR" property="address"/>
+        <result column="job_manager_url" jdbcType="VARCHAR" 
property="jobManagerUrl"/>
         <result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
         <result column="cluster_name" jdbcType="VARCHAR" 
property="clusterName"/>
         <result column="options" jdbcType="LONGVARCHAR" property="options"/>

Reply via email to