This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d80424d5bb [Fix][Zeta] Enhance job state display by including pending
jobs in status retrieval (#9489)
d80424d5bb is described below
commit d80424d5bba4e3020de494d757a3b287bb2ea09f
Author: corgy-w <[email protected]>
AuthorDate: Mon Jun 30 19:10:11 2025 +0800
[Fix][Zeta] Enhance job state display by including pending jobs in status
retrieval (#9489)
---
.../client/SeaTunnelEngineClusterRoleTest.java | 5 ++++
.../engine/server/CoordinatorService.java | 7 +++++
.../engine/server/master/JobHistoryService.java | 30 ++++++++++++++++++++--
.../telemetry/metrics/entity/JobCounter.java | 1 +
.../metrics/exports/JobMetricExports.java | 1 +
5 files changed, 42 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
index bed49f4167..6607e3e458 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
@@ -222,6 +222,9 @@ public class SeaTunnelEngineClusterRoleTest {
() ->
Assertions.assertEquals(
clientJobProxy.getJobStatus(),
JobStatus.PENDING));
+ String status = seaTunnelClient.listJobStatus();
+ status.contains("PENDING");
+
// start two worker nodes
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
@@ -284,6 +287,8 @@ public class SeaTunnelEngineClusterRoleTest {
() ->
Assertions.assertEquals(
clientJobProxy.getJobStatus(),
JobStatus.PENDING));
+ String status = seaTunnelClient.listJobStatus();
+ status.contains("PENDING");
// Cancel the job in the pending state
seaTunnelClient.getJobClient().cancelJob(clientJobProxy.getJobId());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 27129236ca..88af17e1ca 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -966,6 +966,8 @@ public class CoordinatorService {
"Job info detail",
"createdJobCount",
jobCounter.getCreatedJobCount(),
+ "pendingJobCount",
+ jobCounter.getPendingJobCount(),
"scheduledJobCount",
jobCounter.getScheduledJobCount(),
"runningJobCount",
@@ -986,6 +988,7 @@ public class CoordinatorService {
AtomicLong createdJobCount = new AtomicLong();
AtomicLong scheduledJobCount = new AtomicLong();
AtomicLong runningJobCount = new AtomicLong();
+ AtomicLong pendingJobCount = new AtomicLong();
AtomicLong failingJobCount = new AtomicLong();
AtomicLong failedJobCount = new AtomicLong();
AtomicLong cancellingJobCount = new AtomicLong();
@@ -1002,6 +1005,9 @@ public class CoordinatorService {
case CREATED:
createdJobCount.addAndGet(1);
break;
+ case PENDING:
+ pendingJobCount.addAndGet(1);
+ break;
case SCHEDULED:
scheduledJobCount.addAndGet(1);
break;
@@ -1030,6 +1036,7 @@ public class CoordinatorService {
return new JobCounter(
createdJobCount.longValue(),
+ pendingJobCount.longValue(),
scheduledJobCount.longValue(),
runningJobCount.longValue(),
failingJobCount.longValue(),
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 28d66161ea..e158656712 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -144,10 +144,36 @@ public class JobHistoryService {
.collect(Collectors.toList());
Set<Long> runningJonIds =
runningJobStateList.stream().map(JobState::getJobId).collect(Collectors.toSet());
+
+ List<JobState> pendingJobStateList =
+ pendingJobMasterMap.entrySet().stream()
+ .map(
+ entry -> {
+ Long jobId = entry.getKey();
+ JobImmutableInformation
jobImmutableInformation =
+
entry.getValue()._2.getJobImmutableInformation();
+ return new JobState(
+ jobId,
+
jobImmutableInformation.getJobName(),
+ JobStatus.PENDING,
+
jobImmutableInformation.getCreateTime(),
+ null,
+ null,
+ null,
+ null);
+ })
+ .collect(Collectors.toList());
+ Set<Long> pendingJobIds =
+
pendingJobStateList.stream().map(JobState::getJobId).collect(Collectors.toSet());
+
Stream.concat(
- runningJobStateList.stream(),
+ Stream.concat(runningJobStateList.stream(),
pendingJobStateList.stream()),
finishedJobStateImap.values().stream()
- .filter(jobState ->
!runningJonIds.contains(jobState.getJobId())))
+ .filter(
+ jobState ->
+
!runningJonIds.contains(jobState.getJobId())
+ &&
!pendingJobIds.contains(
+
jobState.getJobId())))
.forEach(
jobState -> {
JobStatusData jobStatusData =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/entity/JobCounter.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/entity/JobCounter.java
index a965bed7c6..1ef9bbe22a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/entity/JobCounter.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/entity/JobCounter.java
@@ -24,6 +24,7 @@ import lombok.Data;
@AllArgsConstructor
public class JobCounter {
private long createdJobCount;
+ private long pendingJobCount;
private long scheduledJobCount;
private long runningJobCount;
private long failingJobCount;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobMetricExports.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobMetricExports.java
index 7a7bf7f900..53ff9e05a6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobMetricExports.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobMetricExports.java
@@ -51,6 +51,7 @@ public class JobMetricExports extends AbstractCollector {
metricFamily.addMetric(
labelValues("cancelling"),
jobCountMetrics.getCancellingJobCount());
metricFamily.addMetric(labelValues("created"),
jobCountMetrics.getCreatedJobCount());
+ metricFamily.addMetric(labelValues("pending"),
jobCountMetrics.getPendingJobCount());
metricFamily.addMetric(labelValues("failed"),
jobCountMetrics.getFailedJobCount());
metricFamily.addMetric(labelValues("failing"),
jobCountMetrics.getFailingJobCount());
metricFamily.addMetric(labelValues("finished"),
jobCountMetrics.getFinishedJobCount());