This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch cluster-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 5eea06447233f269d8d9139e840ea4e8a41dd8f9
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 2 01:26:25 2023 +0800

    [Improve] Flink cluster status monitoring improvement
---
 .../console/core/entity/FlinkCluster.java          |   3 -
 .../console/core/service/FlinkClusterService.java  |   2 +
 .../core/service/impl/FlinkClusterServiceImpl.java |  80 ++++-----
 .../console/core/task/FlinkClusterWatcher.java     | 200 +++++++--------------
 .../console/core/task/FlinkRESTAPIWatcher.java     |  87 ++++-----
 5 files changed, 132 insertions(+), 240 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 32feec747..2cd105840 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -64,9 +64,6 @@ public class FlinkCluster implements Serializable {
   @TableField(updateStrategy = FieldStrategy.IGNORED)
   private String address;
 
-  @TableField(updateStrategy = FieldStrategy.IGNORED)
-  private String jobManagerUrl;
-
   private String clusterId;
 
   private String clusterName;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d149152af..788596c5d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -47,4 +47,6 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
   Boolean existsByFlinkEnvId(Long id);
 
   List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode> 
executionModes);
+
+  void updateClusterToStopped(Long id);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 769ddf626..21cf5dc0b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
@@ -149,7 +150,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
     boolean ret = save(flinkCluster);
     if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
-      FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+      FlinkClusterWatcher.addWatching(flinkCluster);
     }
     return ret;
   }
@@ -167,7 +168,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         String address =
             YarnUtils.getRMWebAppURL(true) + "/proxy/" + 
deployResponse.clusterId() + "/";
         flinkCluster.setAddress(address);
-        flinkCluster.setJobManagerUrl(deployResponse.address());
       } else {
         flinkCluster.setAddress(deployResponse.address());
       }
@@ -176,12 +176,10 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setException(null);
       flinkCluster.setStartTime(new Date());
       flinkCluster.setEndTime(null);
-      FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+      FlinkClusterWatcher.addWatching(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
-      flinkCluster.setAddress(null);
-      flinkCluster.setJobManagerUrl(null);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
       flinkCluster.setException(e.toString());
       updateById(flinkCluster);
@@ -190,18 +188,31 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public void update(FlinkCluster cluster) {
-    FlinkCluster flinkCluster = getById(cluster.getId());
-    boolean success = validateQueueIfNeeded(flinkCluster, cluster);
+  public void update(FlinkCluster paramOfCluster) {
+    FlinkCluster flinkCluster = getById(paramOfCluster.getId());
+    boolean success = validateQueueIfNeeded(flinkCluster, paramOfCluster);
     ApiAlertException.throwIfFalse(
-        success, String.format(ERROR_CLUSTER_QUEUE_HINT, 
cluster.getYarnQueue()));
-    updateCluster(cluster, flinkCluster);
-    try {
-      updateById(flinkCluster);
-    } catch (Exception e) {
-      throw new ApiDetailException(
-          "Update cluster failed, Caused By: " + 
ExceptionUtils.getStackTrace(e));
+        success, String.format(ERROR_CLUSTER_QUEUE_HINT, 
paramOfCluster.getYarnQueue()));
+
+    flinkCluster.setClusterName(paramOfCluster.getClusterName());
+    flinkCluster.setDescription(paramOfCluster.getDescription());
+    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
+      flinkCluster.setAddress(paramOfCluster.getAddress());
+    } else {
+      flinkCluster.setClusterId(paramOfCluster.getClusterId());
+      flinkCluster.setVersionId(paramOfCluster.getVersionId());
+      flinkCluster.setDynamicProperties(paramOfCluster.getDynamicProperties());
+      flinkCluster.setOptions(paramOfCluster.getOptions());
+      flinkCluster.setResolveOrder(paramOfCluster.getResolveOrder());
+      
flinkCluster.setK8sHadoopIntegration(paramOfCluster.getK8sHadoopIntegration());
+      flinkCluster.setK8sConf(paramOfCluster.getK8sConf());
+      flinkCluster.setK8sNamespace(paramOfCluster.getK8sNamespace());
+      
flinkCluster.setK8sRestExposedType(paramOfCluster.getK8sRestExposedType());
+      flinkCluster.setServiceAccount(paramOfCluster.getServiceAccount());
+      flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
+      flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
     }
+    updateById(flinkCluster);
   }
 
   @Override
@@ -224,10 +235,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       // 4) shutdown
       ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster, 
clusterId);
       ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response 
failed");
-      flinkCluster.setAddress(null);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
       flinkCluster.setEndTime(new Date());
-      FlinkClusterWatcher.removeFlinkCluster(flinkCluster);
+      FlinkClusterWatcher.unWatching(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
@@ -267,6 +277,16 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                         .collect(Collectors.toSet())));
   }
 
+  @Override
+  public void updateClusterToStopped(Long id) {
+    LambdaUpdateWrapper<FlinkCluster> updateWrapper =
+        new LambdaUpdateWrapper<FlinkCluster>()
+            .eq(FlinkCluster::getId, id)
+            .set(FlinkCluster::getClusterState, 
ClusterState.STOPPED.getValue())
+            .set(FlinkCluster::getEndTime, new Date());
+    update(updateWrapper);
+  }
+
   @Override
   public void delete(FlinkCluster cluster) {
     Long id = cluster.getId();
@@ -370,8 +390,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
           ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
           "Current cluster is not active, please check!");
       if (!flinkCluster.verifyClusterConnection()) {
-        flinkCluster.setAddress(null);
-        flinkCluster.setJobManagerUrl(null);
         flinkCluster.setClusterState(ClusterState.LOST.getValue());
         updateById(flinkCluster);
         throw new ApiAlertException("Current cluster is not active, please 
check!");
@@ -379,30 +397,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
   }
 
-  private void updateCluster(FlinkCluster cluster, FlinkCluster flinkCluster) {
-    flinkCluster.setClusterName(cluster.getClusterName());
-    flinkCluster.setDescription(cluster.getDescription());
-    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
-      flinkCluster.setAddress(cluster.getAddress());
-      flinkCluster.setJobManagerUrl(cluster.getAddress());
-    } else {
-      flinkCluster.setAddress(null);
-      flinkCluster.setJobManagerUrl(null);
-      flinkCluster.setClusterId(cluster.getClusterId());
-      flinkCluster.setVersionId(cluster.getVersionId());
-      flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
-      flinkCluster.setOptions(cluster.getOptions());
-      flinkCluster.setResolveOrder(cluster.getResolveOrder());
-      flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
-      flinkCluster.setK8sConf(cluster.getK8sConf());
-      flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
-      flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
-      flinkCluster.setServiceAccount(cluster.getServiceAccount());
-      flinkCluster.setFlinkImage(cluster.getFlinkImage());
-      flinkCluster.setYarnQueue(cluster.getYarnQueue());
-    }
-  }
-
   @Nullable
   private KubernetesDeployParam getKubernetesDeployDesc(
       @Nonnull FlinkCluster flinkCluster, String action) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 29131f651..13de7a3fe 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -30,19 +30,16 @@ import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hc.client5.http.config.RequestConfig;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 
 import java.time.Duration;
 import java.util.Date;
@@ -65,7 +62,7 @@ public class FlinkClusterWatcher {
 
   @Autowired private ApplicationService applicationService;
 
-  private Long lastWatcheringTime = 0L;
+  private Long lastWatchingTime = 0L;
 
   // Track interval  every 30 seconds
   private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);
@@ -73,6 +70,8 @@ public class FlinkClusterWatcher {
   /** Watcher cluster lists */
   private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new 
ConcurrentHashMap<>(8);
 
+  private boolean immediateWatch = false;
+
   /** Thread pool for processing status monitoring for each cluster */
   private static final ExecutorService EXECUTOR =
       new ThreadPoolExecutor(
@@ -95,71 +94,40 @@ public class FlinkClusterWatcher {
     flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), 
cluster));
   }
 
-  /** flinkcluster persistent */
-  @PreDestroy
-  private void stop() {
-    // TODO: flinkcluster persistent
-  }
-
   @Scheduled(fixedDelay = 1000)
   private void start() {
-    if (System.currentTimeMillis() - lastWatcheringTime >= 
WATCHER_INTERVAL.toMillis()) {
-      lastWatcheringTime = System.currentTimeMillis();
-      watcher();
-    }
-  }
-
-  private void watcher() {
-    for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
-      EXECUTOR.execute(
-          () -> {
-            FlinkCluster flinkCluster = entry.getValue();
-            updateClusterState(flinkCluster);
-          });
-    }
-  }
-
-  private ClusterState updateClusterState(FlinkCluster flinkCluster) {
-    Integer clusterExecutionMode = flinkCluster.getExecutionMode();
-    if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
-      ClusterState state = getClusterState(flinkCluster);
-      handleClusterState(flinkCluster, state);
-      return state;
-    } else {
-      // TODO: K8s Session status monitoring
-      return ClusterState.UNKNOWN;
-    }
-  }
-
-  public synchronized boolean verifyClusterValidByClusterId(Long clusterId) {
-    FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
-    ClusterState state = ClusterState.of(flinkCluster.getClusterState());
-    if (!ClusterState.isRunningState(state)) {
-      return false;
-    }
-    state = updateClusterState(flinkCluster);
-    if (!ClusterState.isRunningState(state)) {
-      return false;
-    }
-    return true;
-  }
-
-  public boolean checkAlert(Long clusterId) {
-    FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
-    if (flinkCluster.getAlertId() == null) {
-      return false;
+    if (immediateWatch
+        || System.currentTimeMillis() - lastWatchingTime >= 
WATCHER_INTERVAL.toMillis()) {
+      lastWatchingTime = System.currentTimeMillis();
+      immediateWatch = false;
+      for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
+        EXECUTOR.execute(
+            () -> {
+              FlinkCluster flinkCluster = entry.getValue();
+              ClusterState state = getClusterState(flinkCluster);
+              switch (state) {
+                case STOPPED:
+                  
flinkClusterService.updateClusterToStopped(flinkCluster.getId());
+                  break;
+                  // fall through
+                case LOST:
+                case UNKNOWN:
+                  unWatching(flinkCluster);
+                  alert(flinkCluster, state);
+                  break;
+              }
+            });
+      }
     }
-    return true;
   }
 
   private void alert(FlinkCluster cluster, ClusterState state) {
-    if (!checkAlert(cluster.getId())) {
-      return;
+    if (cluster.getAlertId() != null) {
+      
cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
+      cluster.setClusterState(state.getValue());
+      cluster.setEndTime(new Date());
+      alertService.alert(cluster, state);
     }
-    
cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
-    cluster.setClusterState(state.getValue());
-    cluster.setEndTime(new Date());
-    alertService.alert(cluster, state);
   }
 
   /**
@@ -168,13 +136,21 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
    */
-  private ClusterState getClusterState(FlinkCluster flinkCluster) {
-    ClusterState state = getClusterStateFromFlinkAPI(flinkCluster);
-    if (ClusterState.isRunningState(state)) {
-      return state;
-    } else {
-      return getClusterStateFromYarnAPI(flinkCluster);
+  public synchronized ClusterState getClusterState(FlinkCluster flinkCluster) {
+    ClusterState state;
+    switch (flinkCluster.getExecutionModeEnum()) {
+      case REMOTE:
+        state = httpRemoteClusterState(flinkCluster);
+        break;
+      case YARN_SESSION:
+        state = httpYarnSessionClusterState(flinkCluster);
+        break;
+      default:
+        state = ClusterState.UNKNOWN;
+        break;
     }
+    immediateWatch = !ClusterState.isRunningState(state);
+    return state;
   }
 
   /**
@@ -183,28 +159,20 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
    */
-  private ClusterState getClusterStateFromFlinkAPI(FlinkCluster flinkCluster) {
+  private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
     final String address = flinkCluster.getAddress();
-    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
-    if (StringUtils.isEmpty(address)) {
-      return ClusterState.STOPPED;
-    }
-    final String flinkUrl =
-        StringUtils.isEmpty(jobManagerUrl)
-            ? address.concat("/overview")
-            : jobManagerUrl.concat("/overview");
+    final String flinkUrl = address.concat("/overview");
     try {
       String res =
           HttpClientUtils.httpGetRequest(
               flinkUrl,
               RequestConfig.custom().setConnectTimeout(5000, 
TimeUnit.MILLISECONDS).build());
-
       JacksonUtils.read(res, Overview.class);
       return ClusterState.RUNNING;
     } catch (Exception ignored) {
       log.error("cluster id:{} get state from flink api failed", 
flinkCluster.getId());
     }
-    return ClusterState.UNKNOWN;
+    return ClusterState.LOST;
   }
 
   /**
@@ -213,14 +181,7 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
    */
-  private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) {
-    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
-      return ClusterState.LOST;
-    }
-    String clusterId = flinkCluster.getClusterId();
-    if (StringUtils.isEmpty(clusterId)) {
-      return ClusterState.STOPPED;
-    }
+  private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
     String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId());
     try {
       String result = YarnUtils.restRequest(yarnUrl);
@@ -243,64 +204,27 @@ public class FlinkClusterWatcher {
   }
 
   /**
-   * process cluster state
+   * add flinkCluster to watching
    *
    * @param flinkCluster
-   * @param state
    */
-  private void handleClusterState(FlinkCluster flinkCluster, ClusterState 
state) {
-    LambdaUpdateWrapper<FlinkCluster> updateWrapper =
-        new LambdaUpdateWrapper<FlinkCluster>()
-            .eq(FlinkCluster::getId, flinkCluster.getId())
-            .set(FlinkCluster::getClusterState, state.getValue());
-    switch (state) {
-      case STOPPED:
-        {
-          updateWrapper
-              .set(FlinkCluster::getAddress, null)
-              .set(FlinkCluster::getJobManagerUrl, null)
-              .set(FlinkCluster::getEndTime, new Date());
-        }
-        // fall through
-      case LOST:
-      case UNKNOWN:
-        {
-          removeFlinkCluster(flinkCluster);
-          alert(flinkCluster, state);
-          break;
-        }
+  public static void addWatching(FlinkCluster flinkCluster) {
+    if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+      log.info("add the cluster with id:{} to watcher cluster cache", 
flinkCluster.getId());
+      WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
     }
-    flinkClusterService.update(updateWrapper);
   }
 
-  /**
-   * Add a cluster to cache
-   *
-   * @param flinkCluster
-   */
-  public static void addFlinkCluster(FlinkCluster flinkCluster) {
+  /** @param flinkCluster */
+  public static void unWatching(FlinkCluster flinkCluster) {
     if (WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
-      return;
+      log.info("remove the cluster with id:{} from watcher cluster cache", 
flinkCluster.getId());
+      WATCHER_CLUSTERS.remove(flinkCluster.getId());
     }
-    log.info("add the cluster with id:{} to watcher cluster cache", 
flinkCluster.getId());
-    WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
   }
 
   /**
-   * Remove a cluster from cache
-   *
-   * @param flinkCluster
-   */
-  public static void removeFlinkCluster(FlinkCluster flinkCluster) {
-    if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
-      return;
-    }
-    log.info("remove the cluster with id:{} from watcher cluster cache", 
flinkCluster.getId());
-    WATCHER_CLUSTERS.remove(flinkCluster.getId());
-  }
-
-  /**
-   * string conver final application status
+   * string converse final application status
    *
    * @param value
    * @return
@@ -321,11 +245,9 @@ public class FlinkClusterWatcher {
    * @return
    */
   private ClusterState 
finalApplicationStatusConvertClusterState(FinalApplicationStatus status) {
-    switch (status) {
-      case UNDEFINED:
-        return ClusterState.RUNNING;
-      default:
-        return ClusterState.STOPPED;
+    if (status == FinalApplicationStatus.UNDEFINED) {
+      return ClusterState.RUNNING;
     }
+    return ClusterState.STOPPED;
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index 737cc1bc6..207954063 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.task;
 
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
@@ -53,7 +54,6 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -161,7 +161,11 @@ public class FlinkRESTAPIWatcher {
             new LambdaQueryWrapper<Application>()
                 .eq(Application::getTracking, 1)
                 .notIn(Application::getExecutionMode, 
ExecutionMode.getKubernetesMode()));
-    applications.forEach((app) -> WATCHING_APPS.put(app.getId(), app));
+    applications.forEach(
+        (app) -> {
+          WATCHING_APPS.put(app.getId(), app);
+          STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
+        });
   }
 
   @PreDestroy
@@ -230,7 +234,7 @@ public class FlinkRESTAPIWatcher {
                   if (StopFrom.NONE.equals(stopFrom)) {
                     savePointService.expire(application.getId());
                     application.setState(FlinkAppState.LOST.getValue());
-                    alert(application, FlinkAppState.LOST);
+                    doAlert(application, FlinkAppState.LOST);
                   } else {
                     application.setState(FlinkAppState.CANCELED.getValue());
                   }
@@ -246,7 +250,7 @@ public class FlinkRESTAPIWatcher {
                 doPersistMetrics(application, true);
                 FlinkAppState appState = 
FlinkAppState.of(application.getState());
                 if (appState.equals(FlinkAppState.FAILED) || 
appState.equals(FlinkAppState.LOST)) {
-                  alert(application, FlinkAppState.of(application.getState()));
+                  doAlert(application, 
FlinkAppState.of(application.getState()));
                   if (appState.equals(FlinkAppState.FAILED)) {
                     try {
                       applicationService.start(application, true);
@@ -341,13 +345,13 @@ public class FlinkRESTAPIWatcher {
 
     // get overview info at the first start time
     if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
+      STARTING_CACHE.invalidate(application.getId());
       Overview override = httpOverview(application);
       if (override != null && override.getSlotsTotal() > 0) {
         application.setTotalTM(override.getTaskmanagers());
         application.setTotalSlot(override.getSlotsTotal());
         application.setAvailableSlot(override.getSlotsAvailable());
       }
-      STARTING_CACHE.invalidate(application.getId());
     }
   }
 
@@ -458,7 +462,7 @@ public class FlinkRESTAPIWatcher {
             savePointService.expire(application.getId());
           }
           stopCanceledJob(application.getId());
-          alert(application, FlinkAppState.CANCELED);
+          doAlert(application, FlinkAppState.CANCELED);
         }
         STOP_FROM_MAP.remove(application.getId());
         doPersistMetrics(application, true);
@@ -469,7 +473,7 @@ public class FlinkRESTAPIWatcher {
         STOP_FROM_MAP.remove(application.getId());
         application.setState(FlinkAppState.FAILED.getValue());
         doPersistMetrics(application, true);
-        alert(application, FlinkAppState.FAILED);
+        doAlert(application, FlinkAppState.FAILED);
         applicationService.start(application, true);
         break;
       case RESTARTING:
@@ -546,7 +550,7 @@ public class FlinkRESTAPIWatcher {
               || flinkAppState.equals(FlinkAppState.LOST)
               || (flinkAppState.equals(FlinkAppState.CANCELED) && 
StopFrom.NONE.equals(stopFrom))
               || applicationService.checkAlter(application)) {
-            alert(application, flinkAppState);
+            doAlert(application, flinkAppState);
             stopCanceledJob(application.getId());
             if (flinkAppState.equals(FlinkAppState.FAILED)) {
               applicationService.start(application, true);
@@ -585,23 +589,6 @@ public class FlinkRESTAPIWatcher {
     }
   }
 
-  public static void doWatching(Application application) {
-    if (isKubernetesApp(application)) {
-      return;
-    }
-    log.info("FlinkRESTAPIWatcher add app to tracking,appId:{}", 
application.getId());
-    WATCHING_APPS.put(application.getId(), application);
-    STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
-  }
-
-  public static void addSavepoint(Long appId) {
-    if (isKubernetesApp(appId)) {
-      return;
-    }
-    log.info("FlinkRESTAPIWatcher add app to savepoint,appId:{}", appId);
-    SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
-  }
-
   public static void unWatching(Long appId) {
     if (isKubernetesApp(appId)) {
       return;
@@ -618,19 +605,6 @@ public class FlinkRESTAPIWatcher {
     CANCELLED_JOB_MAP.remove(appId);
   }
 
-  public static void addCanceledApp(Long appId, Long userId) {
-    log.info("flink job addCanceledApp app appId:{}, useId:{}", appId, userId);
-    CANCELLED_JOB_MAP.put(appId, userId);
-  }
-
-  public static Long getCanceledJobUserId(Long appId) {
-    return CANCELLED_JOB_MAP.get(appId) == null ? Long.valueOf(-1) : 
CANCELLED_JOB_MAP.get(appId);
-  }
-
-  public static Collection<Application> getWatchingApps() {
-    return WATCHING_APPS.values();
-  }
-
   private static boolean isKubernetesApp(Application application) {
     return FlinkK8sWatcherWrapper.isKubernetesApp(application);
   }
@@ -743,10 +717,6 @@ public class FlinkRESTAPIWatcher {
     return JacksonUtils.read(result, clazz);
   }
 
-  public boolean isWatchingApp(Long id) {
-    return WATCHING_APPS.containsKey(id);
-  }
-
   private <T> T httpRemoteCluster(Long clusterId, Callback<FlinkCluster, T> 
function)
       throws Exception {
     FlinkCluster flinkCluster = getFlinkRemoteCluster(clusterId, false);
@@ -781,19 +751,26 @@ public class FlinkRESTAPIWatcher {
    * alarm; If the abnormal behavior of the job is caused by itself and the 
flink cluster is running
    * normally, the job will an alarm
    */
-  private void alert(Application app, FlinkAppState appState) {
-    if (ExecutionMode.isYarnPerJobOrAppMode(app.getExecutionModeEnum())
-        || !flinkClusterWatcher.checkAlert(app.getFlinkClusterId())) {
-      alertService.alert(app, appState);
-      return;
-    }
-    boolean isValid = 
flinkClusterWatcher.verifyClusterValidByClusterId(app.getFlinkClusterId());
-    if (isValid) {
-      log.info(
-          "application with id {} is yarn session or remote and flink cluster 
with id {} is alive, application send alert",
-          app.getId(),
-          app.getFlinkClusterId());
-      alertService.alert(app, appState);
+  private void doAlert(Application app, FlinkAppState appState) {
+    switch (app.getExecutionModeEnum()) {
+      case YARN_APPLICATION:
+      case YARN_PER_JOB:
+        alertService.alert(app, appState);
+        return;
+      case YARN_SESSION:
+      case REMOTE:
+        FlinkCluster flinkCluster = 
flinkClusterService.getById(app.getFlinkClusterId());
+        ClusterState clusterState = 
flinkClusterWatcher.getClusterState(flinkCluster);
+        if (ClusterState.isRunningState(clusterState)) {
+          log.info(
+              "application with id {} is yarn session or remote and flink 
cluster with id {} is alive, application send alert",
+              app.getId(),
+              app.getFlinkClusterId());
+          alertService.alert(app, appState);
+        }
+        break;
+      default:
+        break;
     }
   }
 }

Reply via email to