This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch improve in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit a53e4867347df6224993c7feb08078ebc9c851f5 Author: benjobs <[email protected]> AuthorDate: Sun Sep 17 09:51:02 2023 +0800 [Improve] application state improvement --- .../console/core/annotation/AppUpdated.java | 5 +-- .../console/core/aspect/StreamParkAspect.java | 6 ++-- .../console/core/bean/AlertProbeMsg.java | 29 ++++++++------- .../console/core/entity/Application.java | 24 ++++--------- .../impl/ApplicationActionServiceImpl.java | 18 +++++----- .../impl/ApplicationInfoServiceImpl.java | 15 ++++---- .../impl/ApplicationManageServiceImpl.java | 6 ++-- .../core/service/impl/AppBuildPipeServiceImpl.java | 12 +++---- .../core/service/impl/ProjectServiceImpl.java | 8 ++--- .../core/service/impl/SavePointServiceImpl.java | 10 +++--- ...nkHttpWatcher.java => FlinkAppHttpWatcher.java} | 42 +++++++++++----------- ...thProbingTask.java => FlinkAppLostWatcher.java} | 40 +++++++++------------ ...rocessor.java => FlinkCheckpointProcessor.java} | 6 ++-- .../core/task/FlinkK8sChangeEventListener.java | 6 ++-- .../console/core/task/FlinkK8sWatcherWrapper.java | 2 +- .../resources/mapper/core/ApplicationMapper.xml | 6 ++-- .../service/ApplicationManageServiceITest.java | 8 ++--- 17 files changed, 114 insertions(+), 129 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java index 97c77a79d..cfb2a5563 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java @@ -17,6 +17,8 @@ package org.apache.streampark.console.core.annotation; +import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; + import org.aspectj.lang.ProceedingJoinPoint; import java.lang.annotation.ElementType; @@ -29,8 +31,7 @@ import java.lang.annotation.Target; * application state update, need to add this annotation, This annotation marks which methods will * cause the application to be updated, Will work together with {@link * org.apache.streampark.console.core.aspect.StreamParkAspect#appUpdated(ProceedingJoinPoint)}, The - * final purpose will be refresh {@link - * org.apache.streampark.console.core.task.FlinkHttpWatcher#WATCHING_APPS}, Make the state of the + * final purpose will be refresh {@link FlinkAppHttpWatcher#WATCHING_APPS}, Make the state of the * job consistent with the database */ @Target(ElementType.METHOD) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java index 6d14119d5..a15583333 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java @@ -28,7 +28,7 @@ import org.apache.streampark.console.core.enums.PermissionType; import org.apache.streampark.console.core.enums.UserType; import org.apache.streampark.console.core.service.CommonService; import org.apache.streampark.console.core.service.application.ApplicationManageService; -import org.apache.streampark.console.core.task.FlinkHttpWatcher; +import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; import org.apache.streampark.console.system.entity.AccessToken; import org.apache.streampark.console.system.entity.Member; import org.apache.streampark.console.system.entity.User; @@ -58,7 +58,7 @@ import java.util.Objects; @Aspect public class StreamParkAspect { - @Autowired private FlinkHttpWatcher flinkHttpWatcher; + @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher; @Autowired private CommonService commonService; @Autowired private MemberService memberService; @Autowired private ApplicationManageService applicationManageService; @@ -93,7 +93,7 @@ public class StreamParkAspect { MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); log.debug("appUpdated aspect, method:{}", methodSignature.getName()); Object target = joinPoint.proceed(); - flinkHttpWatcher.init(); + flinkAppHttpWatcher.init(); return target; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java index d42537746..f9bb31604 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java @@ -17,6 +17,8 @@ package org.apache.streampark.console.core.bean; +import org.apache.streampark.console.core.enums.FlinkAppState; + import lombok.Data; import lombok.NoArgsConstructor; @@ -38,19 +40,20 @@ public class AlertProbeMsg { private Integer cancelledJobs = 0; - public void incrementProbeJobs() { + public void compute(FlinkAppState state) { this.probeJobs++; - } - - public void incrementFailedJobs() { - this.failedJobs++; - } - - public void incrementLostJobs() { - this.lostJobs++; - } - - public void incrementCancelledJobs() { - this.cancelledJobs++; + switch (state) { + case LOST: + this.lostJobs++; + break; + case FAILED: + failedJobs++; + break; + case CANCELED: + cancelledJobs++; + break; + default: + break; + } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index f0b283be1..98fd8dd53 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -49,8 +49,6 @@ import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import javax.annotation.Nonnull; - import java.io.Serializable; import java.util.Collections; import java.util.Date; @@ -59,8 +57,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import static org.apache.streampark.console.core.enums.FlinkAppState.of; - @Data @TableName("t_flink_app") @Slf4j @@ -275,8 +271,7 @@ public class Application implements Serializable { public void setState(Integer state) { this.state = state; - FlinkAppState appState = of(this.state); - this.tracking = shouldTracking(appState); + this.tracking = shouldTracking() ? 1 : 0; } public void setYarnQueueByHotParams() { @@ -301,8 +296,8 @@ public class Application implements Serializable { * * @return 1: need to be tracked | 0: no need to be tracked. */ - public static Integer shouldTracking(@Nonnull FlinkAppState state) { - switch (state) { + public Boolean shouldTracking() { + switch (getStateEnum()) { case ADDED: case CREATED: case FINISHED: @@ -310,9 +305,9 @@ public class Application implements Serializable { case CANCELED: case TERMINATED: case POS_TERMINATED: - return 0; + return false; default: - return 1; + return true; } } @@ -322,8 +317,7 @@ public class Application implements Serializable { * @return true: can start | false: can not start. */ public boolean isCanBeStart() { - FlinkAppState state = FlinkAppState.of(getState()); - switch (state) { + switch (getStateEnum()) { case ADDED: case CREATED: case FAILED: @@ -340,10 +334,6 @@ public class Application implements Serializable { } } - public boolean shouldBeTrack() { - return shouldTracking(FlinkAppState.of(getState())) == 1; - } - @JsonIgnore public ReleaseState getReleaseState() { return ReleaseState.of(release); @@ -355,7 +345,7 @@ public class Application implements Serializable { } @JsonIgnore - public FlinkAppState getFlinkAppStateEnum() { + public FlinkAppState getStateEnum() { return FlinkAppState.of(state); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index 451ffc453..728f3b7f4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -65,7 +65,7 @@ import org.apache.streampark.console.core.service.VariableService; import org.apache.streampark.console.core.service.application.ApplicationActionService; import org.apache.streampark.console.core.service.application.ApplicationInfoService; import org.apache.streampark.console.core.service.application.ApplicationManageService; -import org.apache.streampark.console.core.task.FlinkHttpWatcher; +import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub; import org.apache.streampark.flink.client.FlinkClient; import org.apache.streampark.flink.client.bean.CancelRequest; @@ -227,7 +227,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, @Override public void cancel(Application appParam) throws Exception { - FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING); + FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING); Application application = getById(appParam.getId()); application.setState(FlinkAppState.CANCELLING.getValue()); @@ -239,7 +239,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, applicationLog.setYarnAppId(application.getClusterId()); if (appParam.getSavePointed()) { - FlinkHttpWatcher.addSavepoint(application.getId()); + FlinkAppHttpWatcher.addSavepoint(application.getId()); application.setOptionState(OptionState.SAVEPOINTING.getValue()); } else { application.setOptionState(OptionState.CANCELLING.getValue()); @@ -250,7 +250,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, Long userId = commonService.getUserId(); if (!application.getUserId().equals(userId)) { - FlinkHttpWatcher.addCanceledApp(application.getId(), userId); + FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId); } FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId()); @@ -357,7 +357,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, k8SFlinkTrackMonitor.unWatching(id); k8SFlinkTrackMonitor.doWatching(id); } else { - FlinkHttpWatcher.unWatching(application.getId()); + FlinkAppHttpWatcher.unWatching(application.getId()); } String exception = Utils.stringifyException(e); @@ -508,8 +508,8 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, if (isKubernetesApp(application)) { k8SFlinkTrackMonitor.doWatching(toTrackId(application)); } else { - FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.STARTING); - FlinkHttpWatcher.doWatching(application); + FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionState.STARTING); + FlinkAppHttpWatcher.doWatching(application); } applicationLog.setSuccess(true); @@ -530,7 +530,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, if (isKubernetesApp(app)) { k8SFlinkTrackMonitor.unWatching(toTrackId(app)); } else { - FlinkHttpWatcher.unWatching(appParam.getId()); + FlinkAppHttpWatcher.unWatching(appParam.getId()); } } }) @@ -750,7 +750,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, k8SFlinkTrackMonitor.unWatching(id); k8SFlinkTrackMonitor.doWatching(id); } else { - FlinkHttpWatcher.unWatching(application.getId()); + FlinkAppHttpWatcher.unWatching(application.getId()); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java index a292cde75..ca7ad1e04 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java @@ -40,8 +40,8 @@ import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.SavePointService; import org.apache.streampark.console.core.service.application.ApplicationInfoService; +import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; import org.apache.streampark.console.core.task.FlinkClusterWatcher; -import org.apache.streampark.console.core.task.FlinkHttpWatcher; import org.apache.streampark.console.core.task.FlinkK8sObserverStub; import org.apache.streampark.flink.core.conf.ParameterCli; import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher; @@ -115,7 +115,7 @@ public class ApplicationInfoServiceImpl extends ServiceImpl<ApplicationMapper, A Integer runningJob = 0; // stat metrics from other than kubernetes mode - for (Application app : FlinkHttpWatcher.getWatchingApps()) { + for (Application app : FlinkAppHttpWatcher.getWatchingApps()) { if (!teamId.equals(app.getTeamId())) { continue; } @@ -220,11 +220,10 @@ public class ApplicationInfoServiceImpl extends ServiceImpl<ApplicationMapper, A @Override public boolean checkAlter(Application appParam) { Long appId = appParam.getId(); - FlinkAppState state = FlinkAppState.of(appParam.getState()); - if (!FlinkAppState.CANCELED.equals(state)) { + if (FlinkAppState.CANCELED != appParam.getStateEnum()) { return false; } - long cancelUserId = FlinkHttpWatcher.getCanceledJobUserId(appId); + long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId); long appUserId = appParam.getUserId(); return cancelUserId != -1 && cancelUserId != appUserId; } @@ -244,11 +243,11 @@ public class ApplicationInfoServiceImpl extends ServiceImpl<ApplicationMapper, A @Override public boolean existsRunningByClusterId(Long clusterId) { return baseMapper.existsRunningJobByClusterId(clusterId) - || FlinkHttpWatcher.getWatchingApps().stream() + || FlinkAppHttpWatcher.getWatchingApps().stream() .anyMatch( application -> clusterId.equals(application.getFlinkClusterId()) - && FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())); + && FlinkAppState.RUNNING.equals(application.getStateEnum())); } @Override @@ -467,7 +466,7 @@ public class ApplicationInfoServiceImpl extends ServiceImpl<ApplicationMapper, A flinkK8sObserver.watchApplication(application); } } else { - FlinkHttpWatcher.doWatching(application); + FlinkAppHttpWatcher.doWatching(application); } return mapping; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java index e89b16117..2dc38afb2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java @@ -53,7 +53,7 @@ import org.apache.streampark.console.core.service.SavePointService; import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.YarnQueueService; import org.apache.streampark.console.core.service.application.ApplicationManageService; -import org.apache.streampark.console.core.task.FlinkHttpWatcher; +import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; import org.apache.streampark.console.core.task.FlinkK8sObserverStub; import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub; import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher; @@ -189,7 +189,7 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper, flinkK8sObserver.unWatchById(application.getId()); } } else { - FlinkHttpWatcher.unWatching(appParam.getId()); + FlinkAppHttpWatcher.unWatching(appParam.getId()); } return true; } @@ -264,7 +264,7 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper, .getCode() .equals(record.getBuildStatus())) .setAllowStart( - !record.shouldBeTrack() + !record.shouldTracking() && PipelineStatus.success .getCode() .equals(record.getBuildStatus())) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 147137800..a1fb0b4e8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -59,7 +59,7 @@ import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.application.ApplicationActionService; import org.apache.streampark.console.core.service.application.ApplicationInfoService; import org.apache.streampark.console.core.service.application.ApplicationManageService; -import org.apache.streampark.console.core.task.FlinkHttpWatcher; +import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; import org.apache.streampark.flink.packer.docker.DockerConf; import org.apache.streampark.flink.packer.maven.Artifact; import org.apache.streampark.flink.packer.maven.DependencyInfo; @@ -141,7 +141,7 @@ public class AppBuildPipeServiceImpl @Autowired private ApplicationLogService applicationLogService; - @Autowired private FlinkHttpWatcher flinkHttpWatcher; + @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher; @Autowired private ApplicationConfigService applicationConfigService; @@ -226,8 +226,8 @@ public class AppBuildPipeServiceImpl app.setRelease(ReleaseState.RELEASING.get()); applicationManageService.updateRelease(app); - if (flinkHttpWatcher.isWatchingApp(app.getId())) { - flinkHttpWatcher.init(); + if (flinkAppHttpWatcher.isWatchingApp(app.getId())) { + flinkAppHttpWatcher.init(); } // 1) checkEnv @@ -358,8 +358,8 @@ public class AppBuildPipeServiceImpl } applicationManageService.updateRelease(app); applicationLogService.save(applicationLog); - if (flinkHttpWatcher.isWatchingApp(app.getId())) { - flinkHttpWatcher.init(); + if (flinkAppHttpWatcher.isWatchingApp(app.getId())) { + flinkAppHttpWatcher.init(); } } }); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java index c14b66b5e..254818045 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java @@ -38,7 +38,7 @@ import org.apache.streampark.console.core.enums.ReleaseState; import org.apache.streampark.console.core.mapper.ProjectMapper; import org.apache.streampark.console.core.service.ProjectService; import org.apache.streampark.console.core.service.application.ApplicationManageService; -import org.apache.streampark.console.core.task.FlinkHttpWatcher; +import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; import org.apache.streampark.console.core.task.ProjectBuildTask; import org.apache.flink.configuration.MemorySize; @@ -82,7 +82,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> @Autowired private ApplicationManageService applicationManageService; - @Autowired private FlinkHttpWatcher flinkHttpWatcher; + @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher; private final ExecutorService executorService = new ThreadPoolExecutor( @@ -207,7 +207,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> if (buildState == BuildState.SUCCESSFUL) { baseMapper.updateBuildTime(id); } - flinkHttpWatcher.init(); + flinkAppHttpWatcher.init(); }, fileLogger -> { List<Application> applications = @@ -222,7 +222,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> app.setBuild(true); this.applicationManageService.updateRelease(app); }); - flinkHttpWatcher.init(); + flinkAppHttpWatcher.init(); }); CompletableFuture<Void> buildTask = CompletableFuture.runAsync(projectBuildTask, executorService); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java index 766d5a74f..16d09a246 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java @@ -43,7 +43,7 @@ import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.SavePointService; import org.apache.streampark.console.core.service.application.ApplicationManageService; -import org.apache.streampark.console.core.task.FlinkHttpWatcher; +import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; import org.apache.streampark.flink.client.FlinkClient; import org.apache.streampark.flink.client.bean.SavepointResponse; import org.apache.streampark.flink.client.bean.TriggerSavepointRequest; @@ -101,7 +101,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint @Autowired private ApplicationLogService applicationLogService; - @Autowired private FlinkHttpWatcher flinkHttpWatcher; + @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher; private final ExecutorService executorService = new ThreadPoolExecutor( @@ -176,12 +176,12 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint applicationLog.setOptionTime(new Date()); applicationLog.setYarnAppId(application.getClusterId()); - FlinkHttpWatcher.addSavepoint(application.getId()); + FlinkAppHttpWatcher.addSavepoint(application.getId()); application.setOptionState(OptionState.SAVEPOINTING.getValue()); application.setOptionTime(new Date()); this.applicationManageService.updateById(application); - flinkHttpWatcher.init(); + flinkAppHttpWatcher.init(); FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId()); @@ -266,7 +266,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint application.setOptionState(OptionState.NONE.getValue()); application.setOptionTime(new Date()); applicationManageService.update(application); - flinkHttpWatcher.init(); + flinkAppHttpWatcher.init(); }); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java similarity index 94% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java index d61018564..ce60a1919 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java @@ -75,7 +75,7 @@ import java.util.stream.Collectors; /** This implementation is currently used for tracing flink job on yarn,standalone,remote mode */ @Slf4j @Component -public class FlinkHttpWatcher { +public class FlinkAppHttpWatcher { @Autowired private ApplicationManageService applicationManageService; @Autowired private ApplicationActionService applicationActionService; @@ -83,7 +83,7 @@ public class FlinkHttpWatcher { @Autowired private AlertService alertService; - @Autowired private CheckpointProcessor checkpointProcessor; + @Autowired private FlinkCheckpointProcessor checkpointProcessor; @Autowired private FlinkClusterService flinkClusterService; @@ -182,7 +182,7 @@ public class FlinkHttpWatcher { @PreDestroy public void doStop() { log.info( - "FlinkHttpWatcher StreamPark Console will be shutdown,persistent application to database."); + "FlinkAppHttpWatcher StreamPark Console will be shutdown,persistent application to database."); WATCHING_APPS.forEach((k, v) -> applicationInfoService.persistMetrics(v)); } @@ -209,7 +209,7 @@ public class FlinkHttpWatcher { @VisibleForTesting public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) { Application app = WATCHING_APPS.get(appId); - return (app == null || app.getState() == null) ? null : FlinkAppState.of(app.getState()); + return (app == null || app.getState() == null) ? null : app.getStateEnum(); } private void watch(Long id, Application application) { @@ -234,7 +234,7 @@ public class FlinkHttpWatcher { // non-mapping if (application.getState() != FlinkAppState.MAPPING.getValue()) { log.error( - "FlinkHttpWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!"); + "FlinkAppHttpWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!"); if (StopFrom.NONE.equals(stopFrom)) { savePointService.expire(application.getId()); application.setState(FlinkAppState.LOST.getValue()); @@ -252,9 +252,9 @@ public class FlinkHttpWatcher { cleanSavepoint(application); cleanOptioning(optionState, id); doPersistMetrics(application, true); - FlinkAppState appState = FlinkAppState.of(application.getState()); - if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) { - doAlert(application, FlinkAppState.of(application.getState())); + FlinkAppState appState = application.getStateEnum(); + if (appState == FlinkAppState.FAILED || appState == FlinkAppState.LOST) { + doAlert(application, application.getStateEnum()); if (appState.equals(FlinkAppState.FAILED)) { try { applicationActionService.start(application, true); @@ -456,14 +456,14 @@ public class FlinkHttpWatcher { break; case CANCELED: log.info( - "FlinkHttpWatcher getFromFlinkRestApi, job state {}, stop tracking and delete stopFrom!", + "FlinkAppHttpWatcher getFromFlinkRestApi, job state {}, stop tracking and delete stopFrom!", currentState.name()); cleanSavepoint(application); application.setState(currentState.getValue()); if (StopFrom.NONE.equals(stopFrom) || applicationInfoService.checkAlter(application)) { if (StopFrom.NONE.equals(stopFrom)) { log.info( - "FlinkHttpWatcher getFromFlinkRestApi, job cancel is not form StreamPark,savePoint expired!"); + "FlinkAppHttpWatcher getFromFlinkRestApi, job cancel is not form StreamPark,savePoint expired!"); savePointService.expire(application.getId()); } stopCanceledJob(application.getId()); @@ -483,7 +483,7 @@ public class FlinkHttpWatcher { break; case RESTARTING: log.info( - "FlinkHttpWatcher getFromFlinkRestApi, job state {},add to starting", + "FlinkAppHttpWatcher getFromFlinkRestApi, job state {},add to starting", currentState.name()); STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE); break; @@ -501,7 +501,7 @@ public class FlinkHttpWatcher { * @param stopFrom stopFrom */ private void getFromYarnRestApi(Application application, StopFrom stopFrom) throws Exception { - log.debug("FlinkHttpWatcher getFromYarnRestApi starting..."); + log.debug("FlinkAppHttpWatcher getFromYarnRestApi starting..."); OptionState optionState = OPTIONING.get(application.getId()); /* @@ -511,10 +511,10 @@ public class FlinkHttpWatcher { */ Byte flag = CANCELING_CACHE.getIfPresent(application.getId()); if (flag != null) { - log.info("FlinkHttpWatcher previous state: canceling."); + log.info("FlinkAppHttpWatcher previous state: canceling."); if (StopFrom.NONE.equals(stopFrom)) { log.error( - "FlinkHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!"); + "FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!"); savePointService.expire(application.getId()); } application.setState(FlinkAppState.CANCELED.getValue()); @@ -526,7 +526,7 @@ public class FlinkHttpWatcher { YarnAppInfo yarnAppInfo = httpYarnAppInfo(application); if (yarnAppInfo == null) { if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) { - throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi failed "); + throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi failed "); } } else { try { @@ -538,7 +538,7 @@ public class FlinkHttpWatcher { if (FlinkAppState.KILLED.equals(flinkAppState)) { if (StopFrom.NONE.equals(stopFrom)) { log.error( - "FlinkHttpWatcher getFromYarnRestApi,job was killed and stopFrom NotFound,savePoint expired!"); + "FlinkAppHttpWatcher getFromYarnRestApi,job was killed and stopFrom NotFound,savePoint expired!"); savePointService.expire(application.getId()); } flinkAppState = FlinkAppState.CANCELED; @@ -563,7 +563,7 @@ public class FlinkHttpWatcher { } } catch (Exception e) { if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) { - throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi error,", e); + throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi error,", e); } } } @@ -587,7 +587,7 @@ public class FlinkHttpWatcher { if (isKubernetesApp(appId)) { return; } - log.info("FlinkHttpWatcher setOptioning"); + log.info("FlinkAppHttpWatcher setOptioning"); OPTIONING.put(appId, state); if (state.equals(OptionState.CANCELLING)) { STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK); @@ -598,7 +598,7 @@ public class FlinkHttpWatcher { if (isKubernetesApp(application)) { return; } - log.info("FlinkHttpWatcher add app to tracking,appId:{}", application.getId()); + log.info("FlinkAppHttpWatcher add app to tracking,appId:{}", application.getId()); WATCHING_APPS.put(application.getId(), application); STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE); } @@ -607,7 +607,7 @@ public class FlinkHttpWatcher { if (isKubernetesApp(appId)) { return; } - log.info("FlinkHttpWatcher add app to savepoint,appId:{}", appId); + log.info("FlinkAppHttpWatcher add app to savepoint,appId:{}", appId); SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE); } @@ -615,7 +615,7 @@ public class FlinkHttpWatcher { if (isKubernetesApp(appId)) { return; } - log.info("FlinkHttpWatcher stop app,appId:{}", appId); + log.info("FlinkAppHttpWatcher stop app,appId:{}", appId); WATCHING_APPS.remove(appId); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java similarity index 83% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java index d19865f34..d9f7381fd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java @@ -36,15 +36,17 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.apache.streampark.console.core.enums.FlinkAppState.LOST; import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId; import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp; /** This implementation is currently used for probe on yarn,remote,K8s mode */ @Slf4j @Component -public class AutoHealthProbingTask { +public class FlinkAppLostWatcher { @Autowired private ApplicationManageService applicationManageService; @@ -63,14 +65,14 @@ public class AutoHealthProbingTask { private long lastWatchTime = 0L; - private Boolean isProbing = false; + private AtomicBoolean isProbing = new AtomicBoolean(false); private Short retryAttempts = PROBE_RETRY_COUNT; @Scheduled(fixedDelay = 1000) - private void schedule() { + private void start() { long timeMillis = System.currentTimeMillis(); - if (isProbing) { + if (isProbing.get()) { if (timeMillis - lastWatchTime >= PROBE_WAIT_INTERVAL.toMillis()) { handleProbeResults(); lastWatchTime = timeMillis; @@ -78,28 +80,28 @@ public class AutoHealthProbingTask { } else { if (timeMillis - lastWatchTime >= PROBE_INTERVAL.toMillis()) { lastWatchTime = timeMillis; - probe(Collections.emptyList()); + watch(Collections.emptyList()); } } } - public void probe(List<Application> applications) { + public void watch(List<Application> applications) { List<Application> probeApplication = applications.isEmpty() ? applicationManageService.getProbeApps() : applications; if (probeApplication.isEmpty()) { log.info("there is no application that needs to be probe"); return; } - isProbing = true; + isProbing.set(true); probeApplication = probeApplication.stream() .filter(application -> FlinkAppState.isLost(application.getState())) .collect(Collectors.toList()); - updateProbingState(probeApplication); + updateState(probeApplication); probeApplication.stream().forEach(this::monitorApplication); } - private void updateProbingState(List<Application> applications) { + private void updateState(List<Application> applications) { applications.stream() .filter(application -> FlinkAppState.isLost(application.getState())) .forEach( @@ -113,7 +115,7 @@ public class AutoHealthProbingTask { private void handleProbeResults() { List<Application> probeApps = applicationManageService.getProbeApps(); if (shouldRetry(probeApps)) { - probe(probeApps); + watch(probeApps); } else { List<AlertProbeMsg> alertProbeMsgs = generateProbeResults(probeApps); alertProbeMsgs.stream().forEach(this::alert); @@ -129,7 +131,7 @@ public class AutoHealthProbingTask { }); applicationManageService.updateBatchById(applications); retryAttempts = PROBE_RETRY_COUNT; - isProbing = false; + isProbing.set(false); } private void alert(AlertProbeMsg alertProbeMsg) { @@ -155,17 +157,9 @@ public class AutoHealthProbingTask { apps.forEach( app -> { alertProbeMsg.setUser(app.getUserName()); - alertProbeMsg.incrementProbeJobs(); - if (app.getState() == FlinkAppState.LOST.getValue()) { - alertProbeMsg.incrementLostJobs(); - } else if (app.getState() == FlinkAppState.FAILED.getValue()) { - alertProbeMsg.incrementFailedJobs(); - } else if (app.getState() == FlinkAppState.CANCELED.getValue()) { - alertProbeMsg.incrementCancelledJobs(); - } + alertProbeMsg.compute(app.getStateEnum()); alertIds.add(app.getAlertId()); }); - alertProbeMsg.setAlertId(alertIds); return alertProbeMsg; }))) @@ -176,14 +170,12 @@ public class AutoHealthProbingTask { if (isKubernetesApp(application)) { k8SFlinkTrackMonitor.doWatching(toTrackId(application)); } else { - FlinkHttpWatcher.doWatching(application); + FlinkAppHttpWatcher.doWatching(application); } } private Boolean shouldRetry(List<Application> applications) { - return applications.stream() - .anyMatch( - application -> FlinkAppState.LOST.getValue() == application.getState().intValue()) + return applications.stream().anyMatch(application -> application.getStateEnum() == LOST) && (retryAttempts-- > 0); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java index 02d4f8ca1..5e30557c9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java @@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @Component -public class CheckpointProcessor { +public class FlinkCheckpointProcessor { private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0"); private static final Integer SAVEPOINT_CACHE_HOUR = 1; @@ -70,7 +70,7 @@ public class CheckpointProcessor { @Autowired private SavePointService savePointService; - @Autowired private FlinkHttpWatcher flinkHttpWatcher; + @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher; public void process(Application application, @Nonnull CheckPoints checkPoints) { checkPoints.getLatestCheckpoint().forEach(checkPoint -> process(application, checkPoint)); @@ -86,7 +86,7 @@ public class CheckpointProcessor { if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) { savepointedCache.put(checkPointKey.getSavePointId(), DEFAULT_FLAG_BYTE); saveSavepoint(checkPoint, application.getId()); - flinkHttpWatcher.cleanSavepoint(application); + flinkAppHttpWatcher.cleanSavepoint(application); return; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java index 3006a1b6b..45560c35f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java @@ -72,7 +72,7 @@ public class FlinkK8sChangeEventListener { @Lazy @Autowired private AlertService alertService; - @Lazy @Autowired private CheckpointProcessor checkpointProcessor; + @Lazy @Autowired private FlinkCheckpointProcessor checkpointProcessor; private final ExecutorService executor = new ThreadPoolExecutor( @@ -104,7 +104,7 @@ public class FlinkK8sChangeEventListener { applicationInfoService.persistMetrics(app); // email alerts when necessary - FlinkAppState state = FlinkAppState.of(app.getState()); + FlinkAppState state = app.getStateEnum(); if (FlinkAppState.FAILED.equals(state) || FlinkAppState.LOST.equals(state) || FlinkAppState.RESTARTING.equals(state) @@ -175,7 +175,7 @@ public class FlinkK8sChangeEventListener { // infer the final flink job state Enumeration.Value state = FlinkJobStatusWatcher.inferFlinkJobStateFromPersist( - jobStatus.jobState(), toK8sFlinkJobState(FlinkAppState.of(app.getState()))); + jobStatus.jobState(), toK8sFlinkJobState(app.getStateEnum())); // corrective start-time / end-time / duration long preStartTime = app.getStartTime() != null ? app.getStartTime().getTime() : 0; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java index 7840d06e6..2d5a1a339 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java @@ -116,7 +116,7 @@ public class FlinkK8sWatcherWrapper { } // filter out the application that should be tracking return k8sApplication.stream() - .filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum()))) + .filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getStateEnum()))) .map(Bridge::toTrackId) .collect(Collectors.toList()); } diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml index c25a504a4..f57b8efe7 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml @@ -265,9 +265,9 @@ else u.nick_name end as nick_name from t_flink_app t - inner join t_user u - on t.user_id = u.user_id - where (t.tracking = 1 and t.state = 13) or t.probing = 1 + inner join t_user u + on t.user_id = u.user_id + where t.probing = 1 or (t.tracking = 1 and t.state = 13) </select> <update id="mapping" parameterType="org.apache.streampark.console.core.entity.Application"> diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java index 58e36b501..79ee5ca43 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java @@ -29,7 +29,7 @@ import org.apache.streampark.console.core.enums.ReleaseState; import org.apache.streampark.console.core.service.application.ApplicationActionService; import org.apache.streampark.console.core.service.application.ApplicationManageService; import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl; -import org.apache.streampark.console.core.task.FlinkHttpWatcher; +import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; import org.apache.streampark.testcontainer.flink.FlinkStandaloneSessionCluster; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -45,7 +45,7 @@ import java.util.Base64; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import static org.apache.streampark.console.core.task.FlinkHttpWatcher.WATCHING_INTERVAL; +import static org.apache.streampark.console.core.task.FlinkAppHttpWatcher.WATCHING_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; /** @@ -70,7 +70,7 @@ class ApplicationManageServiceITest extends SpringIntegrationTestBase { @Autowired private FlinkSqlService sqlService; - @Autowired private FlinkHttpWatcher flinkHttpWatcher; + @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher; @BeforeAll static void setup() { @@ -145,7 +145,7 @@ class ApplicationManageServiceITest extends SpringIntegrationTestBase { CompletableFuture.supplyAsync( () -> { while (true) { - if (flinkHttpWatcher.tryQueryFlinkAppState(application.getId()) + if (flinkAppHttpWatcher.tryQueryFlinkAppState(application.getId()) == FlinkAppState.RUNNING) { break; }
