This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 136431498 [Improve][Zeta] Speed up listAllJob function (#4852)
136431498 is described below
commit 13643149885db942a28999316136614400bd492a
Author: Jia Fan <[email protected]>
AuthorDate: Tue May 30 15:52:26 2023 +0800
[Improve][Zeta] Speed up listAllJob function (#4852)
---
.../engine/server/master/JobHistoryService.java | 91 ++++++++++++----------
1 file changed, 50 insertions(+), 41 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 0e7bbd5bc..12dcae40c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -102,7 +102,8 @@ public class JobHistoryService {
public String listAllJob() {
List<JobStatusData> status = new ArrayList<>();
Stream.concat(
-
runningJobMasterMap.values().stream().map(this::toJobStateMapper),
+ runningJobMasterMap.values().stream()
+ .map(master -> toJobStateMapper(master, true)),
finishedJobStateImap.values().stream())
.forEach(
jobState -> {
@@ -126,7 +127,7 @@ public class JobHistoryService {
// Get detailed status of a single job
public JobState getJobDetailState(Long jobId) {
return runningJobMasterMap.containsKey(jobId)
- ? toJobStateMapper(runningJobMasterMap.get(jobId))
+ ? toJobStateMapper(runningJobMasterMap.get(jobId), false)
: finishedJobStateImap.getOrDefault(jobId, null);
}
@@ -158,7 +159,7 @@ public class JobHistoryService {
@SuppressWarnings("checkstyle:MagicNumber")
public void storeFinishedJobState(JobMaster jobMaster) {
- JobState jobState = toJobStateMapper(jobMaster);
+ JobState jobState = toJobStateMapper(jobMaster, false);
jobState.setFinishTime(System.currentTimeMillis());
finishedJobStateImap.put(jobState.jobId, jobState, 14, TimeUnit.DAYS);
}
@@ -170,47 +171,55 @@ public class JobHistoryService {
finishedJobMetricsImap.put(jobId, newMetrics, 14, TimeUnit.DAYS);
}
- private JobState toJobStateMapper(JobMaster jobMaster) {
+ private JobState toJobStateMapper(JobMaster jobMaster, boolean simple) {
Long jobId = jobMaster.getJobImmutableInformation().getJobId();
Map<PipelineLocation, PipelineStateData> pipelineStateMapperMap = new
HashMap<>();
-
- jobMaster
- .getPhysicalPlan()
- .getPipelineList()
- .forEach(
- pipeline -> {
- PipelineLocation pipelineLocation =
pipeline.getPipelineLocation();
- PipelineStatus pipelineState =
- (PipelineStatus)
runningJobStateIMap.get(pipelineLocation);
- Map<TaskGroupLocation, ExecutionState>
taskStateMap = new HashMap<>();
- pipeline.getCoordinatorVertexList()
- .forEach(
- coordinator -> {
- TaskGroupLocation
taskGroupLocation =
-
coordinator.getTaskGroupLocation();
- taskStateMap.put(
- taskGroupLocation,
- (ExecutionState)
-
runningJobStateIMap.get(
-
taskGroupLocation));
- });
- pipeline.getPhysicalVertexList()
- .forEach(
- task -> {
- TaskGroupLocation
taskGroupLocation =
-
task.getTaskGroupLocation();
- taskStateMap.put(
- taskGroupLocation,
- (ExecutionState)
-
runningJobStateIMap.get(
-
taskGroupLocation));
- });
-
- PipelineStateData pipelineStateData =
- new PipelineStateData(pipelineState,
taskStateMap);
- pipelineStateMapperMap.put(pipelineLocation,
pipelineStateData);
- });
+ if (!simple) {
+ try {
+ jobMaster
+ .getPhysicalPlan()
+ .getPipelineList()
+ .forEach(
+ pipeline -> {
+ PipelineLocation pipelineLocation =
+ pipeline.getPipelineLocation();
+ PipelineStatus pipelineState =
+ (PipelineStatus)
+
runningJobStateIMap.get(pipelineLocation);
+ Map<TaskGroupLocation, ExecutionState>
taskStateMap =
+ new HashMap<>();
+ pipeline.getCoordinatorVertexList()
+ .forEach(
+ coordinator -> {
+ TaskGroupLocation
taskGroupLocation =
+
coordinator.getTaskGroupLocation();
+ taskStateMap.put(
+
taskGroupLocation,
+
(ExecutionState)
+
runningJobStateIMap.get(
+
taskGroupLocation));
+ });
+ pipeline.getPhysicalVertexList()
+ .forEach(
+ task -> {
+ TaskGroupLocation
taskGroupLocation =
+
task.getTaskGroupLocation();
+ taskStateMap.put(
+
taskGroupLocation,
+
(ExecutionState)
+
runningJobStateIMap.get(
+
taskGroupLocation));
+ });
+
+ PipelineStateData pipelineStateData =
+ new
PipelineStateData(pipelineState, taskStateMap);
+
pipelineStateMapperMap.put(pipelineLocation, pipelineStateData);
+ });
+ } catch (Exception e) {
+ logger.warning("get job pipeline state err", e);
+ }
+ }
JobStatus jobStatus = (JobStatus) runningJobStateIMap.get(jobId);
String jobName = jobMaster.getJobImmutableInformation().getJobName();
long submitTime =
jobMaster.getJobImmutableInformation().getCreateTime();