This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 9f825706d [Improve] Flink cluster status monitoring improvement (#2826)
9f825706d is described below
commit 9f825706d2ef5ad354d11adbe53ed25a71f87087
Author: benjobs <[email protected]>
AuthorDate: Tue Jul 11 23:53:29 2023 +0800
[Improve] Flink cluster status monitoring improvement (#2826)
* [Improve] Flink cluster status monitoring improvement
* flink cluster jobManagerUrl improvement
---
.../streampark/common/enums/ClusterState.java | 20 +--
.../main/assembly/script/upgrade/mysql/2.2.0.sql | 26 +--
.../main/assembly/script/upgrade/pgsql/2.2.0.sql | 22 +--
.../console/core/annotation/AppUpdated.java | 2 +-
.../console/core/aspect/StreamParkAspect.java | 6 +-
.../console/core/mapper/ApplicationMapper.java | 2 +-
.../console/core/service/ApplicationService.java | 2 +-
.../console/core/service/FlinkClusterService.java | 3 +
.../core/service/impl/AppBuildPipeServiceImpl.java | 12 +-
.../core/service/impl/ApplicationServiceImpl.java | 32 ++--
.../core/service/impl/FlinkClusterServiceImpl.java | 86 +++++----
.../core/service/impl/ProjectServiceImpl.java | 8 +-
.../core/service/impl/SavePointServiceImpl.java | 10 +-
.../console/core/task/CheckpointProcessor.java | 4 +-
.../console/core/task/FlinkClusterWatcher.java | 194 +++++++--------------
...nkRESTAPIWatcher.java => FlinkHttpWatcher.java} | 127 +++++++-------
.../resources/mapper/core/ApplicationMapper.xml | 2 +-
17 files changed, 242 insertions(+), 316 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index decfc770a..d9a715f7e 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -60,23 +60,13 @@ public enum ClusterState implements Serializable {
return value;
}
- public static boolean isCreateState(ClusterState state) {
- return CREATED.equals(state);
- }
-
- public static boolean isRunningState(ClusterState state) {
+ public static boolean isRunning(ClusterState state) {
return RUNNING.equals(state);
}
- public static boolean isStoppedState(ClusterState state) {
- return STOPPED.equals(state);
- }
-
- public static boolean isLostState(ClusterState state) {
- return LOST.equals(state);
- }
-
- public static boolean isUnknownState(ClusterState state) {
- return UNKNOWN.equals(state);
+ public static boolean isFailed(ClusterState state) {
+ return state == ClusterState.STOPPED
+ || state == ClusterState.LOST
+ || state == ClusterState.UNKNOWN;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 16404952d..757ac6cd7 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -25,19 +25,19 @@ set foreign_key_checks = 0;
-- ----------------------------
drop table if exists `t_resource`;
create table `t_resource` (
- `id` bigint not null auto_increment,
- `resource_name` varchar(128) collate
utf8mb4_general_ci not null comment 'The name of the resource file',
- `resource_type` int not null comment '0:app
1:common 2:connector 3:format 4:udf',
- `resource` text collate utf8mb4_general_ci
comment 'resource content, including jars and poms',
- `engine_type` int not null comment 'compute
engine type, 0:apache flink 1:apache spark',
- `main_class` varchar(255) collate
utf8mb4_general_ci default null,
- `description` text collate utf8mb4_general_ci
default null comment 'More detailed description of resource',
- `creator_id` bigint collate utf8mb4_general_ci
not null comment 'user id of creator',
- `team_id` bigint collate utf8mb4_general_ci
not null comment 'team id',
- `create_time` datetime not null default
current_timestamp comment 'create time',
- `modify_time` datetime not null default
current_timestamp on update current_timestamp comment 'modify time',
- primary key (`id`) using btree,
- unique key `un_team_vcode_inx`
(`team_id`,`resource_name`) using btree
+`id` bigint not null auto_increment,
+`resource_name` varchar(128) collate utf8mb4_general_ci not null comment 'The
name of the resource file',
+`resource_type` int not null comment '0:app 1:common 2:connector 3:format
4:udf',
+`resource` text collate utf8mb4_general_ci comment 'resource content,
including jars and poms',
+`engine_type` int not null comment 'compute engine type, 0:apache flink
1:apache spark',
+`main_class` varchar(255) collate utf8mb4_general_ci default null,
+`description` text collate utf8mb4_general_ci default null comment 'More
detailed description of resource',
+`creator_id` bigint collate utf8mb4_general_ci not null comment 'user id of
creator',
+`team_id` bigint collate utf8mb4_general_ci not null comment 'team id',
+`create_time` datetime not null default current_timestamp comment 'create
time',
+`modify_time` datetime not null default current_timestamp on update
current_timestamp comment 'modify time',
+primary key (`id`) using btree,
+unique key `un_team_vcode_inx` (`team_id`,`resource_name`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
alter table `t_flink_sql`
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index 56b055ce8..7f5a54f7d 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -22,17 +22,17 @@ create sequence "public"."streampark_t_resource_id_seq"
increment 1 start 10000 cache 1 minvalue 10000 maxvalue
9223372036854775807;
create table "public"."t_resource" (
- "id" int8 not null default
nextval('streampark_t_resource_id_seq'::regclass),
- "resource_name" varchar(128) collate
"pg_catalog"."default" not null,
- "resource_type" int4,
- "resource" text collate
"pg_catalog"."default",
- "engine_type" int4,
- "main_class" varchar(255) collate
"pg_catalog"."default",
- "description" text collate
"pg_catalog"."default" default null,
- "creator_id" int8 not null,
- "team_id" int8 not null,
- "create_time" timestamp(6) not null
default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
- "modify_time" timestamp(6) not null
default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
+"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
+"resource_name" varchar(128) collate "pg_catalog"."default" not null,
+"resource_type" int4,
+"resource" text collate "pg_catalog"."default",
+"engine_type" int4,
+"main_class" varchar(255) collate "pg_catalog"."default",
+"description" text collate "pg_catalog"."default" default null,
+"creator_id" int8 not null,
+"team_id" int8 not null,
+"create_time" timestamp(6) not null default timezone('UTC-8'::text,
(now())::timestamp(0) without time zone),
+"modify_time" timestamp(6) not null default timezone('UTC-8'::text,
(now())::timestamp(0) without time zone)
)
;
comment on column "public"."t_resource"."id" is 'Resource id';
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
index 345a361c7..97c77a79d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
@@ -30,7 +30,7 @@ import java.lang.annotation.Target;
* cause the application to be updated, Will work together with {@link
*
org.apache.streampark.console.core.aspect.StreamParkAspect#appUpdated(ProceedingJoinPoint)},
The
* final purpose will be refresh {@link
- * org.apache.streampark.console.core.task.FlinkRESTAPIWatcher#WATCHING_APPS},
Make the state of the
+ * org.apache.streampark.console.core.task.FlinkHttpWatcher#WATCHING_APPS},
Make the state of the
* job consistent with the database
*/
@Target(ElementType.METHOD)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
index 6646ead55..2392885bd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
@@ -28,7 +28,7 @@ import
org.apache.streampark.console.core.enums.PermissionType;
import org.apache.streampark.console.core.enums.UserType;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.console.system.entity.AccessToken;
import org.apache.streampark.console.system.entity.Member;
import org.apache.streampark.console.system.entity.User;
@@ -58,7 +58,7 @@ import java.util.Objects;
@Aspect
public class StreamParkAspect {
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkHttpWatcher flinkHttpWatcher;
@Autowired private CommonService commonService;
@Autowired private MemberService memberService;
@Autowired private ApplicationService applicationService;
@@ -93,7 +93,7 @@ public class StreamParkAspect {
MethodSignature methodSignature = (MethodSignature)
joinPoint.getSignature();
log.debug("appUpdated aspect, method:{}", methodSignature.getName());
Object target = joinPoint.proceed();
- flinkRESTAPIWatcher.init();
+ flinkHttpWatcher.init();
return target;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index d1b59fa07..55c2ee72f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -66,5 +66,5 @@ public interface ApplicationMapper extends
BaseMapper<Application> {
boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
- Integer getAffectedJobsByClusterId(@Param("clusterId") Long clusterId);
+ Integer countJobsByClusterId(@Param("clusterId") Long clusterId);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index a64922afa..cba588c65 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -105,7 +105,7 @@ public interface ApplicationService extends
IService<Application> {
boolean existsJobByClusterId(Long clusterId);
- Integer getAffectedJobsByClusterId(Long clusterId);
+ Integer countJobsByClusterId(Long clusterId);
boolean existsJobByFlinkEnvId(Long flinkEnvId);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d149152af..6d4559735 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service;
+import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -47,4 +48,6 @@ public interface FlinkClusterService extends
IService<FlinkCluster> {
Boolean existsByFlinkEnvId(Long id);
List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode>
executionModes);
+
+ void updateClusterFinalState(Long id, ClusterState state);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index ac4917bc7..2a38308cb 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -55,7 +55,7 @@ import
org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.MessageService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SettingService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.packer.docker.DockerConf;
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.DependencyInfo;
@@ -131,7 +131,7 @@ public class AppBuildPipeServiceImpl
@Autowired private ApplicationLogService applicationLogService;
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkHttpWatcher flinkHttpWatcher;
@Autowired private ApplicationConfigService applicationConfigService;
@@ -217,8 +217,8 @@ public class AppBuildPipeServiceImpl
app.setRelease(ReleaseState.RELEASING.get());
applicationService.updateRelease(app);
- if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
- flinkRESTAPIWatcher.init();
+ if (flinkHttpWatcher.isWatchingApp(app.getId())) {
+ flinkHttpWatcher.init();
}
// 1) checkEnv
@@ -341,8 +341,8 @@ public class AppBuildPipeServiceImpl
}
applicationService.updateRelease(app);
applicationLogService.save(applicationLog);
- if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
- flinkRESTAPIWatcher.init();
+ if (flinkHttpWatcher.isWatchingApp(app.getId())) {
+ flinkHttpWatcher.init();
}
}
});
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 98253bb9f..886756770 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -78,7 +78,7 @@ import
org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.YarnQueueService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
import org.apache.streampark.flink.client.bean.CancelResponse;
@@ -238,7 +238,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
Integer runningJob = 0;
// stat metrics from other than kubernetes mode
- for (Application app : FlinkRESTAPIWatcher.getWatchingApps()) {
+ for (Application app : FlinkHttpWatcher.getWatchingApps()) {
if (!teamId.equals(app.getTeamId())) {
continue;
}
@@ -400,7 +400,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
} else {
- FlinkRESTAPIWatcher.unWatching(paramApp.getId());
+ FlinkHttpWatcher.unWatching(paramApp.getId());
}
return true;
}
@@ -449,7 +449,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (!FlinkAppState.CANCELED.equals(state)) {
return false;
}
- long cancelUserId = FlinkRESTAPIWatcher.getCanceledJobUserId(appId);
+ long cancelUserId = FlinkHttpWatcher.getCanceledJobUserId(appId);
long appUserId = application.getUserId();
return cancelUserId != -1 && cancelUserId != appUserId;
}
@@ -530,7 +530,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (exists) {
return true;
}
- for (Application application : FlinkRESTAPIWatcher.getWatchingApps()) {
+ for (Application application : FlinkHttpWatcher.getWatchingApps()) {
if (clusterId.equals(application.getFlinkClusterId())
&& FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum()))
{
return true;
@@ -545,8 +545,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
@Override
- public Integer getAffectedJobsByClusterId(Long clusterId) {
- return baseMapper.getAffectedJobsByClusterId(clusterId);
+ public Integer countJobsByClusterId(Long clusterId) {
+ return baseMapper.countJobsByClusterId(clusterId);
}
@Override
@@ -1208,14 +1208,14 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
} else {
- FlinkRESTAPIWatcher.doWatching(application);
+ FlinkHttpWatcher.doWatching(application);
}
return mapping;
}
@Override
public void cancel(Application appParam) throws Exception {
- FlinkRESTAPIWatcher.setOptionState(appParam.getId(),
OptionState.CANCELLING);
+ FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING);
Application application = getById(appParam.getId());
application.setState(FlinkAppState.CANCELLING.getValue());
@@ -1227,7 +1227,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
applicationLog.setYarnAppId(application.getClusterId());
if (appParam.getSavePointed()) {
- FlinkRESTAPIWatcher.addSavepoint(application.getId());
+ FlinkHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionState.CANCELLING.getValue());
@@ -1238,7 +1238,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
Long userId = commonService.getUserId();
if (!application.getUserId().equals(userId)) {
- FlinkRESTAPIWatcher.addCanceledApp(application.getId(), userId);
+ FlinkHttpWatcher.addCanceledApp(application.getId(), userId);
}
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -1343,7 +1343,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
} else {
- FlinkRESTAPIWatcher.unWatching(application.getId());
+ FlinkHttpWatcher.unWatching(application.getId());
}
String exception = Utils.stringifyException(e);
@@ -1606,8 +1606,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
} else {
- FlinkRESTAPIWatcher.setOptionState(appParam.getId(),
OptionState.STARTING);
- FlinkRESTAPIWatcher.doWatching(application);
+ FlinkHttpWatcher.setOptionState(appParam.getId(),
OptionState.STARTING);
+ FlinkHttpWatcher.doWatching(application);
}
applicationLog.setSuccess(true);
@@ -1628,7 +1628,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (isKubernetesApp(app)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(app));
} else {
- FlinkRESTAPIWatcher.unWatching(appParam.getId());
+ FlinkHttpWatcher.unWatching(appParam.getId());
}
}
})
@@ -1734,7 +1734,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
} else {
- FlinkRESTAPIWatcher.unWatching(application.getId());
+ FlinkHttpWatcher.unWatching(application.getId());
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 769ddf626..ad6af6617 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
@@ -149,7 +150,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
boolean ret = save(flinkCluster);
if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
- FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+ FlinkClusterWatcher.addWatching(flinkCluster);
}
return ret;
}
@@ -165,7 +166,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
"Deploy cluster failed, unknown reason,please check you params or
StreamPark error log");
if
(ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
String address =
- YarnUtils.getRMWebAppURL(true) + "/proxy/" +
deployResponse.clusterId() + "/";
+ String.format(
+ "%s/proxy/%s/", YarnUtils.getRMWebAppURL(true),
deployResponse.clusterId());
flinkCluster.setAddress(address);
flinkCluster.setJobManagerUrl(deployResponse.address());
} else {
@@ -176,12 +178,10 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setException(null);
flinkCluster.setStartTime(new Date());
flinkCluster.setEndTime(null);
- FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+ FlinkClusterWatcher.addWatching(flinkCluster);
updateById(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
- flinkCluster.setAddress(null);
- flinkCluster.setJobManagerUrl(null);
flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
flinkCluster.setException(e.toString());
updateById(flinkCluster);
@@ -190,18 +190,31 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public void update(FlinkCluster cluster) {
- FlinkCluster flinkCluster = getById(cluster.getId());
- boolean success = validateQueueIfNeeded(flinkCluster, cluster);
+ public void update(FlinkCluster paramOfCluster) {
+ FlinkCluster flinkCluster = getById(paramOfCluster.getId());
+ boolean success = validateQueueIfNeeded(flinkCluster, paramOfCluster);
ApiAlertException.throwIfFalse(
- success, String.format(ERROR_CLUSTER_QUEUE_HINT,
cluster.getYarnQueue()));
- updateCluster(cluster, flinkCluster);
- try {
- updateById(flinkCluster);
- } catch (Exception e) {
- throw new ApiDetailException(
- "Update cluster failed, Caused By: " +
ExceptionUtils.getStackTrace(e));
+ success, String.format(ERROR_CLUSTER_QUEUE_HINT,
paramOfCluster.getYarnQueue()));
+
+ flinkCluster.setClusterName(paramOfCluster.getClusterName());
+ flinkCluster.setDescription(paramOfCluster.getDescription());
+ if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
+ flinkCluster.setAddress(paramOfCluster.getAddress());
+ } else {
+ flinkCluster.setClusterId(paramOfCluster.getClusterId());
+ flinkCluster.setVersionId(paramOfCluster.getVersionId());
+ flinkCluster.setDynamicProperties(paramOfCluster.getDynamicProperties());
+ flinkCluster.setOptions(paramOfCluster.getOptions());
+ flinkCluster.setResolveOrder(paramOfCluster.getResolveOrder());
+
flinkCluster.setK8sHadoopIntegration(paramOfCluster.getK8sHadoopIntegration());
+ flinkCluster.setK8sConf(paramOfCluster.getK8sConf());
+ flinkCluster.setK8sNamespace(paramOfCluster.getK8sNamespace());
+
flinkCluster.setK8sRestExposedType(paramOfCluster.getK8sRestExposedType());
+ flinkCluster.setServiceAccount(paramOfCluster.getServiceAccount());
+ flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
+ flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
}
+ updateById(flinkCluster);
}
@Override
@@ -224,10 +237,9 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
// 4) shutdown
ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster,
clusterId);
ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response
failed");
- flinkCluster.setAddress(null);
flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
flinkCluster.setEndTime(new Date());
- FlinkClusterWatcher.removeFlinkCluster(flinkCluster);
+ FlinkClusterWatcher.unWatching(flinkCluster);
updateById(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
@@ -267,6 +279,16 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
.collect(Collectors.toSet())));
}
+ @Override
+ public void updateClusterFinalState(Long id, ClusterState state) {
+ LambdaUpdateWrapper<FlinkCluster> updateWrapper =
+ new LambdaUpdateWrapper<FlinkCluster>()
+ .eq(FlinkCluster::getId, id)
+ .set(FlinkCluster::getClusterState, state.getValue())
+ .set(FlinkCluster::getEndTime, new Date());
+ update(updateWrapper);
+ }
+
@Override
public void delete(FlinkCluster cluster) {
Long id = cluster.getId();
@@ -276,7 +298,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())
||
ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode())) {
ApiAlertException.throwIfTrue(
- ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
+ ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
"Flink cluster is running, cannot be delete, please check.");
}
@@ -367,11 +389,9 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
ApiAlertException.throwIfFalse(
- ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
+ ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
"Current cluster is not active, please check!");
if (!flinkCluster.verifyClusterConnection()) {
- flinkCluster.setAddress(null);
- flinkCluster.setJobManagerUrl(null);
flinkCluster.setClusterState(ClusterState.LOST.getValue());
updateById(flinkCluster);
throw new ApiAlertException("Current cluster is not active, please
check!");
@@ -379,30 +399,6 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
}
- private void updateCluster(FlinkCluster cluster, FlinkCluster flinkCluster) {
- flinkCluster.setClusterName(cluster.getClusterName());
- flinkCluster.setDescription(cluster.getDescription());
- if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
- flinkCluster.setAddress(cluster.getAddress());
- flinkCluster.setJobManagerUrl(cluster.getAddress());
- } else {
- flinkCluster.setAddress(null);
- flinkCluster.setJobManagerUrl(null);
- flinkCluster.setClusterId(cluster.getClusterId());
- flinkCluster.setVersionId(cluster.getVersionId());
- flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
- flinkCluster.setOptions(cluster.getOptions());
- flinkCluster.setResolveOrder(cluster.getResolveOrder());
- flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
- flinkCluster.setK8sConf(cluster.getK8sConf());
- flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
- flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
- flinkCluster.setServiceAccount(cluster.getServiceAccount());
- flinkCluster.setFlinkImage(cluster.getFlinkImage());
- flinkCluster.setYarnQueue(cluster.getYarnQueue());
- }
- }
-
@Nullable
private KubernetesDeployParam getKubernetesDeployDesc(
@Nonnull FlinkCluster flinkCluster, String action) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 7d2b53d43..e190b6e6a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -39,7 +39,7 @@ import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.mapper.ProjectMapper;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.ProjectService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.console.core.task.ProjectBuildTask;
import org.apache.flink.configuration.MemorySize;
@@ -81,7 +81,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
@Autowired private ApplicationService applicationService;
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkHttpWatcher flinkHttpWatcher;
private final ExecutorService executorService =
new ThreadPoolExecutor(
@@ -206,7 +206,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
if (buildState == BuildState.SUCCESSFUL) {
baseMapper.updateBuildTime(id);
}
- flinkRESTAPIWatcher.init();
+ flinkHttpWatcher.init();
},
fileLogger -> {
List<Application> applications =
@@ -221,7 +221,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
app.setBuild(true);
this.applicationService.updateRelease(app);
});
- flinkRESTAPIWatcher.init();
+ flinkHttpWatcher.init();
});
CompletableFuture<Void> buildTask =
CompletableFuture.runAsync(projectBuildTask, executorService);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 22be1d5e1..2ed5bb3b5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -43,7 +43,7 @@ import
org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.SavePointService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.SavepointResponse;
import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
@@ -101,7 +101,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
@Autowired private ApplicationLogService applicationLogService;
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkHttpWatcher flinkHttpWatcher;
private final ExecutorService executorService =
new ThreadPoolExecutor(
@@ -176,12 +176,12 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
applicationLog.setOptionTime(new Date());
applicationLog.setYarnAppId(application.getClusterId());
- FlinkRESTAPIWatcher.addSavepoint(application.getId());
+ FlinkHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
application.setOptionTime(new Date());
this.applicationService.updateById(application);
- flinkRESTAPIWatcher.init();
+ flinkHttpWatcher.init();
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -266,7 +266,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
application.setOptionState(OptionState.NONE.getValue());
application.setOptionTime(new Date());
applicationService.update(application);
- flinkRESTAPIWatcher.init();
+ flinkHttpWatcher.init();
});
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index f96217b92..2c17f51f2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -69,7 +69,7 @@ public class CheckpointProcessor {
@Autowired private SavePointService savePointService;
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkHttpWatcher flinkHttpWatcher;
public void process(Application application, @Nonnull CheckPoints
checkPoints) {
checkPoints.getLatestCheckpoint().forEach(checkPoint ->
process(application, checkPoint));
@@ -85,7 +85,7 @@ public class CheckpointProcessor {
if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
savepointedCache.put(checkPointKey.getSavePointId(),
DEFAULT_FLAG_BYTE);
saveSavepoint(checkPoint, application.getId());
- flinkRESTAPIWatcher.cleanSavepoint(application);
+ flinkHttpWatcher.cleanSavepoint(application);
return;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 29131f651..ed545b0dd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -35,14 +35,14 @@ import
org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hc.client5.http.config.RequestConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Date;
@@ -65,7 +65,7 @@ public class FlinkClusterWatcher {
@Autowired private ApplicationService applicationService;
- private Long lastWatcheringTime = 0L;
+ private Long lastWatchTime = 0L;
// Track interval every 30 seconds
private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);
@@ -73,6 +73,11 @@ public class FlinkClusterWatcher {
/** Watcher cluster lists */
private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new
ConcurrentHashMap<>(8);
+ private static final Cache<Long, ClusterState> FAILED_STATES =
+ Caffeine.newBuilder().expireAfterWrite(WATCHER_INTERVAL).build();
+
+ private boolean immediateWatch = false;
+
/** Thread pool for processing status monitoring for each cluster */
private static final ExecutorService EXECUTOR =
new ThreadPoolExecutor(
@@ -95,71 +100,33 @@ public class FlinkClusterWatcher {
flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(),
cluster));
}
- /** flinkcluster persistent */
- @PreDestroy
- private void stop() {
- // TODO: flinkcluster persistent
- }
-
@Scheduled(fixedDelay = 1000)
private void start() {
- if (System.currentTimeMillis() - lastWatcheringTime >=
WATCHER_INTERVAL.toMillis()) {
- lastWatcheringTime = System.currentTimeMillis();
- watcher();
- }
- }
-
- private void watcher() {
- for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
- EXECUTOR.execute(
- () -> {
- FlinkCluster flinkCluster = entry.getValue();
- updateClusterState(flinkCluster);
- });
- }
- }
-
- private ClusterState updateClusterState(FlinkCluster flinkCluster) {
- Integer clusterExecutionMode = flinkCluster.getExecutionMode();
- if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
- ClusterState state = getClusterState(flinkCluster);
- handleClusterState(flinkCluster, state);
- return state;
- } else {
- // TODO: K8s Session status monitoring
- return ClusterState.UNKNOWN;
- }
- }
-
- public synchronized boolean verifyClusterValidByClusterId(Long clusterId) {
- FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
- ClusterState state = ClusterState.of(flinkCluster.getClusterState());
- if (!ClusterState.isRunningState(state)) {
- return false;
- }
- state = updateClusterState(flinkCluster);
- if (!ClusterState.isRunningState(state)) {
- return false;
+ Long timeMillis = System.currentTimeMillis();
+ if (immediateWatch || timeMillis - lastWatchTime >=
WATCHER_INTERVAL.toMillis()) {
+ lastWatchTime = timeMillis;
+ immediateWatch = false;
+ WATCHER_CLUSTERS.forEach(
+ (aLong, flinkCluster) ->
+ EXECUTOR.execute(
+ () -> {
+ ClusterState state = getClusterState(flinkCluster);
+ if (ClusterState.isFailed(state)) {
+
flinkClusterService.updateClusterFinalState(flinkCluster.getId(), state);
+ unWatching(flinkCluster);
+ alert(flinkCluster, state);
+ }
+ }));
}
- return true;
- }
-
- public boolean checkAlert(Long clusterId) {
- FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
- if (flinkCluster.getAlertId() == null) {
- return false;
- }
- return true;
}
private void alert(FlinkCluster cluster, ClusterState state) {
- if (!checkAlert(cluster.getId())) {
- return;
+ if (cluster.getAlertId() != null) {
+
cluster.setJobs(applicationService.countJobsByClusterId(cluster.getId()));
+ cluster.setClusterState(state.getValue());
+ cluster.setEndTime(new Date());
+ alertService.alert(cluster, state);
}
-
cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
- cluster.setClusterState(state.getValue());
- cluster.setEndTime(new Date());
- alertService.alert(cluster, state);
}
/**
@@ -168,13 +135,29 @@ public class FlinkClusterWatcher {
* @param flinkCluster
* @return
*/
- private ClusterState getClusterState(FlinkCluster flinkCluster) {
- ClusterState state = getClusterStateFromFlinkAPI(flinkCluster);
- if (ClusterState.isRunningState(state)) {
+ public ClusterState getClusterState(FlinkCluster flinkCluster) {
+ ClusterState state = FAILED_STATES.getIfPresent(flinkCluster.getId());
+ if (state != null) {
return state;
+ }
+ switch (flinkCluster.getExecutionModeEnum()) {
+ case REMOTE:
+ state = httpRemoteClusterState(flinkCluster);
+ break;
+ case YARN_SESSION:
+ state = httpYarnSessionClusterState(flinkCluster);
+ break;
+ default:
+ state = ClusterState.UNKNOWN;
+ break;
+ }
+ if (ClusterState.isRunning(state)) {
+ FAILED_STATES.invalidate(flinkCluster.getId());
} else {
- return getClusterStateFromYarnAPI(flinkCluster);
+ immediateWatch = true;
+ FAILED_STATES.put(flinkCluster.getId(), state);
}
+ return state;
}
/**
@@ -183,12 +166,12 @@ public class FlinkClusterWatcher {
* @param flinkCluster
* @return
*/
- private ClusterState getClusterStateFromFlinkAPI(FlinkCluster flinkCluster) {
+ private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
final String address = flinkCluster.getAddress();
- final String jobManagerUrl = flinkCluster.getJobManagerUrl();
if (StringUtils.isEmpty(address)) {
return ClusterState.STOPPED;
}
+ final String jobManagerUrl = flinkCluster.getJobManagerUrl();
final String flinkUrl =
StringUtils.isEmpty(jobManagerUrl)
? address.concat("/overview")
@@ -198,13 +181,12 @@ public class FlinkClusterWatcher {
HttpClientUtils.httpGetRequest(
flinkUrl,
RequestConfig.custom().setConnectTimeout(5000,
TimeUnit.MILLISECONDS).build());
-
JacksonUtils.read(res, Overview.class);
return ClusterState.RUNNING;
} catch (Exception ignored) {
log.error("cluster id:{} get state from flink api failed",
flinkCluster.getId());
}
- return ClusterState.UNKNOWN;
+ return ClusterState.LOST;
}
/**
@@ -213,14 +195,7 @@ public class FlinkClusterWatcher {
* @param flinkCluster
* @return
*/
- private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) {
- if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
- return ClusterState.LOST;
- }
- String clusterId = flinkCluster.getClusterId();
- if (StringUtils.isEmpty(clusterId)) {
- return ClusterState.STOPPED;
- }
+ private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId());
try {
String result = YarnUtils.restRequest(yarnUrl);
@@ -243,64 +218,27 @@ public class FlinkClusterWatcher {
}
/**
- * process cluster state
+ * add flinkCluster to watching
*
* @param flinkCluster
- * @param state
*/
- private void handleClusterState(FlinkCluster flinkCluster, ClusterState
state) {
- LambdaUpdateWrapper<FlinkCluster> updateWrapper =
- new LambdaUpdateWrapper<FlinkCluster>()
- .eq(FlinkCluster::getId, flinkCluster.getId())
- .set(FlinkCluster::getClusterState, state.getValue());
- switch (state) {
- case STOPPED:
- {
- updateWrapper
- .set(FlinkCluster::getAddress, null)
- .set(FlinkCluster::getJobManagerUrl, null)
- .set(FlinkCluster::getEndTime, new Date());
- }
- // fall through
- case LOST:
- case UNKNOWN:
- {
- removeFlinkCluster(flinkCluster);
- alert(flinkCluster, state);
- break;
- }
+ public static void addWatching(FlinkCluster flinkCluster) {
+ if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+ log.info("add the cluster with id:{} to watcher cluster cache",
flinkCluster.getId());
+ WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
}
- flinkClusterService.update(updateWrapper);
}
- /**
- * Add a cluster to cache
- *
- * @param flinkCluster
- */
- public static void addFlinkCluster(FlinkCluster flinkCluster) {
+ /** @param flinkCluster the cluster to remove from watching */
+ public static void unWatching(FlinkCluster flinkCluster) {
if (WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
- return;
+ log.info("remove the cluster with id:{} from watcher cluster cache",
flinkCluster.getId());
+ WATCHER_CLUSTERS.remove(flinkCluster.getId());
}
- log.info("add the cluster with id:{} to watcher cluster cache",
flinkCluster.getId());
- WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
}
/**
- * Remove a cluster from cache
- *
- * @param flinkCluster
- */
- public static void removeFlinkCluster(FlinkCluster flinkCluster) {
- if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
- return;
- }
- log.info("remove the cluster with id:{} from watcher cluster cache",
flinkCluster.getId());
- WATCHER_CLUSTERS.remove(flinkCluster.getId());
- }
-
- /**
- * string conver final application status
+ * string convert final application status
*
* @param value
* @return
@@ -321,11 +259,9 @@ public class FlinkClusterWatcher {
* @return
*/
private ClusterState
finalApplicationStatusConvertClusterState(FinalApplicationStatus status) {
- switch (status) {
- case UNDEFINED:
- return ClusterState.RUNNING;
- default:
- return ClusterState.STOPPED;
+ if (status == FinalApplicationStatus.UNDEFINED) {
+ return ClusterState.RUNNING;
}
+ return ClusterState.STOPPED;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
similarity index 88%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index 737cc1bc6..f6ed5c96c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.task;
+import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
@@ -53,6 +54,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collection;
import java.util.Date;
import java.util.List;
@@ -68,7 +70,7 @@ import java.util.stream.Collectors;
/** This implementation is currently used for tracing flink job on
yarn,standalone,remote mode */
@Slf4j
@Component
-public class FlinkRESTAPIWatcher {
+public class FlinkHttpWatcher {
@Autowired private ApplicationService applicationService;
@@ -83,9 +85,10 @@ public class FlinkRESTAPIWatcher {
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
// track interval every 5 seconds
- private static final long WATCHING_INTERVAL = 1000L * 5;
+ private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
+
// option interval within 10 seconds
- private static final long OPTION_INTERVAL = 1000L * 10;
+ private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
/**
*
@@ -138,7 +141,7 @@ public class FlinkRESTAPIWatcher {
private static final Map<Long, OptionState> OPTIONING = new
ConcurrentHashMap<>(0);
- private Long lastWatchingTime = 0L;
+ private Long lastWatchTime = 0L;
private Long lastOptionTime = 0L;
@@ -161,13 +164,17 @@ public class FlinkRESTAPIWatcher {
new LambdaQueryWrapper<Application>()
.eq(Application::getTracking, 1)
.notIn(Application::getExecutionMode,
ExecutionMode.getKubernetesMode()));
- applications.forEach((app) -> WATCHING_APPS.put(app.getId(), app));
+ applications.forEach(
+ (app) -> {
+ WATCHING_APPS.put(app.getId(), app);
+ STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
+ });
}
@PreDestroy
public void doStop() {
log.info(
- "FlinkRESTAPIWatcher StreamPark Console will be shutdown,persistent
application to database.");
+ "FlinkHttpWatcher StreamPark Console will be shutdown,persistent
application to database.");
WATCHING_APPS.forEach((k, v) -> applicationService.persistMetrics(v));
}
@@ -181,35 +188,22 @@ public class FlinkRESTAPIWatcher {
*/
@Scheduled(fixedDelay = 1000)
public void start() {
- // The application has been started at the first time, or the front-end is
operating start/stop,
- // need to return status info immediately.
- if (lastWatchingTime == null || !OPTIONING.isEmpty()) {
- doWatch();
- } else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL)
{
- // The last operation time is less than option interval.(10 seconds)
- doWatch();
- } else if (System.currentTimeMillis() - lastWatchingTime >=
WATCHING_INTERVAL) {
- // Normal information obtain, check if there is 5 seconds interval
between this time and the
- // last time.(once every 5 seconds)
- doWatch();
+ Long timeMillis = System.currentTimeMillis();
+ if (lastWatchTime == null
+ || !OPTIONING.isEmpty()
+ || timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis()
+ || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) {
+ lastWatchTime = timeMillis;
+ WATCHING_APPS.forEach(this::watch);
}
}
- private void doWatch() {
- lastWatchingTime = System.currentTimeMillis();
- for (Map.Entry<Long, Application> entry : WATCHING_APPS.entrySet()) {
- watch(entry.getKey(), entry.getValue());
- }
- }
-
- private void watch(Long key, Application application) {
+ private void watch(Long id, Application application) {
EXECUTOR.execute(
() -> {
final StopFrom stopFrom =
- STOP_FROM_MAP.getOrDefault(key, null) == null
- ? StopFrom.NONE
- : STOP_FROM_MAP.get(key);
- final OptionState optionState = OPTIONING.get(key);
+ STOP_FROM_MAP.getOrDefault(id, null) == null ? StopFrom.NONE :
STOP_FROM_MAP.get(id);
+ final OptionState optionState = OPTIONING.get(id);
try {
// query status from flink rest api
getFromFlinkRestApi(application, stopFrom);
@@ -226,11 +220,11 @@ public class FlinkRESTAPIWatcher {
// non-mapping
if (application.getState() !=
FlinkAppState.MAPPING.getValue()) {
log.error(
- "FlinkRESTAPIWatcher getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
+ "FlinkHttpWatcher getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
if (StopFrom.NONE.equals(stopFrom)) {
savePointService.expire(application.getId());
application.setState(FlinkAppState.LOST.getValue());
- alert(application, FlinkAppState.LOST);
+ doAlert(application, FlinkAppState.LOST);
} else {
application.setState(FlinkAppState.CANCELED.getValue());
}
@@ -242,11 +236,11 @@ public class FlinkRESTAPIWatcher {
*/
application.setEndTime(new Date());
cleanSavepoint(application);
- cleanOptioning(optionState, key);
+ cleanOptioning(optionState, id);
doPersistMetrics(application, true);
FlinkAppState appState =
FlinkAppState.of(application.getState());
if (appState.equals(FlinkAppState.FAILED) ||
appState.equals(FlinkAppState.LOST)) {
- alert(application, FlinkAppState.of(application.getState()));
+ doAlert(application,
FlinkAppState.of(application.getState()));
if (appState.equals(FlinkAppState.FAILED)) {
try {
applicationService.start(application, true);
@@ -341,13 +335,13 @@ public class FlinkRESTAPIWatcher {
// get overview info at the first start time
if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
+ STARTING_CACHE.invalidate(application.getId());
Overview override = httpOverview(application);
if (override != null && override.getSlotsTotal() > 0) {
application.setTotalTM(override.getTaskmanagers());
application.setTotalSlot(override.getSlotsTotal());
application.setAvailableSlot(override.getSlotsAvailable());
}
- STARTING_CACHE.invalidate(application.getId());
}
}
@@ -447,18 +441,18 @@ public class FlinkRESTAPIWatcher {
break;
case CANCELED:
log.info(
- "FlinkRESTAPIWatcher getFromFlinkRestApi, job state {}, stop
tracking and delete stopFrom!",
+ "FlinkHttpWatcher getFromFlinkRestApi, job state {}, stop tracking
and delete stopFrom!",
currentState.name());
cleanSavepoint(application);
application.setState(currentState.getValue());
if (StopFrom.NONE.equals(stopFrom) ||
applicationService.checkAlter(application)) {
if (StopFrom.NONE.equals(stopFrom)) {
log.info(
- "FlinkRESTAPIWatcher getFromFlinkRestApi, job cancel is not
form StreamPark,savePoint expired!");
+ "FlinkHttpWatcher getFromFlinkRestApi, job cancel is not form
StreamPark,savePoint expired!");
savePointService.expire(application.getId());
}
stopCanceledJob(application.getId());
- alert(application, FlinkAppState.CANCELED);
+ doAlert(application, FlinkAppState.CANCELED);
}
STOP_FROM_MAP.remove(application.getId());
doPersistMetrics(application, true);
@@ -469,12 +463,12 @@ public class FlinkRESTAPIWatcher {
STOP_FROM_MAP.remove(application.getId());
application.setState(FlinkAppState.FAILED.getValue());
doPersistMetrics(application, true);
- alert(application, FlinkAppState.FAILED);
+ doAlert(application, FlinkAppState.FAILED);
applicationService.start(application, true);
break;
case RESTARTING:
log.info(
- "FlinkRESTAPIWatcher getFromFlinkRestApi, job state {},add to
starting",
+ "FlinkHttpWatcher getFromFlinkRestApi, job state {},add to
starting",
currentState.name());
STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
break;
@@ -492,7 +486,7 @@ public class FlinkRESTAPIWatcher {
* @param stopFrom stopFrom
*/
private void getFromYarnRestApi(Application application, StopFrom stopFrom)
throws Exception {
- log.debug("FlinkRESTAPIWatcher getFromYarnRestApi starting...");
+ log.debug("FlinkHttpWatcher getFromYarnRestApi starting...");
OptionState optionState = OPTIONING.get(application.getId());
/*
@@ -502,10 +496,10 @@ public class FlinkRESTAPIWatcher {
*/
Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
if (flag != null) {
- log.info("FlinkRESTAPIWatcher previous state: canceling.");
+ log.info("FlinkHttpWatcher previous state: canceling.");
if (StopFrom.NONE.equals(stopFrom)) {
log.error(
- "FlinkRESTAPIWatcher query previous state was canceling and
stopFrom NotFound,savePoint expired!");
+ "FlinkHttpWatcher query previous state was canceling and stopFrom
NotFound,savePoint expired!");
savePointService.expire(application.getId());
}
application.setState(FlinkAppState.CANCELED.getValue());
@@ -517,7 +511,7 @@ public class FlinkRESTAPIWatcher {
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
if (yarnAppInfo == null) {
if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
- throw new RuntimeException("FlinkRESTAPIWatcher getFromYarnRestApi
failed ");
+ throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi
failed ");
}
} else {
try {
@@ -529,7 +523,7 @@ public class FlinkRESTAPIWatcher {
if (FlinkAppState.KILLED.equals(flinkAppState)) {
if (StopFrom.NONE.equals(stopFrom)) {
log.error(
- "FlinkRESTAPIWatcher getFromYarnRestApi,job was killed and
stopFrom NotFound,savePoint expired!");
+ "FlinkHttpWatcher getFromYarnRestApi,job was killed and
stopFrom NotFound,savePoint expired!");
savePointService.expire(application.getId());
}
flinkAppState = FlinkAppState.CANCELED;
@@ -546,7 +540,7 @@ public class FlinkRESTAPIWatcher {
|| flinkAppState.equals(FlinkAppState.LOST)
|| (flinkAppState.equals(FlinkAppState.CANCELED) &&
StopFrom.NONE.equals(stopFrom))
|| applicationService.checkAlter(application)) {
- alert(application, flinkAppState);
+ doAlert(application, flinkAppState);
stopCanceledJob(application.getId());
if (flinkAppState.equals(FlinkAppState.FAILED)) {
applicationService.start(application, true);
@@ -554,7 +548,7 @@ public class FlinkRESTAPIWatcher {
}
} catch (Exception e) {
if
(!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
- throw new RuntimeException("FlinkRESTAPIWatcher getFromYarnRestApi
error,", e);
+ throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi
error,", e);
}
}
}
@@ -578,7 +572,7 @@ public class FlinkRESTAPIWatcher {
if (isKubernetesApp(appId)) {
return;
}
- log.info("FlinkRESTAPIWatcher setOptioning");
+ log.info("FlinkHttpWatcher setOptioning");
OPTIONING.put(appId, state);
if (state.equals(OptionState.CANCELLING)) {
STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK);
@@ -589,7 +583,7 @@ public class FlinkRESTAPIWatcher {
if (isKubernetesApp(application)) {
return;
}
- log.info("FlinkRESTAPIWatcher add app to tracking,appId:{}",
application.getId());
+ log.info("FlinkHttpWatcher add app to tracking,appId:{}",
application.getId());
WATCHING_APPS.put(application.getId(), application);
STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
}
@@ -598,7 +592,7 @@ public class FlinkRESTAPIWatcher {
if (isKubernetesApp(appId)) {
return;
}
- log.info("FlinkRESTAPIWatcher add app to savepoint,appId:{}", appId);
+ log.info("FlinkHttpWatcher add app to savepoint,appId:{}", appId);
SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
}
@@ -606,7 +600,7 @@ public class FlinkRESTAPIWatcher {
if (isKubernetesApp(appId)) {
return;
}
- log.info("FlinkRESTAPIWatcher stop app,appId:{}", appId);
+ log.info("FlinkHttpWatcher stop app,appId:{}", appId);
WATCHING_APPS.remove(appId);
}
@@ -781,19 +775,26 @@ public class FlinkRESTAPIWatcher {
* alarm; If the abnormal behavior of the job is caused by itself and the
flink cluster is running
* normally, the job will an alarm
*/
- private void alert(Application app, FlinkAppState appState) {
- if (ExecutionMode.isYarnPerJobOrAppMode(app.getExecutionModeEnum())
- || !flinkClusterWatcher.checkAlert(app.getFlinkClusterId())) {
- alertService.alert(app, appState);
- return;
- }
- boolean isValid =
flinkClusterWatcher.verifyClusterValidByClusterId(app.getFlinkClusterId());
- if (isValid) {
- log.info(
- "application with id {} is yarn session or remote and flink cluster
with id {} is alive, application send alert",
- app.getId(),
- app.getFlinkClusterId());
- alertService.alert(app, appState);
+ private void doAlert(Application app, FlinkAppState appState) {
+ switch (app.getExecutionModeEnum()) {
+ case YARN_APPLICATION:
+ case YARN_PER_JOB:
+ alertService.alert(app, appState);
+ return;
+ case YARN_SESSION:
+ case REMOTE:
+ FlinkCluster flinkCluster =
flinkClusterService.getById(app.getFlinkClusterId());
+ ClusterState clusterState =
flinkClusterWatcher.getClusterState(flinkCluster);
+ if (ClusterState.isRunning(clusterState)) {
+ log.info(
+ "application with id {} is yarn session or remote and flink
cluster with id {} is alive, application send alert",
+ app.getId(),
+ app.getFlinkClusterId());
+ alertService.alert(app, appState);
+ }
+ break;
+ default:
+ break;
}
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index cc0173387..04005c724 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -133,7 +133,7 @@
limit 1
</select>
- <select id="getAffectedJobsByClusterId" resultType="java.lang.Integer"
parameterType="java.lang.Long">
+ <select id="countJobsByClusterId" resultType="java.lang.Integer"
parameterType="java.lang.Long">
select
count(1)
from t_flink_app