This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch cluster-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/cluster-state by this push:
     new 6a9d1d346 Flink cluster state monitoring improve
6a9d1d346 is described below

commit 6a9d1d346d2b589524af8c899c1ca5f641c55fde
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 2 18:03:52 2023 +0800

    Flink cluster state monitoring improve
---
 .../streampark/common/enums/ClusterState.java      | 20 +++--------
 .../console/core/mapper/ApplicationMapper.java     |  2 +-
 .../console/core/service/ApplicationService.java   |  2 +-
 .../console/core/service/FlinkClusterService.java  |  3 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  4 +--
 .../core/service/impl/FlinkClusterServiceImpl.java |  8 ++---
 .../console/core/task/FlinkClusterWatcher.java     | 41 +++++++++++++---------
 .../console/core/task/FlinkRESTAPIWatcher.java     | 27 +++++++-------
 .../resources/mapper/core/ApplicationMapper.xml    |  2 +-
 9 files changed, 54 insertions(+), 55 deletions(-)

diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index decfc770a..d9a715f7e 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -60,23 +60,13 @@ public enum ClusterState implements Serializable {
     return value;
   }
 
-  public static boolean isCreateState(ClusterState state) {
-    return CREATED.equals(state);
-  }
-
-  public static boolean isRunningState(ClusterState state) {
+  public static boolean isRunning(ClusterState state) {
     return RUNNING.equals(state);
   }
 
-  public static boolean isStoppedState(ClusterState state) {
-    return STOPPED.equals(state);
-  }
-
-  public static boolean isLostState(ClusterState state) {
-    return LOST.equals(state);
-  }
-
-  public static boolean isUnknownState(ClusterState state) {
-    return UNKNOWN.equals(state);
+  public static boolean isFailed(ClusterState state) {
+    return state == ClusterState.STOPPED
+        || state == ClusterState.LOST
+        || state == ClusterState.UNKNOWN;
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index d1b59fa07..55c2ee72f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -66,5 +66,5 @@ public interface ApplicationMapper extends BaseMapper<Application> {
 
   boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
 
-  Integer getAffectedJobsByClusterId(@Param("clusterId") Long clusterId);
+  Integer countJobsByClusterId(@Param("clusterId") Long clusterId);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index a64922afa..cba588c65 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -105,7 +105,7 @@ public interface ApplicationService extends IService<Application> {
 
   boolean existsJobByClusterId(Long clusterId);
 
-  Integer getAffectedJobsByClusterId(Long clusterId);
+  Integer countJobsByClusterId(Long clusterId);
 
   boolean existsJobByFlinkEnvId(Long flinkEnvId);
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 788596c5d..6d4559735 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service;
 
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.console.core.bean.ResponseResult;
 import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -48,5 +49,5 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
 
   List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode> executionModes);
 
-  void updateClusterToStopped(Long id);
+  void updateClusterFinalState(Long id, ClusterState state);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 98253bb9f..ef9150fe4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -545,8 +545,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
   }
 
   @Override
-  public Integer getAffectedJobsByClusterId(Long clusterId) {
-    return baseMapper.getAffectedJobsByClusterId(clusterId);
+  public Integer countJobsByClusterId(Long clusterId) {
+    return baseMapper.countJobsByClusterId(clusterId);
   }
 
   @Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index a8512a203..1dfc09e13 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -279,11 +279,11 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public void updateClusterToStopped(Long id) {
+  public void updateClusterFinalState(Long id, ClusterState state) {
     LambdaUpdateWrapper<FlinkCluster> updateWrapper =
         new LambdaUpdateWrapper<FlinkCluster>()
             .eq(FlinkCluster::getId, id)
-            .set(FlinkCluster::getClusterState, ClusterState.STOPPED.getValue())
+            .set(FlinkCluster::getClusterState, state.getValue())
             .set(FlinkCluster::getEndTime, new Date());
     update(updateWrapper);
   }
@@ -297,7 +297,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())
         || ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode())) {
       ApiAlertException.throwIfTrue(
-          ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
+          ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
           "Flink cluster is running, cannot be delete, please check.");
     }
 
@@ -388,7 +388,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
     if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
       ApiAlertException.throwIfFalse(
-          ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
+          ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
           "Current cluster is not active, please check!");
       if (!flinkCluster.verifyClusterConnection()) {
         flinkCluster.setClusterState(ClusterState.LOST.getValue());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 13de7a3fe..39af495dc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hc.client5.http.config.RequestConfig;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -62,7 +64,7 @@ public class FlinkClusterWatcher {
 
   @Autowired private ApplicationService applicationService;
 
-  private Long lastWatchingTime = 0L;
+  private Long lastWatchTime = 0L;
 
   // Track interval  every 30 seconds
   private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);
@@ -70,6 +72,9 @@ public class FlinkClusterWatcher {
   /** Watcher cluster lists */
   private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(8);
 
+  private static final Cache<Long, ClusterState> FAILED_STATES =
+      Caffeine.newBuilder().expireAfterWrite(WATCHER_INTERVAL).build();
+
   private boolean immediateWatch = false;
 
   /** Thread pool for processing status monitoring for each cluster */
@@ -97,24 +102,18 @@ public class FlinkClusterWatcher {
   @Scheduled(fixedDelay = 1000)
   private void start() {
     if (immediateWatch
-        || System.currentTimeMillis() - lastWatchingTime >= WATCHER_INTERVAL.toMillis()) {
-      lastWatchingTime = System.currentTimeMillis();
+        || System.currentTimeMillis() - lastWatchTime >= WATCHER_INTERVAL.toMillis()) {
+      lastWatchTime = System.currentTimeMillis();
       immediateWatch = false;
       for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
         EXECUTOR.execute(
             () -> {
               FlinkCluster flinkCluster = entry.getValue();
               ClusterState state = getClusterState(flinkCluster);
-              switch (state) {
-                case STOPPED:
-                  flinkClusterService.updateClusterToStopped(flinkCluster.getId());
-                  break;
-                  // fall through
-                case LOST:
-                case UNKNOWN:
-                  unWatching(flinkCluster);
-                  alert(flinkCluster, state);
-                  break;
+              if (ClusterState.isFailed(state)) {
+                flinkClusterService.updateClusterFinalState(flinkCluster.getId(), state);
+                unWatching(flinkCluster);
+                alert(flinkCluster, state);
               }
             });
       }
@@ -123,7 +122,7 @@ public class FlinkClusterWatcher {
 
   private void alert(FlinkCluster cluster, ClusterState state) {
     if (cluster.getAlertId() != null) {
-      cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
+      cluster.setJobs(applicationService.countJobsByClusterId(cluster.getId()));
       cluster.setClusterState(state.getValue());
       cluster.setEndTime(new Date());
       alertService.alert(cluster, state);
@@ -136,8 +135,11 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
    */
-  public synchronized ClusterState getClusterState(FlinkCluster flinkCluster) {
-    ClusterState state;
+  public ClusterState getClusterState(FlinkCluster flinkCluster) {
+    ClusterState state = FAILED_STATES.getIfPresent(flinkCluster.getId());
+    if (state != null) {
+      return state;
+    }
     switch (flinkCluster.getExecutionModeEnum()) {
       case REMOTE:
         state = httpRemoteClusterState(flinkCluster);
@@ -149,7 +151,12 @@ public class FlinkClusterWatcher {
         state = ClusterState.UNKNOWN;
         break;
     }
-    immediateWatch = !ClusterState.isRunningState(state);
+    if (ClusterState.isRunning(state)) {
+      FAILED_STATES.invalidate(flinkCluster.getId());
+    } else {
+      immediateWatch = true;
+      FAILED_STATES.put(flinkCluster.getId(), state);
+    }
     return state;
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index ff6ee225b..6af9d0cb0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -54,6 +54,7 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
@@ -84,9 +85,10 @@ public class FlinkRESTAPIWatcher {
   @Autowired private FlinkClusterWatcher flinkClusterWatcher;
 
   // track interval  every 5 seconds
-  private static final long WATCHING_INTERVAL = 1000L * 5;
+  private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
+
   // option interval within 10 seconds
-  private static final long OPTION_INTERVAL = 1000L * 10;
+  private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
 
   /**
    *
@@ -139,7 +141,7 @@ public class FlinkRESTAPIWatcher {
 
   private static final Map<Long, OptionState> OPTIONING = new ConcurrentHashMap<>(0);
 
-  private Long lastWatchingTime = 0L;
+  private Long lastWatchTime = 0L;
 
   private Long lastOptionTime = 0L;
 
@@ -188,20 +190,19 @@ public class FlinkRESTAPIWatcher {
   public void start() {
     // The application has been started at the first time, or the front-end is operating start/stop,
     // need to return status info immediately.
-    if (lastWatchingTime == null || !OPTIONING.isEmpty()) {
-      doWatch();
-    } else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL) {
-      // The last operation time is less than option interval.(10 seconds)
-      doWatch();
-    } else if (System.currentTimeMillis() - lastWatchingTime >= WATCHING_INTERVAL) {
-      // Normal information obtain, check if there is 5 seconds interval between this time and the
-      // last time.(once every 5 seconds)
+    if (lastWatchTime == null || !OPTIONING.isEmpty()) {
       doWatch();
+    } else {
+      Long timeMillis = System.currentTimeMillis();
+      if (timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis()
+          || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) {
+        lastWatchTime = timeMillis;
+        doWatch();
+      }
     }
   }
 
   private void doWatch() {
-    lastWatchingTime = System.currentTimeMillis();
     for (Map.Entry<Long, Application> entry : WATCHING_APPS.entrySet()) {
       watch(entry.getKey(), entry.getValue());
     }
@@ -796,7 +797,7 @@ public class FlinkRESTAPIWatcher {
       case REMOTE:
         FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId());
         ClusterState clusterState = flinkClusterWatcher.getClusterState(flinkCluster);
-        if (ClusterState.isRunningState(clusterState)) {
+        if (ClusterState.isRunning(clusterState)) {
           log.info(
               "application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert",
               app.getId(),
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index cc0173387..04005c724 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -133,7 +133,7 @@
              limit 1
     </select>
 
-    <select id="getAffectedJobsByClusterId" resultType="java.lang.Integer" parameterType="java.lang.Long">
+    <select id="countJobsByClusterId" resultType="java.lang.Integer" parameterType="java.lang.Long">
         select
             count(1)
         from t_flink_app

Reply via email to