This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch run_state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 855f3daf24de531b2c884bcff2918a51d2c1750f
Author: benjobs <[email protected]>
AuthorDate: Sun Feb 11 21:27:24 2024 +0800

    [Improve] update job run state improvement
---
 .../core/controller/ApplicationController.java     | 32 -------------
 .../console/core/entity/Application.java           |  1 +
 .../core/service/impl/ApplicationServiceImpl.java  | 54 ++++++++++++++++++----
 .../console/core/task/FlinkAppHttpWatcher.java     | 47 ++++++++++++++++++-
 .../streampark/console/core/QueueTestCase.java     |  2 +
 5 files changed, 94 insertions(+), 42 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 721338cbd..1eacf882d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -26,7 +26,6 @@ import org.apache.streampark.console.base.exception.InternalException;
 import org.apache.streampark.console.core.annotation.ApiAccess;
 import org.apache.streampark.console.core.annotation.AppUpdated;
 import org.apache.streampark.console.core.annotation.PermissionAction;
-import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationBackUp;
 import org.apache.streampark.console.core.entity.ApplicationLog;
@@ -36,7 +35,6 @@ import org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.ApplicationBackUpService;
 import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.ApplicationService;
-import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 
@@ -61,9 +59,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 @Tag(name = "FLINK_APPLICATION_TAG")
 @Slf4j
@@ -152,34 +148,6 @@ public class ApplicationController {
   @RequiresPermissions("app:view")
   public RestResponse list(Application app, RestRequest request) {
     IPage<Application> applicationList = applicationService.page(app, request);
-    List<Application> appRecords = applicationList.getRecords();
-    List<Long> appIds = 
appRecords.stream().map(Application::getId).collect(Collectors.toList());
-    Map<Long, PipelineStatus> pipeStates = 
appBuildPipeService.listPipelineStatus(appIds);
-
-    // add building pipeline status info and app control info
-    appRecords =
-        appRecords.stream()
-            .peek(
-                e -> {
-                  if (pipeStates.containsKey(e.getId())) {
-                    e.setBuildStatus(pipeStates.get(e.getId()).getCode());
-                  }
-                })
-            .peek(
-                e -> {
-                  AppControl appControl =
-                      new AppControl()
-                          .setAllowBuild(
-                              e.getBuildStatus() == null
-                                  || 
!PipelineStatus.running.getCode().equals(e.getBuildStatus()))
-                          .setAllowStart(
-                              !e.shouldBeTrack()
-                                  && 
PipelineStatus.success.getCode().equals(e.getBuildStatus()))
-                          .setAllowStop(e.isRunning());
-                  e.setAppControl(appControl);
-                })
-            .collect(Collectors.toList());
-    applicationList.setRecords(appRecords);
     return RestResponse.success(applicationList);
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 0808afd57..c6c30fd73 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -161,6 +161,7 @@ public class Application implements Serializable {
   @TableField(updateStrategy = FieldStrategy.IGNORED)
   private Date endTime;
 
+  @TableField(updateStrategy = FieldStrategy.IGNORED)
   private Long duration;
 
   /** checkpoint max failure interval */
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index bfc65de78..4339335a1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -41,6 +41,7 @@ import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.CommonUtils;
 import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.bean.MavenDependency;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
 import org.apache.streampark.console.core.entity.Application;
@@ -91,6 +92,7 @@ import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
+import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
 
 import org.apache.commons.io.FileUtils;
@@ -517,26 +519,56 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     this.baseMapper.page(page, appParam);
     List<Application> records = page.getRecords();
     long now = System.currentTimeMillis();
-    List<Application> newRecords =
+
+    List<Long> appIds = 
records.stream().map(Application::getId).collect(Collectors.toList());
+    Map<Long, PipelineStatus> pipeStates = 
appBuildPipeService.listPipelineStatus(appIds);
+
+    // add building pipeline status info and app control info
+    records =
         records.stream()
             .peek(
                 record -> {
-                  // status of flink job on kubernetes mode had been automatically persisted to db
-                  // in time.
+                  // 1) running Duration
+                  if (record.getTracking() == 1
+                      && record.getFlinkAppStateEnum() == 
FlinkAppState.RUNNING) {
+                    record.setDuration(now - record.getStartTime().getTime());
+                  }
+                  // 2) k8s restURL
                   if (record.isKubernetesModeJob()) {
                     // set duration
                     String restUrl =
                         
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
                     record.setFlinkRestUrl(restUrl);
-                    if (record.getTracking() == 1
-                        && record.getStartTime() != null
-                        && record.getStartTime().getTime() > 0) {
-                      record.setDuration(now - 
record.getStartTime().getTime());
-                    }
                   }
                 })
+            .peek(
+                record -> {
+                  // 3) buildStatus
+                  if (pipeStates.containsKey(record.getId())) {
+                    
record.setBuildStatus(pipeStates.get(record.getId()).getCode());
+                  }
+                })
+            .peek(
+                record -> {
+                  // 4) appControl
+                  AppControl appControl =
+                      new AppControl()
+                          .setAllowBuild(
+                              record.getBuildStatus() == null
+                                  || !PipelineStatus.running
+                                      .getCode()
+                                      .equals(record.getBuildStatus()))
+                          .setAllowStart(
+                              !record.shouldBeTrack()
+                                  && PipelineStatus.success
+                                      .getCode()
+                                      .equals(record.getBuildStatus()))
+                          .setAllowStop(record.isRunning());
+                  record.setAppControl(appControl);
+                })
             .collect(Collectors.toList());
-    page.setRecords(newRecords);
+
+    page.setRecords(records);
     return page;
   }
 
@@ -1417,6 +1449,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
   @Override
   public void persistMetrics(Application appParam) {
+    if (appParam.getFlinkAppStateEnum() == FlinkAppState.RUNNING) {
+      appParam.setEndTime(null);
+      appParam.setDuration(null);
+    }
     this.baseMapper.persistMetrics(appParam);
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 63f5c18c0..83f3befca 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -45,6 +45,8 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -58,6 +60,7 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -133,6 +136,9 @@ public class FlinkAppHttpWatcher {
   private static final Cache<Long, Byte> CANCELING_CACHE =
       Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();
 
+  private static final Cache<Long, StateChangeEvent> PREVIOUS_STATUS =
+      Caffeine.newBuilder().expireAfterWrite(24, TimeUnit.HOURS).build();
+
   /**
    * Task canceled tracking list, record who cancelled the tracking task Map<applicationId,userId>
    */
@@ -441,7 +447,11 @@ public class FlinkAppHttpWatcher {
     } else {
       WATCHING_APPS.put(application.getId(), application);
     }
-    applicationService.persistMetrics(application);
+    StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
+    StateChangeEvent nowEvent = StateChangeEvent.of(application);
+    if (!nowEvent.equals(event)) {
+      applicationService.persistMetrics(application);
+    }
   }
 
   /**
@@ -801,4 +811,39 @@ public class FlinkAppHttpWatcher {
   interface Callback<T, R> {
     R call(T e) throws Exception;
   }
+
+  @Getter
+  @Setter
+  static class StateChangeEvent {
+    private Long id;
+    private FlinkAppState appState;
+    private OptionState optionState;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof StateChangeEvent)) {
+        return false;
+      }
+      StateChangeEvent event = (StateChangeEvent) o;
+      return Objects.equals(id, event.id)
+          && appState == event.appState
+          && optionState == event.optionState;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(id, appState, optionState);
+    }
+
+    public static StateChangeEvent of(Application application) {
+      StateChangeEvent event = new StateChangeEvent();
+      event.setId(application.getId());
+      event.setOptionState(OptionState.of(application.getOptionState()));
+      event.setAppState(application.getFlinkAppStateEnum());
+      return event;
+    }
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/QueueTestCase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/QueueTestCase.java
new file mode 100644
index 000000000..2a0f27f5a
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/QueueTestCase.java
@@ -0,0 +1,2 @@
+package org.apache.streampark.console.core;public class QueueTestCase {
+}

Reply via email to