This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 0bc49a098 [Improve] update job run state improvement (#3543)
0bc49a098 is described below
commit 0bc49a098c3ba305397082d4a850eb8791a12988
Author: benjobs <[email protected]>
AuthorDate: Sun Feb 11 23:48:42 2024 +0800
[Improve] update job run state improvement (#3543)
* [Improve] update job run state improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../core/controller/ApplicationController.java | 32 -------------
.../console/core/entity/Application.java | 1 +
.../core/service/impl/ApplicationServiceImpl.java | 54 ++++++++++++++++++----
.../console/core/task/FlinkAppHttpWatcher.java | 47 ++++++++++++++++++-
4 files changed, 92 insertions(+), 42 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 721338cbd..1eacf882d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -26,7 +26,6 @@ import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.annotation.ApiAccess;
import org.apache.streampark.console.core.annotation.AppUpdated;
import org.apache.streampark.console.core.annotation.PermissionAction;
-import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationBackUp;
import org.apache.streampark.console.core.entity.ApplicationLog;
@@ -36,7 +35,6 @@ import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
-import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -61,9 +59,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
@Tag(name = "FLINK_APPLICATION_TAG")
@Slf4j
@@ -152,34 +148,6 @@ public class ApplicationController {
@RequiresPermissions("app:view")
public RestResponse list(Application app, RestRequest request) {
IPage<Application> applicationList = applicationService.page(app, request);
- List<Application> appRecords = applicationList.getRecords();
- List<Long> appIds = appRecords.stream().map(Application::getId).collect(Collectors.toList());
- Map<Long, PipelineStatus> pipeStates = appBuildPipeService.listPipelineStatus(appIds);
-
- // add building pipeline status info and app control info
- appRecords =
- appRecords.stream()
- .peek(
- e -> {
- if (pipeStates.containsKey(e.getId())) {
- e.setBuildStatus(pipeStates.get(e.getId()).getCode());
- }
- })
- .peek(
- e -> {
- AppControl appControl =
- new AppControl()
- .setAllowBuild(
- e.getBuildStatus() == null
- || !PipelineStatus.running.getCode().equals(e.getBuildStatus()))
- .setAllowStart(
- !e.shouldBeTrack()
- && PipelineStatus.success.getCode().equals(e.getBuildStatus()))
- .setAllowStop(e.isRunning());
- e.setAppControl(appControl);
- })
- .collect(Collectors.toList());
- applicationList.setRecords(appRecords);
return RestResponse.success(applicationList);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 0808afd57..c6c30fd73 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -161,6 +161,7 @@ public class Application implements Serializable {
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Date endTime;
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
private Long duration;
/** checkpoint max failure interval */
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index bfc65de78..4339335a1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -41,6 +41,7 @@ import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.bean.MavenDependency;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.entity.Application;
@@ -91,6 +92,7 @@ import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
+import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
import org.apache.commons.io.FileUtils;
@@ -517,26 +519,56 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
this.baseMapper.page(page, appParam);
List<Application> records = page.getRecords();
long now = System.currentTimeMillis();
- List<Application> newRecords =
+
+ List<Long> appIds = records.stream().map(Application::getId).collect(Collectors.toList());
+ Map<Long, PipelineStatus> pipeStates = appBuildPipeService.listPipelineStatus(appIds);
+
+ // add building pipeline status info and app control info
+ records =
records.stream()
.peek(
record -> {
- // status of flink job on kubernetes mode had been automatically persisted to db
- // in time.
+ // 1) running Duration
+ if (record.getTracking() == 1
+ && record.getFlinkAppStateEnum() == FlinkAppState.RUNNING) {
+ record.setDuration(now - record.getStartTime().getTime());
+ }
+ // 2) k8s restURL
if (record.isKubernetesModeJob()) {
// set duration
String restUrl =
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
record.setFlinkRestUrl(restUrl);
- if (record.getTracking() == 1
- && record.getStartTime() != null
- && record.getStartTime().getTime() > 0) {
- record.setDuration(now - record.getStartTime().getTime());
- }
}
})
+ .peek(
+ record -> {
+ // 3) buildStatus
+ if (pipeStates.containsKey(record.getId())) {
+ record.setBuildStatus(pipeStates.get(record.getId()).getCode());
+ }
+ })
+ .peek(
+ record -> {
+ // 4) appControl
+ AppControl appControl =
+ new AppControl()
+ .setAllowBuild(
+ record.getBuildStatus() == null
+ || !PipelineStatus.running
+ .getCode()
+ .equals(record.getBuildStatus()))
+ .setAllowStart(
+ !record.shouldBeTrack()
+ && PipelineStatus.success
+ .getCode()
+ .equals(record.getBuildStatus()))
+ .setAllowStop(record.isRunning());
+ record.setAppControl(appControl);
+ })
.collect(Collectors.toList());
- page.setRecords(newRecords);
+
+ page.setRecords(records);
return page;
}
@@ -1417,6 +1449,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@Override
public void persistMetrics(Application appParam) {
+ if (appParam.getFlinkAppStateEnum() == FlinkAppState.RUNNING) {
+ appParam.setEndTime(null);
+ appParam.setDuration(null);
+ }
this.baseMapper.persistMetrics(appParam);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 63f5c18c0..83f3befca 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -45,6 +45,8 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.Getter;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
@@ -58,6 +60,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -133,6 +136,9 @@ public class FlinkAppHttpWatcher {
private static final Cache<Long, Byte> CANCELING_CACHE =
Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();
+ private static final Cache<Long, StateChangeEvent> PREVIOUS_STATUS =
+ Caffeine.newBuilder().expireAfterWrite(24, TimeUnit.HOURS).build();
+
/**
* Task canceled tracking list, record who cancelled the tracking task
Map<applicationId,userId>
*/
@@ -441,7 +447,11 @@ public class FlinkAppHttpWatcher {
} else {
WATCHING_APPS.put(application.getId(), application);
}
- applicationService.persistMetrics(application);
+ StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
+ StateChangeEvent nowEvent = StateChangeEvent.of(application);
+ if (!nowEvent.equals(event)) {
+ applicationService.persistMetrics(application);
+ }
}
/**
@@ -801,4 +811,39 @@ public class FlinkAppHttpWatcher {
interface Callback<T, R> {
R call(T e) throws Exception;
}
+
+ @Getter
+ @Setter
+ static class StateChangeEvent {
+ private Long id;
+ private FlinkAppState appState;
+ private OptionState optionState;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof StateChangeEvent)) {
+ return false;
+ }
+ StateChangeEvent event = (StateChangeEvent) o;
+ return Objects.equals(id, event.id)
+ && appState == event.appState
+ && optionState == event.optionState;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, appState, optionState);
+ }
+
+ public static StateChangeEvent of(Application application) {
+ StateChangeEvent event = new StateChangeEvent();
+ event.setId(application.getId());
+ event.setOptionState(OptionState.of(application.getOptionState()));
+ event.setAppState(application.getFlinkAppStateEnum());
+ return event;
+ }
+ }
}