This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 4ab4bb10c Run state (#3545)
4ab4bb10c is described below
commit 4ab4bb10c2c6a81bfe6e69119ff360f9ba56518e
Author: benjobs <[email protected]>
AuthorDate: Mon Feb 12 10:01:03 2024 +0800
Run state (#3545)
* [Improve] update job run state improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../console/core/service/impl/ApplicationServiceImpl.java | 2 +-
.../streampark/console/core/service/impl/ProjectServiceImpl.java | 2 +-
.../apache/streampark/console/core/task/FlinkAppHttpWatcher.java | 8 ++++++--
3 files changed, 8 insertions(+), 4 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 4339335a1..292ac3db9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -218,8 +218,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
private final ExecutorService bootstrapExecutor =
new ThreadPoolExecutor(
- 1,
CPU_NUM,
+ CPU_NUM * 5,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 0285329bb..c7b66435c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -86,7 +86,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
private final ExecutorService projectBuildExecutor =
new ThreadPoolExecutor(
- 1,
+ CPU_NUM,
CPU_NUM,
60L,
TimeUnit.SECONDS,
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 83f3befca..f05cb2afc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -450,6 +450,7 @@ public class FlinkAppHttpWatcher {
StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
StateChangeEvent nowEvent = StateChangeEvent.of(application);
if (!nowEvent.equals(event)) {
+ PREVIOUS_STATUS.put(application.getId(), nowEvent);
applicationService.persistMetrics(application);
}
}
@@ -816,6 +817,7 @@ public class FlinkAppHttpWatcher {
@Setter
static class StateChangeEvent {
private Long id;
+ private String jobId;
private FlinkAppState appState;
private OptionState optionState;
@@ -824,18 +826,19 @@ public class FlinkAppHttpWatcher {
if (this == o) {
return true;
}
- if (!(o instanceof StateChangeEvent)) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
StateChangeEvent event = (StateChangeEvent) o;
return Objects.equals(id, event.id)
+ && Objects.equals(jobId, event.jobId)
&& appState == event.appState
&& optionState == event.optionState;
}
@Override
public int hashCode() {
- return Objects.hash(id, appState, optionState);
+ return Objects.hash(id, jobId, appState, optionState);
}
public static StateChangeEvent of(Application application) {
@@ -843,6 +846,7 @@ public class FlinkAppHttpWatcher {
event.setId(application.getId());
event.setOptionState(OptionState.of(application.getOptionState()));
event.setAppState(application.getFlinkAppStateEnum());
+ event.setJobId(application.getJobId());
return event;
}
}