This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 4ab4bb10c Run state (#3545)
4ab4bb10c is described below

commit 4ab4bb10c2c6a81bfe6e69119ff360f9ba56518e
Author: benjobs <[email protected]>
AuthorDate: Mon Feb 12 10:01:03 2024 +0800

    Run state (#3545)
    
    * [Improve] update job run state improvement
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../console/core/service/impl/ApplicationServiceImpl.java         | 2 +-
 .../streampark/console/core/service/impl/ProjectServiceImpl.java  | 2 +-
 .../apache/streampark/console/core/task/FlinkAppHttpWatcher.java  | 8 ++++++--
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 4339335a1..292ac3db9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -218,8 +218,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
   private final ExecutorService bootstrapExecutor =
       new ThreadPoolExecutor(
-          1,
           CPU_NUM,
+          CPU_NUM * 5,
           60L,
           TimeUnit.SECONDS,
           new LinkedBlockingQueue<>(),
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 0285329bb..c7b66435c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -86,7 +86,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
 
   private final ExecutorService projectBuildExecutor =
       new ThreadPoolExecutor(
-          1,
+          CPU_NUM,
           CPU_NUM,
           60L,
           TimeUnit.SECONDS,
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 83f3befca..f05cb2afc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -450,6 +450,7 @@ public class FlinkAppHttpWatcher {
     StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
     StateChangeEvent nowEvent = StateChangeEvent.of(application);
     if (!nowEvent.equals(event)) {
+      PREVIOUS_STATUS.put(application.getId(), nowEvent);
       applicationService.persistMetrics(application);
     }
   }
@@ -816,6 +817,7 @@ public class FlinkAppHttpWatcher {
   @Setter
   static class StateChangeEvent {
     private Long id;
+    private String jobId;
     private FlinkAppState appState;
     private OptionState optionState;
 
@@ -824,18 +826,19 @@ public class FlinkAppHttpWatcher {
       if (this == o) {
         return true;
       }
-      if (!(o instanceof StateChangeEvent)) {
+      if (o == null || getClass() != o.getClass()) {
         return false;
       }
       StateChangeEvent event = (StateChangeEvent) o;
       return Objects.equals(id, event.id)
+          && Objects.equals(jobId, event.jobId)
           && appState == event.appState
           && optionState == event.optionState;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(id, appState, optionState);
+      return Objects.hash(id, jobId, appState, optionState);
     }
 
     public static StateChangeEvent of(Application application) {
@@ -843,6 +846,7 @@ public class FlinkAppHttpWatcher {
       event.setId(application.getId());
       event.setOptionState(OptionState.of(application.getOptionState()));
       event.setAppState(application.getFlinkAppStateEnum());
+      event.setJobId(application.getJobId());
       return event;
     }
   }

Reply via email to