This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d80424d5bb [Fix][Zeta] Enhance job state display by including pending 
jobs in status retrieval (#9489)
d80424d5bb is described below

commit d80424d5bba4e3020de494d757a3b287bb2ea09f
Author: corgy-w <[email protected]>
AuthorDate: Mon Jun 30 19:10:11 2025 +0800

    [Fix][Zeta] Enhance job state display by including pending jobs in status 
retrieval (#9489)
---
 .../client/SeaTunnelEngineClusterRoleTest.java     |  5 ++++
 .../engine/server/CoordinatorService.java          |  7 +++++
 .../engine/server/master/JobHistoryService.java    | 30 ++++++++++++++++++++--
 .../telemetry/metrics/entity/JobCounter.java       |  1 +
 .../metrics/exports/JobMetricExports.java          |  1 +
 5 files changed, 42 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
index bed49f4167..6607e3e458 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
@@ -222,6 +222,9 @@ public class SeaTunnelEngineClusterRoleTest {
                             () ->
                                     Assertions.assertEquals(
                                             clientJobProxy.getJobStatus(), 
JobStatus.PENDING));
+            String status = seaTunnelClient.listJobStatus();
+            status.contains("PENDING");
+
             // start two worker nodes
             
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
             
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
@@ -284,6 +287,8 @@ public class SeaTunnelEngineClusterRoleTest {
                             () ->
                                     Assertions.assertEquals(
                                             clientJobProxy.getJobStatus(), 
JobStatus.PENDING));
+            String status = seaTunnelClient.listJobStatus();
+            status.contains("PENDING");
 
             // Cancel the job in the pending state
             
seaTunnelClient.getJobClient().cancelJob(clientJobProxy.getJobId());
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 27129236ca..88af17e1ca 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -966,6 +966,8 @@ public class CoordinatorService {
                         "Job info detail",
                         "createdJobCount",
                         jobCounter.getCreatedJobCount(),
+                        "pendingJobCount",
+                        jobCounter.getPendingJobCount(),
                         "scheduledJobCount",
                         jobCounter.getScheduledJobCount(),
                         "runningJobCount",
@@ -986,6 +988,7 @@ public class CoordinatorService {
         AtomicLong createdJobCount = new AtomicLong();
         AtomicLong scheduledJobCount = new AtomicLong();
         AtomicLong runningJobCount = new AtomicLong();
+        AtomicLong pendingJobCount = new AtomicLong();
         AtomicLong failingJobCount = new AtomicLong();
         AtomicLong failedJobCount = new AtomicLong();
         AtomicLong cancellingJobCount = new AtomicLong();
@@ -1002,6 +1005,9 @@ public class CoordinatorService {
                                     case CREATED:
                                         createdJobCount.addAndGet(1);
                                         break;
+                                    case PENDING:
+                                        pendingJobCount.addAndGet(1);
+                                        break;
                                     case SCHEDULED:
                                         scheduledJobCount.addAndGet(1);
                                         break;
@@ -1030,6 +1036,7 @@ public class CoordinatorService {
 
         return new JobCounter(
                 createdJobCount.longValue(),
+                pendingJobCount.longValue(),
                 scheduledJobCount.longValue(),
                 runningJobCount.longValue(),
                 failingJobCount.longValue(),
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 28d66161ea..e158656712 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -144,10 +144,36 @@ public class JobHistoryService {
                         .collect(Collectors.toList());
         Set<Long> runningJonIds =
                 
runningJobStateList.stream().map(JobState::getJobId).collect(Collectors.toSet());
+
+        List<JobState> pendingJobStateList =
+                pendingJobMasterMap.entrySet().stream()
+                        .map(
+                                entry -> {
+                                    Long jobId = entry.getKey();
+                                    JobImmutableInformation 
jobImmutableInformation =
+                                            
entry.getValue()._2.getJobImmutableInformation();
+                                    return new JobState(
+                                            jobId,
+                                            
jobImmutableInformation.getJobName(),
+                                            JobStatus.PENDING,
+                                            
jobImmutableInformation.getCreateTime(),
+                                            null,
+                                            null,
+                                            null,
+                                            null);
+                                })
+                        .collect(Collectors.toList());
+        Set<Long> pendingJobIds =
+                
pendingJobStateList.stream().map(JobState::getJobId).collect(Collectors.toSet());
+
         Stream.concat(
-                        runningJobStateList.stream(),
+                        Stream.concat(runningJobStateList.stream(), 
pendingJobStateList.stream()),
                         finishedJobStateImap.values().stream()
-                                .filter(jobState -> 
!runningJonIds.contains(jobState.getJobId())))
+                                .filter(
+                                        jobState ->
+                                                
!runningJonIds.contains(jobState.getJobId())
+                                                        && 
!pendingJobIds.contains(
+                                                                
jobState.getJobId())))
                 .forEach(
                         jobState -> {
                             JobStatusData jobStatusData =
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/entity/JobCounter.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/entity/JobCounter.java
index a965bed7c6..1ef9bbe22a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/entity/JobCounter.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/entity/JobCounter.java
@@ -24,6 +24,7 @@ import lombok.Data;
 @AllArgsConstructor
 public class JobCounter {
     private long createdJobCount;
+    private long pendingJobCount;
     private long scheduledJobCount;
     private long runningJobCount;
     private long failingJobCount;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobMetricExports.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobMetricExports.java
index 7a7bf7f900..53ff9e05a6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobMetricExports.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobMetricExports.java
@@ -51,6 +51,7 @@ public class JobMetricExports extends AbstractCollector {
             metricFamily.addMetric(
                     labelValues("cancelling"), 
jobCountMetrics.getCancellingJobCount());
             metricFamily.addMetric(labelValues("created"), 
jobCountMetrics.getCreatedJobCount());
+            metricFamily.addMetric(labelValues("pending"), 
jobCountMetrics.getPendingJobCount());
             metricFamily.addMetric(labelValues("failed"), 
jobCountMetrics.getFailedJobCount());
             metricFamily.addMetric(labelValues("failing"), 
jobCountMetrics.getFailingJobCount());
             metricFamily.addMetric(labelValues("finished"), 
jobCountMetrics.getFinishedJobCount());

Reply via email to