This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch cluster-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 5eea06447233f269d8d9139e840ea4e8a41dd8f9
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 2 01:26:25 2023 +0800

    [Improve] Flink cluster status monitoring improvement
---
 .../console/core/entity/FlinkCluster.java          |   3 -
 .../console/core/service/FlinkClusterService.java  |   2 +
 .../core/service/impl/FlinkClusterServiceImpl.java |  80 ++++-----
 .../console/core/task/FlinkClusterWatcher.java     | 200 +++++++--------------
 .../console/core/task/FlinkRESTAPIWatcher.java     |  87 ++++-----
 5 files changed, 132 insertions(+), 240 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 32feec747..2cd105840 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -64,9 +64,6 @@ public class FlinkCluster implements Serializable {
   @TableField(updateStrategy = FieldStrategy.IGNORED)
   private String address;
 
-  @TableField(updateStrategy = FieldStrategy.IGNORED)
-  private String jobManagerUrl;
-
   private String clusterId;
 
   private String clusterName;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d149152af..788596c5d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -47,4 +47,6 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
   Boolean existsByFlinkEnvId(Long id);
 
   List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode> executionModes);
+
+  void updateClusterToStopped(Long id);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 769ddf626..21cf5dc0b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
@@ -149,7 +150,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
     boolean ret = save(flinkCluster);
     if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
-      FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+      FlinkClusterWatcher.addWatching(flinkCluster);
     }
     return ret;
   }
@@ -167,7 +168,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
           String address =
              YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/";
           flinkCluster.setAddress(address);
-          flinkCluster.setJobManagerUrl(deployResponse.address());
         } else {
           flinkCluster.setAddress(deployResponse.address());
         }
@@ -176,12 +176,10 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setException(null);
       flinkCluster.setStartTime(new Date());
       flinkCluster.setEndTime(null);
-      FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+      FlinkClusterWatcher.addWatching(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
-      flinkCluster.setAddress(null);
-      flinkCluster.setJobManagerUrl(null);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
       flinkCluster.setException(e.toString());
       updateById(flinkCluster);
@@ -190,18 +188,31 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public void update(FlinkCluster cluster) {
-    FlinkCluster flinkCluster = getById(cluster.getId());
-    boolean success = validateQueueIfNeeded(flinkCluster, cluster);
+  public void update(FlinkCluster paramOfCluster) {
+    FlinkCluster flinkCluster = getById(paramOfCluster.getId());
+    boolean success = validateQueueIfNeeded(flinkCluster, paramOfCluster);
     ApiAlertException.throwIfFalse(
-        success, String.format(ERROR_CLUSTER_QUEUE_HINT, cluster.getYarnQueue()));
-    updateCluster(cluster, flinkCluster);
-    try {
-      updateById(flinkCluster);
-    } catch (Exception e) {
-      throw new ApiDetailException(
-          "Update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
+        success, String.format(ERROR_CLUSTER_QUEUE_HINT, paramOfCluster.getYarnQueue()));
+
+    flinkCluster.setClusterName(paramOfCluster.getClusterName());
+    flinkCluster.setDescription(paramOfCluster.getDescription());
+    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
+      flinkCluster.setAddress(paramOfCluster.getAddress());
+    } else {
+      flinkCluster.setClusterId(paramOfCluster.getClusterId());
+      flinkCluster.setVersionId(paramOfCluster.getVersionId());
+      flinkCluster.setDynamicProperties(paramOfCluster.getDynamicProperties());
+      flinkCluster.setOptions(paramOfCluster.getOptions());
+      flinkCluster.setResolveOrder(paramOfCluster.getResolveOrder());
+      flinkCluster.setK8sHadoopIntegration(paramOfCluster.getK8sHadoopIntegration());
+      flinkCluster.setK8sConf(paramOfCluster.getK8sConf());
+      flinkCluster.setK8sNamespace(paramOfCluster.getK8sNamespace());
+      flinkCluster.setK8sRestExposedType(paramOfCluster.getK8sRestExposedType());
+      flinkCluster.setServiceAccount(paramOfCluster.getServiceAccount());
+      flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
+      flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
     }
+    updateById(flinkCluster);
   }
 
   @Override
@@ -224,10 +235,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       // 4) shutdown
       ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster, clusterId);
       ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response failed");
-      flinkCluster.setAddress(null);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
       flinkCluster.setEndTime(new Date());
-      FlinkClusterWatcher.removeFlinkCluster(flinkCluster);
+      FlinkClusterWatcher.unWatching(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
@@ -267,6 +277,16 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                 .collect(Collectors.toSet())));
   }
 
+  @Override
+  public void updateClusterToStopped(Long id) {
+    LambdaUpdateWrapper<FlinkCluster> updateWrapper =
+        new LambdaUpdateWrapper<FlinkCluster>()
+            .eq(FlinkCluster::getId, id)
+            .set(FlinkCluster::getClusterState, ClusterState.STOPPED.getValue())
+            .set(FlinkCluster::getEndTime, new Date());
+    update(updateWrapper);
+  }
+
   @Override
   public void delete(FlinkCluster cluster) {
     Long id = cluster.getId();
@@ -370,8 +390,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
         "Current cluster is not active, please check!");
     if (!flinkCluster.verifyClusterConnection()) {
-      flinkCluster.setAddress(null);
-      flinkCluster.setJobManagerUrl(null);
       flinkCluster.setClusterState(ClusterState.LOST.getValue());
       updateById(flinkCluster);
       throw new ApiAlertException("Current cluster is not active, please check!");
@@ -379,30 +397,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
   }
 
-  private void updateCluster(FlinkCluster cluster, FlinkCluster flinkCluster) {
-    flinkCluster.setClusterName(cluster.getClusterName());
-    flinkCluster.setDescription(cluster.getDescription());
-    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
-      flinkCluster.setAddress(cluster.getAddress());
-      flinkCluster.setJobManagerUrl(cluster.getAddress());
-    } else {
-      flinkCluster.setAddress(null);
-      flinkCluster.setJobManagerUrl(null);
-      flinkCluster.setClusterId(cluster.getClusterId());
-      flinkCluster.setVersionId(cluster.getVersionId());
-      flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
-      flinkCluster.setOptions(cluster.getOptions());
-      flinkCluster.setResolveOrder(cluster.getResolveOrder());
-      flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
-      flinkCluster.setK8sConf(cluster.getK8sConf());
-      flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
-      flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
-      flinkCluster.setServiceAccount(cluster.getServiceAccount());
-      flinkCluster.setFlinkImage(cluster.getFlinkImage());
-      flinkCluster.setYarnQueue(cluster.getYarnQueue());
-    }
-  }
-
   @Nullable
   private KubernetesDeployParam getKubernetesDeployDesc(
       @Nonnull FlinkCluster flinkCluster, String action) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 29131f651..13de7a3fe 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -30,19 +30,16 @@ import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hc.client5.http.config.RequestConfig;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 
 import java.time.Duration;
 import java.util.Date;
@@ -65,7 +62,7 @@ public class FlinkClusterWatcher {
   @Autowired private ApplicationService applicationService;
 
-  private Long lastWatcheringTime = 0L;
+  private Long lastWatchingTime = 0L;
 
   // Track interval every 30 seconds
   private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);
@@ -73,6 +70,8 @@ public class FlinkClusterWatcher {
   /** Watcher cluster lists */
   private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(8);
 
+  private boolean immediateWatch = false;
+
   /** Thread pool for processing status monitoring for each cluster */
   private static final ExecutorService EXECUTOR =
       new ThreadPoolExecutor(
@@ -95,71 +94,40 @@ public class FlinkClusterWatcher {
     flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
   }
 
-  /** flinkcluster persistent */
-  @PreDestroy
-  private void stop() {
-    // TODO: flinkcluster persistent
-  }
-
   @Scheduled(fixedDelay = 1000)
   private void start() {
-    if (System.currentTimeMillis() - lastWatcheringTime >= WATCHER_INTERVAL.toMillis()) {
-      lastWatcheringTime = System.currentTimeMillis();
-      watcher();
-    }
-  }
-
-  private void watcher() {
-    for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
-      EXECUTOR.execute(
-          () -> {
-            FlinkCluster flinkCluster = entry.getValue();
-            updateClusterState(flinkCluster);
-          });
-    }
-  }
-
-  private ClusterState updateClusterState(FlinkCluster flinkCluster) {
-    Integer clusterExecutionMode = flinkCluster.getExecutionMode();
-    if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
-      ClusterState state = getClusterState(flinkCluster);
-      handleClusterState(flinkCluster, state);
-      return state;
-    } else {
-      // TODO: K8s Session status monitoring
-      return ClusterState.UNKNOWN;
-    }
-  }
-
-  public synchronized boolean verifyClusterValidByClusterId(Long clusterId) {
-    FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
-    ClusterState state = ClusterState.of(flinkCluster.getClusterState());
-    if (!ClusterState.isRunningState(state)) {
-      return false;
-    }
-    state = updateClusterState(flinkCluster);
-    if (!ClusterState.isRunningState(state)) {
-      return false;
-    }
-    return true;
-  }
-
-  public boolean checkAlert(Long clusterId) {
-    FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
-    if (flinkCluster.getAlertId() == null) {
-      return false;
+    if (immediateWatch
+        || System.currentTimeMillis() - lastWatchingTime >= WATCHER_INTERVAL.toMillis()) {
+      lastWatchingTime = System.currentTimeMillis();
+      immediateWatch = false;
+      for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
+        EXECUTOR.execute(
+            () -> {
+              FlinkCluster flinkCluster = entry.getValue();
+              ClusterState state = getClusterState(flinkCluster);
+              switch (state) {
+                case STOPPED:
+                  flinkClusterService.updateClusterToStopped(flinkCluster.getId());
+                  break;
+                  // fall through
+                case LOST:
+                case UNKNOWN:
+                  unWatching(flinkCluster);
+                  alert(flinkCluster, state);
+                  break;
+              }
+            });
+      }
     }
-    return true;
   }
 
   private void alert(FlinkCluster cluster, ClusterState state) {
-    if (!checkAlert(cluster.getId())) {
-      return;
+    if (cluster.getAlertId() != null) {
+      cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
+      cluster.setClusterState(state.getValue());
+      cluster.setEndTime(new Date());
+      alertService.alert(cluster, state);
     }
-    cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
-    cluster.setClusterState(state.getValue());
-    cluster.setEndTime(new Date());
-    alertService.alert(cluster, state);
   }
 
   /**
@@ -168,13 +136,21 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
    */
-  private ClusterState getClusterState(FlinkCluster flinkCluster) {
-    ClusterState state = getClusterStateFromFlinkAPI(flinkCluster);
-    if (ClusterState.isRunningState(state)) {
-      return state;
-    } else {
-      return getClusterStateFromYarnAPI(flinkCluster);
+  public synchronized ClusterState getClusterState(FlinkCluster flinkCluster) {
+    ClusterState state;
+    switch (flinkCluster.getExecutionModeEnum()) {
+      case REMOTE:
+        state = httpRemoteClusterState(flinkCluster);
+        break;
+      case YARN_SESSION:
+        state = httpYarnSessionClusterState(flinkCluster);
+        break;
+      default:
+        state = ClusterState.UNKNOWN;
+        break;
     }
+    immediateWatch = !ClusterState.isRunningState(state);
+    return state;
   }
 
   /**
@@ -183,28 +159,20 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
   */
-  private ClusterState getClusterStateFromFlinkAPI(FlinkCluster flinkCluster) {
+  private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
     final String address = flinkCluster.getAddress();
-    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
-    if (StringUtils.isEmpty(address)) {
-      return ClusterState.STOPPED;
-    }
-    final String flinkUrl =
-        StringUtils.isEmpty(jobManagerUrl)
-            ? address.concat("/overview")
-            : jobManagerUrl.concat("/overview");
+    final String flinkUrl = address.concat("/overview");
     try {
       String res =
          HttpClientUtils.httpGetRequest(
              flinkUrl,
              RequestConfig.custom().setConnectTimeout(5000, TimeUnit.MILLISECONDS).build());
-      JacksonUtils.read(res, Overview.class);
       return ClusterState.RUNNING;
     } catch (Exception ignored) {
      log.error("cluster id:{} get state from flink api failed", flinkCluster.getId());
     }
-    return ClusterState.UNKNOWN;
+    return ClusterState.LOST;
   }
 
   /**
@@ -213,14 +181,7 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
   */
-  private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) {
-    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
-      return ClusterState.LOST;
-    }
-    String clusterId = flinkCluster.getClusterId();
-    if (StringUtils.isEmpty(clusterId)) {
-      return ClusterState.STOPPED;
-    }
+  private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
     String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId());
     try {
       String result = YarnUtils.restRequest(yarnUrl);
@@ -243,64 +204,27 @@
   }
 
   /**
-   * process cluster state
+   * add flinkCluster to watching
    *
    * @param flinkCluster
-   * @param state
   */
-  private void handleClusterState(FlinkCluster flinkCluster, ClusterState state) {
-    LambdaUpdateWrapper<FlinkCluster> updateWrapper =
-        new LambdaUpdateWrapper<FlinkCluster>()
-            .eq(FlinkCluster::getId, flinkCluster.getId())
-            .set(FlinkCluster::getClusterState, state.getValue());
-    switch (state) {
-      case STOPPED:
-        {
-          updateWrapper
-              .set(FlinkCluster::getAddress, null)
-              .set(FlinkCluster::getJobManagerUrl, null)
-              .set(FlinkCluster::getEndTime, new Date());
-        }
-        // fall through
-      case LOST:
-      case UNKNOWN:
-        {
-          removeFlinkCluster(flinkCluster);
-          alert(flinkCluster, state);
-          break;
-        }
+  public static void addWatching(FlinkCluster flinkCluster) {
+    if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+      log.info("add the cluster with id:{} to watcher cluster cache", flinkCluster.getId());
+      WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
     }
-    flinkClusterService.update(updateWrapper);
   }
 
-  /**
-   * Add a cluster to cache
-   *
-   * @param flinkCluster
-   */
-  public static void addFlinkCluster(FlinkCluster flinkCluster) {
+  /** @param flinkCluster */
+  public static void unWatching(FlinkCluster flinkCluster) {
     if (WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
-      return;
+      log.info("remove the cluster with id:{} from watcher cluster cache", flinkCluster.getId());
+      WATCHER_CLUSTERS.remove(flinkCluster.getId());
     }
-    log.info("add the cluster with id:{} to watcher cluster cache", flinkCluster.getId());
-    WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
   }
 
   /**
-   * Remove a cluster from cache
-   *
-   * @param flinkCluster
-   */
-  public static void removeFlinkCluster(FlinkCluster flinkCluster) {
-    if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
-      return;
-    }
-    log.info("remove the cluster with id:{} from watcher cluster cache", flinkCluster.getId());
-    WATCHER_CLUSTERS.remove(flinkCluster.getId());
-  }
-
-  /**
-   * string conver final application status
+   * string converse final application status
    *
    * @param value
    * @return
   */
@@ -321,11 +245,9 @@
   private ClusterState finalApplicationStatusConvertClusterState(FinalApplicationStatus status) {
-    switch (status) {
-      case UNDEFINED:
-        return ClusterState.RUNNING;
-      default:
-        return ClusterState.STOPPED;
+    if (status == FinalApplicationStatus.UNDEFINED) {
+      return ClusterState.RUNNING;
     }
+    return ClusterState.STOPPED;
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index 737cc1bc6..207954063 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.task;
 
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
@@ -53,7 +54,6 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -161,7 +161,11 @@ public class FlinkRESTAPIWatcher {
             new LambdaQueryWrapper<Application>()
                 .eq(Application::getTracking, 1)
                 .notIn(Application::getExecutionMode, ExecutionMode.getKubernetesMode()));
-    applications.forEach((app) -> WATCHING_APPS.put(app.getId(), app));
+    applications.forEach(
+        (app) -> {
+          WATCHING_APPS.put(app.getId(), app);
+          STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
+        });
   }
 
   @PreDestroy
@@ -230,7 +234,7 @@ public class FlinkRESTAPIWatcher {
           if (StopFrom.NONE.equals(stopFrom)) {
             savePointService.expire(application.getId());
             application.setState(FlinkAppState.LOST.getValue());
-            alert(application, FlinkAppState.LOST);
+            doAlert(application, FlinkAppState.LOST);
           } else {
             application.setState(FlinkAppState.CANCELED.getValue());
           }
@@ -246,7 +250,7 @@ public class FlinkRESTAPIWatcher {
         doPersistMetrics(application, true);
         FlinkAppState appState = FlinkAppState.of(application.getState());
         if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
-          alert(application, FlinkAppState.of(application.getState()));
+          doAlert(application, FlinkAppState.of(application.getState()));
           if (appState.equals(FlinkAppState.FAILED)) {
             try {
               applicationService.start(application, true);
@@ -341,13 +345,13 @@ public class FlinkRESTAPIWatcher {
 
     // get overview info at the first start time
     if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
+      STARTING_CACHE.invalidate(application.getId());
       Overview override = httpOverview(application);
       if (override != null && override.getSlotsTotal() > 0) {
         application.setTotalTM(override.getTaskmanagers());
         application.setTotalSlot(override.getSlotsTotal());
         application.setAvailableSlot(override.getSlotsAvailable());
       }
-      STARTING_CACHE.invalidate(application.getId());
     }
   }
 
@@ -458,7 +462,7 @@ public class FlinkRESTAPIWatcher {
             savePointService.expire(application.getId());
           }
           stopCanceledJob(application.getId());
-          alert(application, FlinkAppState.CANCELED);
+          doAlert(application, FlinkAppState.CANCELED);
         }
         STOP_FROM_MAP.remove(application.getId());
         doPersistMetrics(application, true);
@@ -469,7 +473,7 @@ public class FlinkRESTAPIWatcher {
         STOP_FROM_MAP.remove(application.getId());
         application.setState(FlinkAppState.FAILED.getValue());
         doPersistMetrics(application, true);
-        alert(application, FlinkAppState.FAILED);
+        doAlert(application, FlinkAppState.FAILED);
         applicationService.start(application, true);
         break;
       case RESTARTING:
@@ -546,7 +550,7 @@ public class FlinkRESTAPIWatcher {
             || flinkAppState.equals(FlinkAppState.LOST)
             || (flinkAppState.equals(FlinkAppState.CANCELED) && StopFrom.NONE.equals(stopFrom))
             || applicationService.checkAlter(application)) {
-          alert(application, flinkAppState);
+          doAlert(application, flinkAppState);
           stopCanceledJob(application.getId());
           if (flinkAppState.equals(FlinkAppState.FAILED)) {
             applicationService.start(application, true);
@@ -585,23 +589,6 @@ public class FlinkRESTAPIWatcher {
     }
   }
 
-  public static void doWatching(Application application) {
-    if (isKubernetesApp(application)) {
-      return;
-    }
-    log.info("FlinkRESTAPIWatcher add app to tracking,appId:{}", application.getId());
-    WATCHING_APPS.put(application.getId(), application);
-    STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
-  }
-
-  public static void addSavepoint(Long appId) {
-    if (isKubernetesApp(appId)) {
-      return;
-    }
-    log.info("FlinkRESTAPIWatcher add app to savepoint,appId:{}", appId);
-    SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
-  }
-
   public static void unWatching(Long appId) {
     if (isKubernetesApp(appId)) {
       return;
@@ -618,19 +605,6 @@ public class FlinkRESTAPIWatcher {
     CANCELLED_JOB_MAP.remove(appId);
   }
 
-  public static void addCanceledApp(Long appId, Long userId) {
-    log.info("flink job addCanceledApp app appId:{}, useId:{}", appId, userId);
-    CANCELLED_JOB_MAP.put(appId, userId);
-  }
-
-  public static Long getCanceledJobUserId(Long appId) {
-    return CANCELLED_JOB_MAP.get(appId) == null ? Long.valueOf(-1) : CANCELLED_JOB_MAP.get(appId);
-  }
-
-  public static Collection<Application> getWatchingApps() {
-    return WATCHING_APPS.values();
-  }
-
   private static boolean isKubernetesApp(Application application) {
     return FlinkK8sWatcherWrapper.isKubernetesApp(application);
   }
@@ -743,10 +717,6 @@ public class FlinkRESTAPIWatcher {
     return JacksonUtils.read(result, clazz);
   }
 
-  public boolean isWatchingApp(Long id) {
-    return WATCHING_APPS.containsKey(id);
-  }
-
   private <T> T httpRemoteCluster(Long clusterId, Callback<FlinkCluster, T> function)
       throws Exception {
     FlinkCluster flinkCluster = getFlinkRemoteCluster(clusterId, false);
@@ -781,19 +751,26 @@ public class FlinkRESTAPIWatcher {
    * alarm; If the abnormal behavior of the job is caused by itself and the flink cluster is running
    * normally, the job will an alarm
   */
-  private void alert(Application app, FlinkAppState appState) {
-    if (ExecutionMode.isYarnPerJobOrAppMode(app.getExecutionModeEnum())
-        || !flinkClusterWatcher.checkAlert(app.getFlinkClusterId())) {
-      alertService.alert(app, appState);
-      return;
-    }
-    boolean isValid = flinkClusterWatcher.verifyClusterValidByClusterId(app.getFlinkClusterId());
-    if (isValid) {
-      log.info(
-          "application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert",
-          app.getId(),
-          app.getFlinkClusterId());
-      alertService.alert(app, appState);
+  private void doAlert(Application app, FlinkAppState appState) {
+    switch (app.getExecutionModeEnum()) {
+      case YARN_APPLICATION:
+      case YARN_PER_JOB:
+        alertService.alert(app, appState);
+        return;
+      case YARN_SESSION:
+      case REMOTE:
+        FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId());
+        ClusterState clusterState = flinkClusterWatcher.getClusterState(flinkCluster);
+        if (ClusterState.isRunningState(clusterState)) {
+          log.info(
+              "application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert",
+              app.getId(),
+              app.getFlinkClusterId());
+          alertService.alert(app, appState);
+        }
+        break;
+      default:
+        break;
     }
   }
 }
