This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch run_state in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 855f3daf24de531b2c884bcff2918a51d2c1750f Author: benjobs <[email protected]> AuthorDate: Sun Feb 11 21:27:24 2024 +0800 [Improve] update job run state improvement --- .../core/controller/ApplicationController.java | 32 ------------- .../console/core/entity/Application.java | 1 + .../core/service/impl/ApplicationServiceImpl.java | 54 ++++++++++++++++++---- .../console/core/task/FlinkAppHttpWatcher.java | 47 ++++++++++++++++++- .../streampark/console/core/QueueTestCase.java | 2 + 5 files changed, 94 insertions(+), 42 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java index 721338cbd..1eacf882d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java @@ -26,7 +26,6 @@ import org.apache.streampark.console.base.exception.InternalException; import org.apache.streampark.console.core.annotation.ApiAccess; import org.apache.streampark.console.core.annotation.AppUpdated; import org.apache.streampark.console.core.annotation.PermissionAction; -import org.apache.streampark.console.core.bean.AppControl; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.ApplicationBackUp; import org.apache.streampark.console.core.entity.ApplicationLog; @@ -36,7 +35,6 @@ import org.apache.streampark.console.core.service.AppBuildPipeService; import org.apache.streampark.console.core.service.ApplicationBackUpService; import org.apache.streampark.console.core.service.ApplicationLogService; import org.apache.streampark.console.core.service.ApplicationService; -import org.apache.streampark.flink.packer.pipeline.PipelineStatus; import org.apache.shiro.authz.annotation.RequiresPermissions; @@ -61,9 +59,7 @@ import java.io.IOException; import java.io.Serializable; import java.net.URI; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; @Tag(name = "FLINK_APPLICATION_TAG") @Slf4j @@ -152,34 +148,6 @@ public class ApplicationController { @RequiresPermissions("app:view") public RestResponse list(Application app, RestRequest request) { IPage<Application> applicationList = applicationService.page(app, request); - List<Application> appRecords = applicationList.getRecords(); - List<Long> appIds = appRecords.stream().map(Application::getId).collect(Collectors.toList()); - Map<Long, PipelineStatus> pipeStates = appBuildPipeService.listPipelineStatus(appIds); - - // add building pipeline status info and app control info - appRecords = - appRecords.stream() - .peek( - e -> { - if (pipeStates.containsKey(e.getId())) { - e.setBuildStatus(pipeStates.get(e.getId()).getCode()); - } - }) - .peek( - e -> { - AppControl appControl = - new AppControl() - .setAllowBuild( - e.getBuildStatus() == null - || !PipelineStatus.running.getCode().equals(e.getBuildStatus())) - .setAllowStart( - !e.shouldBeTrack() - && PipelineStatus.success.getCode().equals(e.getBuildStatus())) - .setAllowStop(e.isRunning()); - e.setAppControl(appControl); - }) - .collect(Collectors.toList()); - applicationList.setRecords(appRecords); return RestResponse.success(applicationList); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 0808afd57..c6c30fd73 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -161,6 +161,7 @@ public class Application implements Serializable { @TableField(updateStrategy = FieldStrategy.IGNORED) private Date endTime; + @TableField(updateStrategy = FieldStrategy.IGNORED) private Long duration; /** checkpoint max failure interval */ diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index bfc65de78..4339335a1 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -41,6 +41,7 @@ import org.apache.streampark.console.base.mybatis.pager.MybatisPager; import org.apache.streampark.console.base.util.CommonUtils; import org.apache.streampark.console.base.util.ObjectUtils; import org.apache.streampark.console.base.util.WebUtils; +import org.apache.streampark.console.core.bean.AppControl; import org.apache.streampark.console.core.bean.MavenDependency; import org.apache.streampark.console.core.entity.AppBuildPipeline; import org.apache.streampark.console.core.entity.Application; @@ -91,6 +92,7 @@ import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper; import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV; import org.apache.streampark.flink.kubernetes.model.TrackId; import org.apache.streampark.flink.packer.pipeline.BuildResult; +import org.apache.streampark.flink.packer.pipeline.PipelineStatus; import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse; import org.apache.commons.io.FileUtils; @@ -517,26 +519,56 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli this.baseMapper.page(page, appParam); List<Application> records = page.getRecords(); long now = System.currentTimeMillis(); - List<Application> newRecords = + + List<Long> appIds = records.stream().map(Application::getId).collect(Collectors.toList()); + Map<Long, PipelineStatus> pipeStates = appBuildPipeService.listPipelineStatus(appIds); + + // add building pipeline status info and app control info + records = records.stream() .peek( record -> { - // status of flink job on kubernetes mode had been automatically persisted to db - // in time. + // 1) running Duration + if (record.getTracking() == 1 + && record.getFlinkAppStateEnum() == FlinkAppState.RUNNING) { + record.setDuration(now - record.getStartTime().getTime()); + } + // 2) k8s restURL if (record.isKubernetesModeJob()) { // set duration String restUrl = flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record)); record.setFlinkRestUrl(restUrl); - if (record.getTracking() == 1 - && record.getStartTime() != null - && record.getStartTime().getTime() > 0) { - record.setDuration(now - record.getStartTime().getTime()); - } } }) + .peek( + record -> { + // 3) buildStatus + if (pipeStates.containsKey(record.getId())) { + record.setBuildStatus(pipeStates.get(record.getId()).getCode()); + } + }) + .peek( + record -> { + // 4) appControl + AppControl appControl = + new AppControl() + .setAllowBuild( + record.getBuildStatus() == null + || !PipelineStatus.running + .getCode() + .equals(record.getBuildStatus())) + .setAllowStart( + !record.shouldBeTrack() + && PipelineStatus.success + .getCode() + .equals(record.getBuildStatus())) + .setAllowStop(record.isRunning()); + record.setAppControl(appControl); + }) .collect(Collectors.toList()); - page.setRecords(newRecords); + + page.setRecords(records); return page; } @@ -1417,6 +1449,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli @Override public void persistMetrics(Application appParam) { + if (appParam.getFlinkAppStateEnum() == FlinkAppState.RUNNING) { + appParam.setEndTime(null); + appParam.setDuration(null); + } this.baseMapper.persistMetrics(appParam); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java index 63f5c18c0..83f3befca 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java @@ -45,6 +45,8 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -58,6 +60,7 @@ import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -133,6 +136,9 @@ public class FlinkAppHttpWatcher { private static final Cache<Long, Byte> CANCELING_CACHE = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build(); + private static final Cache<Long, StateChangeEvent> PREVIOUS_STATUS = + Caffeine.newBuilder().expireAfterWrite(24, TimeUnit.HOURS).build(); + /** * Task canceled tracking list, record who cancelled the tracking task Map<applicationId,userId> */ @@ -441,7 +447,11 @@ public class FlinkAppHttpWatcher { } else { WATCHING_APPS.put(application.getId(), application); } - applicationService.persistMetrics(application); + StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId()); + StateChangeEvent nowEvent = StateChangeEvent.of(application); + if (!nowEvent.equals(event)) { + applicationService.persistMetrics(application); + } } /** @@ -801,4 +811,39 @@ public class FlinkAppHttpWatcher { interface Callback<T, R> { R call(T e) throws Exception; } + + @Getter + @Setter + static class StateChangeEvent { + private Long id; + private FlinkAppState appState; + private OptionState optionState; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof StateChangeEvent)) { + return false; + } + StateChangeEvent event = (StateChangeEvent) o; + return Objects.equals(id, event.id) + && appState == event.appState + && optionState == event.optionState; + } + + @Override + public int hashCode() { + return Objects.hash(id, appState, optionState); + } + + public static StateChangeEvent of(Application application) { + StateChangeEvent event = new StateChangeEvent(); + event.setId(application.getId()); + event.setOptionState(OptionState.of(application.getOptionState())); + event.setAppState(application.getFlinkAppStateEnum()); + return event; + } + } } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/QueueTestCase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/QueueTestCase.java new file mode 100644 index 000000000..2a0f27f5a --- /dev/null +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/QueueTestCase.java @@ -0,0 +1,2 @@ +package org.apache.streampark.console.core;public class QueueTestCase { +}
