This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch cluster-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/cluster-state by this push:
new 6a9d1d346 Flink cluster state monitoring improve
6a9d1d346 is described below
commit 6a9d1d346d2b589524af8c899c1ca5f641c55fde
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 2 18:03:52 2023 +0800
Flink cluster state monitoring improve
---
.../streampark/common/enums/ClusterState.java | 20 +++--------
.../console/core/mapper/ApplicationMapper.java | 2 +-
.../console/core/service/ApplicationService.java | 2 +-
.../console/core/service/FlinkClusterService.java | 3 +-
.../core/service/impl/ApplicationServiceImpl.java | 4 +--
.../core/service/impl/FlinkClusterServiceImpl.java | 8 ++---
.../console/core/task/FlinkClusterWatcher.java | 41 +++++++++++++---------
.../console/core/task/FlinkRESTAPIWatcher.java | 27 +++++++-------
.../resources/mapper/core/ApplicationMapper.xml | 2 +-
9 files changed, 54 insertions(+), 55 deletions(-)
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index decfc770a..d9a715f7e 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -60,23 +60,13 @@ public enum ClusterState implements Serializable {
return value;
}
- public static boolean isCreateState(ClusterState state) {
- return CREATED.equals(state);
- }
-
- public static boolean isRunningState(ClusterState state) {
+ public static boolean isRunning(ClusterState state) {
return RUNNING.equals(state);
}
- public static boolean isStoppedState(ClusterState state) {
- return STOPPED.equals(state);
- }
-
- public static boolean isLostState(ClusterState state) {
- return LOST.equals(state);
- }
-
- public static boolean isUnknownState(ClusterState state) {
- return UNKNOWN.equals(state);
+ public static boolean isFailed(ClusterState state) {
+ return state == ClusterState.STOPPED
+ || state == ClusterState.LOST
+ || state == ClusterState.UNKNOWN;
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index d1b59fa07..55c2ee72f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -66,5 +66,5 @@ public interface ApplicationMapper extends BaseMapper<Application> {
boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
- Integer getAffectedJobsByClusterId(@Param("clusterId") Long clusterId);
+ Integer countJobsByClusterId(@Param("clusterId") Long clusterId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index a64922afa..cba588c65 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -105,7 +105,7 @@ public interface ApplicationService extends IService<Application> {
boolean existsJobByClusterId(Long clusterId);
- Integer getAffectedJobsByClusterId(Long clusterId);
+ Integer countJobsByClusterId(Long clusterId);
boolean existsJobByFlinkEnvId(Long flinkEnvId);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 788596c5d..6d4559735 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service;
+import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -48,5 +49,5 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode> executionModes);
- void updateClusterToStopped(Long id);
+ void updateClusterFinalState(Long id, ClusterState state);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 98253bb9f..ef9150fe4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -545,8 +545,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
@Override
- public Integer getAffectedJobsByClusterId(Long clusterId) {
- return baseMapper.getAffectedJobsByClusterId(clusterId);
+ public Integer countJobsByClusterId(Long clusterId) {
+ return baseMapper.countJobsByClusterId(clusterId);
}
@Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index a8512a203..1dfc09e13 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -279,11 +279,11 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public void updateClusterToStopped(Long id) {
+ public void updateClusterFinalState(Long id, ClusterState state) {
LambdaUpdateWrapper<FlinkCluster> updateWrapper =
new LambdaUpdateWrapper<FlinkCluster>()
.eq(FlinkCluster::getId, id)
- .set(FlinkCluster::getClusterState, ClusterState.STOPPED.getValue())
+ .set(FlinkCluster::getClusterState, state.getValue())
.set(FlinkCluster::getEndTime, new Date());
update(updateWrapper);
}
@@ -297,7 +297,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())
|| ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode())) {
ApiAlertException.throwIfTrue(
- ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
+ ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
"Flink cluster is running, cannot be delete, please check.");
}
@@ -388,7 +388,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
ApiAlertException.throwIfFalse(
- ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
+ ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
"Current cluster is not active, please check!");
if (!flinkCluster.verifyClusterConnection()) {
flinkCluster.setClusterState(ClusterState.LOST.getValue());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 13de7a3fe..39af495dc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hc.client5.http.config.RequestConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
@@ -62,7 +64,7 @@ public class FlinkClusterWatcher {
@Autowired private ApplicationService applicationService;
- private Long lastWatchingTime = 0L;
+ private Long lastWatchTime = 0L;
// Track interval every 30 seconds
private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);
@@ -70,6 +72,9 @@ public class FlinkClusterWatcher {
/** Watcher cluster lists */
private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(8);
+ private static final Cache<Long, ClusterState> FAILED_STATES =
+ Caffeine.newBuilder().expireAfterWrite(WATCHER_INTERVAL).build();
+
private boolean immediateWatch = false;
/** Thread pool for processing status monitoring for each cluster */
@@ -97,24 +102,18 @@ public class FlinkClusterWatcher {
@Scheduled(fixedDelay = 1000)
private void start() {
if (immediateWatch
- || System.currentTimeMillis() - lastWatchingTime >= WATCHER_INTERVAL.toMillis()) {
- lastWatchingTime = System.currentTimeMillis();
+ || System.currentTimeMillis() - lastWatchTime >= WATCHER_INTERVAL.toMillis()) {
+ lastWatchTime = System.currentTimeMillis();
immediateWatch = false;
for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
EXECUTOR.execute(
() -> {
FlinkCluster flinkCluster = entry.getValue();
ClusterState state = getClusterState(flinkCluster);
- switch (state) {
- case STOPPED:
- flinkClusterService.updateClusterToStopped(flinkCluster.getId());
- break;
- // fall through
- case LOST:
- case UNKNOWN:
- unWatching(flinkCluster);
- alert(flinkCluster, state);
- break;
+ if (ClusterState.isFailed(state)) {
+ flinkClusterService.updateClusterFinalState(flinkCluster.getId(), state);
+ unWatching(flinkCluster);
+ alert(flinkCluster, state);
}
});
}
@@ -123,7 +122,7 @@ public class FlinkClusterWatcher {
private void alert(FlinkCluster cluster, ClusterState state) {
if (cluster.getAlertId() != null) {
- cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
+ cluster.setJobs(applicationService.countJobsByClusterId(cluster.getId()));
cluster.setClusterState(state.getValue());
cluster.setEndTime(new Date());
alertService.alert(cluster, state);
@@ -136,8 +135,11 @@ public class FlinkClusterWatcher {
* @param flinkCluster
* @return
*/
- public synchronized ClusterState getClusterState(FlinkCluster flinkCluster) {
- ClusterState state;
+ public ClusterState getClusterState(FlinkCluster flinkCluster) {
+ ClusterState state = FAILED_STATES.getIfPresent(flinkCluster.getId());
+ if (state != null) {
+ return state;
+ }
switch (flinkCluster.getExecutionModeEnum()) {
case REMOTE:
state = httpRemoteClusterState(flinkCluster);
@@ -149,7 +151,12 @@ public class FlinkClusterWatcher {
state = ClusterState.UNKNOWN;
break;
}
- immediateWatch = !ClusterState.isRunningState(state);
+ if (ClusterState.isRunning(state)) {
+ FAILED_STATES.invalidate(flinkCluster.getId());
+ } else {
+ immediateWatch = true;
+ FAILED_STATES.put(flinkCluster.getId(), state);
+ }
return state;
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index ff6ee225b..6af9d0cb0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -54,6 +54,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collection;
import java.util.Date;
import java.util.List;
@@ -84,9 +85,10 @@ public class FlinkRESTAPIWatcher {
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
// track interval every 5 seconds
- private static final long WATCHING_INTERVAL = 1000L * 5;
+ private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
+
// option interval within 10 seconds
- private static final long OPTION_INTERVAL = 1000L * 10;
+ private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
/**
*
@@ -139,7 +141,7 @@ public class FlinkRESTAPIWatcher {
private static final Map<Long, OptionState> OPTIONING = new ConcurrentHashMap<>(0);
- private Long lastWatchingTime = 0L;
+ private Long lastWatchTime = 0L;
private Long lastOptionTime = 0L;
@@ -188,20 +190,19 @@ public class FlinkRESTAPIWatcher {
public void start() {
// The application has been started at the first time, or the front-end is operating start/stop,
// need to return status info immediately.
- if (lastWatchingTime == null || !OPTIONING.isEmpty()) {
- doWatch();
- } else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL) {
- // The last operation time is less than option interval.(10 seconds)
- doWatch();
- } else if (System.currentTimeMillis() - lastWatchingTime >= WATCHING_INTERVAL) {
- // Normal information obtain, check if there is 5 seconds interval between this time and the
- // last time.(once every 5 seconds)
+ if (lastWatchTime == null || !OPTIONING.isEmpty()) {
doWatch();
+ } else {
+ Long timeMillis = System.currentTimeMillis();
+ if (timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis()
+ || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) {
+ lastWatchTime = timeMillis;
+ doWatch();
+ }
}
}
private void doWatch() {
- lastWatchingTime = System.currentTimeMillis();
for (Map.Entry<Long, Application> entry : WATCHING_APPS.entrySet()) {
watch(entry.getKey(), entry.getValue());
}
@@ -796,7 +797,7 @@ public class FlinkRESTAPIWatcher {
case REMOTE:
FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId());
ClusterState clusterState = flinkClusterWatcher.getClusterState(flinkCluster);
- if (ClusterState.isRunningState(clusterState)) {
+ if (ClusterState.isRunning(clusterState)) {
log.info(
"application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert",
app.getId(),
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index cc0173387..04005c724 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -133,7 +133,7 @@
limit 1
</select>
- <select id="getAffectedJobsByClusterId" resultType="java.lang.Integer" parameterType="java.lang.Long">
+ <select id="countJobsByClusterId" resultType="java.lang.Integer" parameterType="java.lang.Long">
select
count(1)
from t_flink_app