Copilot commented on code in PR #10078:
URL: https://github.com/apache/seatunnel/pull/10078#discussion_r2541317358


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.PendingJobInfo;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import 
org.apache.seatunnel.engine.server.resourcemanager.resource.SystemLoadInfo;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+@Slf4j
+public final class PendingDiagnosticsCollector {
+
+    private static final String REASON_WAITING = "WAITING_SLOT_ASSIGNMENT";
+    private static final String REASON_RESOURCE_NOT_ENOUGH = 
"RESOURCE_NOT_ENOUGH";
+    private static final String REASON_REQUEST_FAILED = "REQUEST_FAILED";
+    private static final String REASON_REQUEST_CANCELLED = "REQUEST_CANCELLED";
+
+    private PendingDiagnosticsCollector() {}
+
+    public static PendingJobDiagnostic collectJobDiagnostic(
+            PendingJobInfo pendingJobInfo,
+            Map<String, String> tagFilter,
+            ResourceManager resourceManager) {
+        if (pendingJobInfo == null) {
+            return null;
+        }
+        JobMaster jobMaster = pendingJobInfo.getJobMaster();
+        PendingJobDiagnostic diagnostic = new PendingJobDiagnostic();
+        diagnostic.setJobId(jobMaster.getJobId());
+        
diagnostic.setJobName(jobMaster.getJobImmutableInformation().getJobName());
+        
diagnostic.setPendingSourceState(pendingJobInfo.getPendingSourceState());
+        diagnostic.setJobStatus(jobMaster.getJobStatus());
+        diagnostic.setEnqueueTimestamp(pendingJobInfo.getEnqueueTimestamp());
+        diagnostic.setCheckTime(System.currentTimeMillis());
+        diagnostic.setWaitDurationMs(
+                diagnostic.getCheckTime() - 
pendingJobInfo.getEnqueueTimestamp());
+        diagnostic.setTagFilter(
+                tagFilter == null ? Collections.emptyMap() : new 
HashMap<>(tagFilter));
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> requestFutures =
+                Optional.ofNullable(jobMaster.getPhysicalPlan())
+                        .map(PhysicalPlan::getPreApplyResourceFutures)
+                        .map(HashMap::new)
+                        .orElseGet(HashMap::new);
+
+        buildPipelineDiagnostics(jobMaster, requestFutures, diagnostic);
+        diagnostic.setTotalTaskGroups(
+                diagnostic.getPipelines().stream()
+                        
.mapToInt(PendingPipelineDiagnostic::getTotalTaskGroups)
+                        .sum());
+        diagnostic.setAllocatedTaskGroups(
+                diagnostic.getPipelines().stream()
+                        
.mapToInt(PendingPipelineDiagnostic::getAllocatedTaskGroups)
+                        .sum());
+        diagnostic.setLackingTaskGroups(
+                diagnostic.getPipelines().stream()
+                        
.mapToInt(PendingPipelineDiagnostic::getLackingTaskGroups)
+                        .sum());
+
+        updateFailureReason(diagnostic);
+        diagnostic.setBlockingJobIds(collectBlockingJobs(resourceManager, 
jobMaster.getJobId()));
+
+        return diagnostic;
+    }
+
+    private static void buildPipelineDiagnostics(
+            JobMaster jobMaster,
+            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
requestFutures,
+            PendingJobDiagnostic diagnostic) {
+        PhysicalPlan plan = jobMaster.getPhysicalPlan();
+        if (plan == null) {
+            diagnostic.setFailureReason(REASON_WAITING);
+            diagnostic.setFailureMessage("Job master not initialized");
+            return;
+        }
+        for (SubPlan subPlan : plan.getPipelineList()) {
+            PendingPipelineDiagnostic pipelineDiagnostic = new 
PendingPipelineDiagnostic();
+            pipelineDiagnostic.setPipelineId(subPlan.getPipelineId());
+            pipelineDiagnostic.setPipelineName(subPlan.getPipelineFullName());
+
+            List<PhysicalVertex> vertices = new ArrayList<>();
+            vertices.addAll(subPlan.getCoordinatorVertexList());
+            vertices.addAll(subPlan.getPhysicalVertexList());
+
+            int allocated = 0;
+            int lacking = 0;
+            for (PhysicalVertex vertex : vertices) {
+                TaskGroupLocation location = vertex.getTaskGroupLocation();
+                PendingTaskGroupDiagnostic taskDiagnostic =
+                        buildTaskDiagnostic(
+                                location, vertex.getTaskFullName(), 
requestFutures.get(location));
+                
pipelineDiagnostic.getTaskGroupDiagnostics().add(taskDiagnostic);
+                if (taskDiagnostic.isAllocated()) {
+                    allocated++;
+                } else {
+                    lacking++;
+                    
diagnostic.getLackingTaskGroupDiagnostics().add(taskDiagnostic);
+                }
+            }
+
+            pipelineDiagnostic.setTotalTaskGroups(vertices.size());
+            pipelineDiagnostic.setAllocatedTaskGroups(allocated);
+            pipelineDiagnostic.setLackingTaskGroups(lacking);
+            diagnostic.getPipelines().add(pipelineDiagnostic);
+        }
+    }
+
+    private static PendingTaskGroupDiagnostic buildTaskDiagnostic(
+            TaskGroupLocation location,
+            String taskFullName,
+            CompletableFuture<SlotProfile> future) {
+        PendingTaskGroupDiagnostic diagnostic = new 
PendingTaskGroupDiagnostic();
+        diagnostic.setTaskGroupLocation(location);
+        diagnostic.setTaskFullName(taskFullName);
+
+        if (future == null) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH);
+            diagnostic.setFailureMessage("Slot request future not created");
+            return diagnostic;
+        }
+
+        if (future.isCancelled()) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_REQUEST_CANCELLED);
+            diagnostic.setFailureMessage("Slot request cancelled by resource 
manager");
+            return diagnostic;
+        }
+
+        if (!future.isDone()) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_WAITING);
+            diagnostic.setFailureMessage("Slot request still pending");
+            return diagnostic;
+        }
+        try {
+            SlotProfile slotProfile = future.join();
+            if (slotProfile != null) {
+                diagnostic.setAllocated(true);
+                return diagnostic;
+            }
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH);
+            diagnostic.setFailureMessage("No available slot profile");
+        } catch (CompletionException e) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_REQUEST_FAILED);
+            diagnostic.setFailureMessage(ExceptionUtils.getMessage(e));
+        }
+        return diagnostic;
+    }
+
+    private static void updateFailureReason(PendingJobDiagnostic diagnostic) {
+        if (diagnostic.getLackingTaskGroupDiagnostics().isEmpty()) {
+            if (diagnostic.getFailureReason() == null) {
+                diagnostic.setFailureReason(REASON_WAITING);
+                diagnostic.setFailureMessage("Job is waiting for scheduler to 
retry");
+            }
+            return;
+        }
+
+        Map<String, Long> reasonCounter =
+                diagnostic.getLackingTaskGroupDiagnostics().stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        
PendingTaskGroupDiagnostic::getFailureReason,
+                                        Collectors.counting()));
+        String dominantReason =
+                reasonCounter.entrySet().stream()
+                        .max(Map.Entry.comparingByValue())
+                        .map(Map.Entry::getKey)
+                        .orElse(REASON_RESOURCE_NOT_ENOUGH);
+        diagnostic.setFailureReason(dominantReason);
+        diagnostic.setFailureMessage(
+                diagnostic.getLackingTaskGroupDiagnostics().stream()
+                        .filter(diag -> 
dominantReason.equals(diag.getFailureReason()))
+                        .map(PendingTaskGroupDiagnostic::getFailureMessage)
+                        .filter(message -> message != null && 
!message.isEmpty())
+                        .distinct()
+                        .collect(Collectors.joining("; ")));
+    }
+
+    private static List<Long> collectBlockingJobs(ResourceManager 
resourceManager, long jobId) {
+        if (resourceManager == null) {
+            return Collections.emptyList();
+        }
+        List<SlotProfile> assignedSlots = Collections.emptyList();
+        try {
+            assignedSlots = 
resourceManager.getAssignedSlots(Collections.emptyMap());

Review Comment:
   The method collects blocking job IDs by getting all assigned slots with an 
empty tag filter on line 225, but the pending job may have specific tag 
requirements. This could report jobs as "blocking" even when they use slots 
with different tags that wouldn't be available to the pending job anyway. 
Consider passing the job's `tagFilter` to `getAssignedSlots()` to only report 
truly blocking jobs.
   ```suggestion
       private static List<Long> collectBlockingJobs(ResourceManager 
resourceManager, long jobId, Map<String, String> tagFilter) {
           if (resourceManager == null) {
               return Collections.emptyList();
           }
           Map<String, String> tags = tagFilter == null ? 
Collections.emptyMap() : new HashMap<>(tagFilter);
           List<SlotProfile> assignedSlots = Collections.emptyList();
           try {
               assignedSlots = resourceManager.getAssignedSlots(tags);
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.servlet;
+
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+import org.apache.seatunnel.engine.server.rest.service.PendingJobsService;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class PendingJobsServlet extends BaseServlet {
+
+    private final PendingJobsService pendingJobsService;
+    private static final Set<String> TIMESTAMP_FIELDS =
+            new HashSet<>(
+                    Arrays.asList(
+                            "oldestEnqueueTimestamp",
+                            "newestEnqueueTimestamp",
+                            "enqueueTimestamp",
+                            "checkTime"));
+    private static final DateTimeFormatter PRETTY_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss").withZone(ZoneId.systemDefault());
+    private static final Gson PRETTY_GSON = new 
GsonBuilder().setPrettyPrinting().create();
+
+    public PendingJobsServlet(NodeEngineImpl nodeEngine) {
+        super(nodeEngine);
+        this.pendingJobsService = new PendingJobsService(nodeEngine);
+    }
+
+    @Override
+    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+            throws ServletException, IOException {
+
+        Map<String, String> params = new HashMap<>(getParameterMap(req));
+        Long jobId = null;
+        int limit = 0;
+        boolean pretty = false;
+        if (params.containsKey(RestConstant.JOB_ID)) {
+            try {
+                jobId = Long.parseLong(params.remove(RestConstant.JOB_ID));
+            } catch (NumberFormatException e) {
+                resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid 
jobId");
+                return;
+            }
+        }
+
+        if (params.containsKey(RestConstant.LIMIT)) {
+            try {
+                limit = Integer.parseInt(params.remove(RestConstant.LIMIT));
+            } catch (NumberFormatException e) {
+                resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid 
limit");
+                return;
+            }
+        }
+
+        if (params.containsKey(RestConstant.PRETTY)) {
+            pretty = Boolean.parseBoolean(params.remove(RestConstant.PRETTY));
+        }
+
+        PendingJobsResponse response = 
pendingJobsService.getPendingJobs(params, jobId, limit);
+        if (pretty) {
+            writePrettyResponse(resp, response);
+        } else {
+            writeJson(resp, response);
+        }
+    }
+
+    private void writePrettyResponse(HttpServletResponse resp, 
PendingJobsResponse response)
+            throws IOException {
+        JsonElement tree = PRETTY_GSON.toJsonTree(response);
+        formatTimestampFields(tree);
+        resp.setCharacterEncoding("UTF-8");
+        resp.setContentType("application/json; charset=UTF-8");
+        resp.getWriter().write(PRETTY_GSON.toJson(tree));
+    }
+
+    private void formatTimestampFields(JsonElement element) {
+        if (element == null || element.isJsonNull()) {
+            return;
+        }
+        if (element.isJsonObject()) {
+            JsonObject object = element.getAsJsonObject();
+            for (Map.Entry<String, JsonElement> entry : object.entrySet()) {
+                JsonElement value = entry.getValue();
+                if (shouldFormatTimestamp(entry.getKey(), value)) {
+                    long timestamp = value.getAsLong();
+                    object.addProperty(entry.getKey(), 
formatTimestamp(timestamp));
+                } else {
+                    formatTimestampFields(value);
+                }
+            }
+        } else if (element.isJsonArray()) {
+            JsonArray array = element.getAsJsonArray();
+            for (JsonElement child : array) {
+                formatTimestampFields(child);
+            }
+        }
+    }
+
+    private boolean shouldFormatTimestamp(String key, JsonElement element) {
+        if (!TIMESTAMP_FIELDS.contains(key) || element == null) {

Review Comment:
   [nitpick] The method `shouldFormatTimestamp` returns `false` if `element` is 
null on line 138, but this check is redundant because the call site on line 115 
already guards against null elements before invoking the method. (Note that the 
`!element.isJsonPrimitive()` check on line 141 would not handle null itself — it 
would throw a NullPointerException.) Consider simplifying by removing the null 
check on line 138.
   ```suggestion
           if (!TIMESTAMP_FIELDS.contains(key)) {
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1074,6 +1091,72 @@ public ConnectorPackageService 
getConnectorPackageService() {
         return connectorPackageService;
     }
 
+    public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long 
jobId, int limit) {
+        Collection<PendingJobInfo> allPendingJobs =
+                new ArrayList<>(pendingJobQueue.getJobIdMap().values());
+
+        List<PendingJobInfo> selectedJobs = new ArrayList<>();
+        if (jobId != null) {
+            PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId);
+            if (pendingJobInfo != null) {
+                selectedJobs.add(pendingJobInfo);
+            }
+        } else {
+            selectedJobs.addAll(allPendingJobs);
+            
selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp));
+            if (limit > 0 && selectedJobs.size() > limit) {
+                selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit));
+            }

Review Comment:
   In line 1108, when `limit > 0` and the list size exceeds the limit, a new 
ArrayList is created with `subList(0, limit)`. However, if `limit` is negative, 
it's treated as "no limit" per the check on line 1107. The API documentation 
should clarify this behavior, as negative values for `limit` are 
unconventional. Consider rejecting negative values or documenting them 
explicitly.



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollectorTest.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.PendingJobInfo;
+import org.apache.seatunnel.engine.server.execution.PendingSourceState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PendingDiagnosticsCollectorTest {
+
+    @Test
+    public void testCollectJobDiagnosticWithFailures() {
+        JobMaster jobMaster = Mockito.mock(JobMaster.class);
+        Mockito.when(jobMaster.getJobId()).thenReturn(1000L);
+        JobImmutableInformation jobImmutableInformation =
+                Mockito.mock(JobImmutableInformation.class);
+        
Mockito.when(jobImmutableInformation.getJobName()).thenReturn("test_job");
+        
Mockito.when(jobMaster.getJobImmutableInformation()).thenReturn(jobImmutableInformation);
+        Mockito.when(jobMaster.getJobStatus()).thenReturn(JobStatus.PENDING);
+
+        PhysicalPlan physicalPlan = Mockito.mock(PhysicalPlan.class);
+        Mockito.when(jobMaster.getPhysicalPlan()).thenReturn(physicalPlan);
+
+        SubPlan subPlan = Mockito.mock(SubPlan.class);
+        Mockito.when(subPlan.getPipelineId()).thenReturn(1);
+        Mockito.when(subPlan.getPipelineFullName()).thenReturn("pipeline-1");
+
+        PhysicalVertex vertexSuccess = Mockito.mock(PhysicalVertex.class);
+        TaskGroupLocation locationSuccess = new TaskGroupLocation(1000L, 1, 
1L);
+        
Mockito.when(vertexSuccess.getTaskGroupLocation()).thenReturn(locationSuccess);
+        
Mockito.when(vertexSuccess.getTaskFullName()).thenReturn("task-success");
+
+        PhysicalVertex vertexFailA = Mockito.mock(PhysicalVertex.class);
+        TaskGroupLocation locationFailA = new TaskGroupLocation(1000L, 1, 2L);
+        
Mockito.when(vertexFailA.getTaskGroupLocation()).thenReturn(locationFailA);
+        Mockito.when(vertexFailA.getTaskFullName()).thenReturn("task-fail-a");
+
+        PhysicalVertex vertexFailB = Mockito.mock(PhysicalVertex.class);
+        TaskGroupLocation locationFailB = new TaskGroupLocation(1000L, 1, 3L);
+        
Mockito.when(vertexFailB.getTaskGroupLocation()).thenReturn(locationFailB);
+        Mockito.when(vertexFailB.getTaskFullName()).thenReturn("task-fail-b");
+
+        
Mockito.when(subPlan.getCoordinatorVertexList()).thenReturn(Collections.emptyList());
+        Mockito.when(subPlan.getPhysicalVertexList())
+                .thenReturn(Arrays.asList(vertexSuccess, vertexFailA, 
vertexFailB));
+        
Mockito.when(physicalPlan.getPipelineList()).thenReturn(Collections.singletonList(subPlan));
+
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new 
HashMap<>();
+        CompletableFuture<SlotProfile> successFuture =
+                
CompletableFuture.completedFuture(Mockito.mock(SlotProfile.class));
+        futures.put(locationSuccess, successFuture);
+
+        CompletableFuture<SlotProfile> failFutureA = new CompletableFuture<>();
+        failFutureA.completeExceptionally(new RuntimeException("no slot 
available"));
+        futures.put(locationFailA, failFutureA);
+
+        CompletableFuture<SlotProfile> failFutureB = new CompletableFuture<>();
+        failFutureB.completeExceptionally(new RuntimeException("worker busy"));
+        futures.put(locationFailB, failFutureB);
+
+        
Mockito.when(physicalPlan.getPreApplyResourceFutures()).thenReturn(futures);
+
+        PendingJobInfo pendingJobInfo = new 
PendingJobInfo(PendingSourceState.SUBMIT, jobMaster);
+
+        ResourceManager resourceManager = Mockito.mock(ResourceManager.class);
+        SlotProfile blockingSlot = Mockito.mock(SlotProfile.class);
+        Mockito.when(blockingSlot.getOwnerJobID()).thenReturn(2000L);
+        Mockito.when(resourceManager.getAssignedSlots(Mockito.anyMap()))
+                .thenReturn(Collections.singletonList(blockingSlot));
+
+        PendingJobDiagnostic diagnostic =
+                PendingDiagnosticsCollector.collectJobDiagnostic(
+                        pendingJobInfo, Collections.emptyMap(), 
resourceManager);
+

Review Comment:
   [nitpick] The assertion on line 108 checks that `lackingTaskGroups` equals 
2, while the test setup on lines 82-92 creates 3 futures: 1 success and 2 
failures. All 3 task groups are checked, but only the 2 failed allocations 
should be marked as "lacking". The assertion is correct, but it would benefit 
from a comment explaining that successful allocations are not counted as 
"lacking".
   ```suggestion
   
           // Only the failed task group allocations are counted as "lacking".
           // Of the 3 task groups, 1 allocation succeeded and 2 failed, so we 
expect 2 lacking task groups.
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.servlet;
+
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+import org.apache.seatunnel.engine.server.rest.service.PendingJobsService;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class PendingJobsServlet extends BaseServlet {
+
+    private final PendingJobsService pendingJobsService;
+    private static final Set<String> TIMESTAMP_FIELDS =
+            new HashSet<>(
+                    Arrays.asList(
+                            "oldestEnqueueTimestamp",
+                            "newestEnqueueTimestamp",
+                            "enqueueTimestamp",
+                            "checkTime"));
+    private static final DateTimeFormatter PRETTY_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss").withZone(ZoneId.systemDefault());
+    private static final Gson PRETTY_GSON = new 
GsonBuilder().setPrettyPrinting().create();
+
+    public PendingJobsServlet(NodeEngineImpl nodeEngine) {
+        super(nodeEngine);
+        this.pendingJobsService = new PendingJobsService(nodeEngine);
+    }
+
+    @Override
+    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+            throws ServletException, IOException {
+
+        Map<String, String> params = new HashMap<>(getParameterMap(req));
+        Long jobId = null;
+        int limit = 0;
+        boolean pretty = false;
+        if (params.containsKey(RestConstant.JOB_ID)) {
+            try {
+                jobId = Long.parseLong(params.remove(RestConstant.JOB_ID));
+            } catch (NumberFormatException e) {
+                resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid 
jobId");
+                return;
+            }
+        }
+
+        if (params.containsKey(RestConstant.LIMIT)) {
+            try {
+                limit = Integer.parseInt(params.remove(RestConstant.LIMIT));
+            } catch (NumberFormatException e) {
+                resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid 
limit");
+                return;
+            }
+        }

Review Comment:
   The `limit` parameter allows a value of 0, which would result in no jobs 
being returned even if jobs are pending. Consider adding validation to ensure 
`limit` is either negative (no limit) or a positive integer. For example, 
reject `limit=0` with a `400 BAD_REQUEST` or treat it as "no limit".



##########
docs/en/seatunnel-engine/rest-api-v2.md:
##########
@@ -956,4 +1109,4 @@ To get the metrics, you need to open `Telemetry` first, or 
you will get an empty
 
 More information about `Telemetry` can be found in the 
[Telemetry](telemetry.md) documentation.
 
-</details>
\ No newline at end of file
+</details>

Review Comment:
   The English documentation has incorrect indentation on line 1112, where 
`</details>` is misaligned with extra leading whitespace. This should be at the 
same indentation level as the opening tag to maintain consistency with other 
sections.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java:
##########
@@ -40,4 +49,30 @@ public JobMaster getJobMaster() {
     public Long getJobId() {
         return jobMaster.getJobId();
     }
+
+    public long getEnqueueTimestamp() {
+        return enqueueTimestamp;
+    }
+
+    public long getLastCheckTime() {
+        return lastCheckTime;
+    }
+
+    public int getCheckTimes() {
+        return checkTimes.get();
+    }
+
+    public PendingJobDiagnostic getLastSnapshot() {
+        return lastSnapshot;
+    }
+
+    public void recordSnapshot(PendingJobDiagnostic snapshot) {
+        if (snapshot == null) {
+            return;
+        }
+        this.lastSnapshot = snapshot;
+        this.lastCheckTime = snapshot.getCheckTime();
+        int current = this.checkTimes.incrementAndGet();
+        snapshot.setCheckCount(current);
+    }

Review Comment:
   The `recordSnapshot` method should validate that the snapshot's `checkTime` 
is not earlier than the current `lastCheckTime` to prevent recording outdated 
snapshots. Consider adding a check: `if (snapshot.getCheckTime() < 
this.lastCheckTime) return;` before updating the fields.



##########
docs/zh/seatunnel-engine/rest-api-v2.md:
##########
@@ -945,4 +1096,4 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' 
--form 'config_file=@"
 
 更多关于`Telemetry`的信息可以在[Telemetry](telemetry.md)文档中找到。
 
-</details>
\ No newline at end of file
+</details>

Review Comment:
   The Chinese documentation has incorrect indentation on line 1099, where 
`</details>` is misaligned. This should be at the same indentation level as the 
opening tag to maintain consistency with other sections.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1074,6 +1091,72 @@ public ConnectorPackageService 
getConnectorPackageService() {
         return connectorPackageService;
     }
 
+    public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long 
jobId, int limit) {
+        Collection<PendingJobInfo> allPendingJobs =
+                new ArrayList<>(pendingJobQueue.getJobIdMap().values());
+
+        List<PendingJobInfo> selectedJobs = new ArrayList<>();
+        if (jobId != null) {
+            PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId);
+            if (pendingJobInfo != null) {
+                selectedJobs.add(pendingJobInfo);
+            }
+        } else {
+            selectedJobs.addAll(allPendingJobs);
+            
selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp));
+            if (limit > 0 && selectedJobs.size() > limit) {
+                selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit));
+            }
+        }

Review Comment:
   The documentation states that `limit` and `jobId` parameters are mutually 
exclusive (line 132 in Chinese version, line 136 in English version), but the 
servlet implementation doesn't enforce this. If both are provided, `jobId` 
takes precedence and `limit` is silently ignored. Consider either enforcing 
mutual exclusivity with a 400 error or updating the documentation to reflect 
the actual precedence behavior.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java:
##########
@@ -40,4 +49,30 @@ public JobMaster getJobMaster() {
     public Long getJobId() {
         return jobMaster.getJobId();
     }
+
+    public long getEnqueueTimestamp() {
+        return enqueueTimestamp;
+    }
+
+    public long getLastCheckTime() {
+        return lastCheckTime;
+    }
+
+    public int getCheckTimes() {
+        return checkTimes.get();
+    }
+
+    public PendingJobDiagnostic getLastSnapshot() {
+        return lastSnapshot;
+    }
+
+    public void recordSnapshot(PendingJobDiagnostic snapshot) {
+        if (snapshot == null) {
+            return;
+        }
+        this.lastSnapshot = snapshot;
+        this.lastCheckTime = snapshot.getCheckTime();
+        int current = this.checkTimes.incrementAndGet();
+        snapshot.setCheckCount(current);
+    }

Review Comment:
   The `lastSnapshot` field is marked as `volatile` on line 32, but the 
`recordSnapshot` method (lines 69-77) performs multiple non-atomic operations: 
reading `checkTimes`, incrementing it, and updating both `lastSnapshot` and 
`lastCheckTime`. If multiple threads call `recordSnapshot` concurrently, race 
conditions could occur. Consider synchronizing the method or using atomic 
operations to ensure thread safety.



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PendingJobsRestIT.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static io.restassured.RestAssured.given;
+
+@Slf4j
+public class PendingJobsRestIT {
+
+    private static final String HOST = "http://localhost:";
+    private static final String JOB_FILE = "pending_jobs_streaming.conf";
+
+    private HazelcastInstanceImpl node;
+    private SeaTunnelClient engineClient;
+    private SeaTunnelConfig seaTunnelConfig;
+    private final List<ClientJobProxy> submittedJobs = new ArrayList<>();
+    private int httpPort;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        String testClusterName = TestUtils.getClusterName("PendingJobsRestIT");
+        seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+        
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+        seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(2);
+        
seaTunnelConfig.getEngineConfig().setScheduleStrategy(ScheduleStrategy.WAIT);
+        seaTunnelConfig.getEngineConfig().getHttpConfig().setEnabled(true);
+        
seaTunnelConfig.getEngineConfig().getHttpConfig().setEnableDynamicPort(false);
+        seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(18082);
+        
seaTunnelConfig.getEngineConfig().getHttpConfig().setContextPath("/seatunnel");
+        httpPort = seaTunnelConfig.getEngineConfig().getHttpConfig().getPort();
+
+        node = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+

Review Comment:
   [nitpick] The integration test uses a hardcoded port 18082 on line 74, but 
if this port is already in use, the test will fail. Consider using port 0 to 
let the system assign an available port, or use a test utility to find a free 
port. The httpPort is already stored in a variable on line 76, so the test 
could be made more robust.
   ```suggestion
           seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(0); // Use 
0 to auto-assign a free port
           
seaTunnelConfig.getEngineConfig().getHttpConfig().setContextPath("/seatunnel");
           // httpPort will be set after server starts
   
           node = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
           // Retrieve the actual port assigned by the server
           httpPort = node.getJettyServer().getURI().getPort();
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1074,6 +1091,72 @@ public ConnectorPackageService 
getConnectorPackageService() {
         return connectorPackageService;
     }
 
+    public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long 
jobId, int limit) {
+        Collection<PendingJobInfo> allPendingJobs =
+                new ArrayList<>(pendingJobQueue.getJobIdMap().values());
+
+        List<PendingJobInfo> selectedJobs = new ArrayList<>();
+        if (jobId != null) {
+            PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId);
+            if (pendingJobInfo != null) {
+                selectedJobs.add(pendingJobInfo);
+            }
+        } else {
+            selectedJobs.addAll(allPendingJobs);
+            
selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp));
+            if (limit > 0 && selectedJobs.size() > limit) {
+                selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit));
+            }
+        }
+
+        ResourceManager resourceManager = getResourceManager();
+        List<PendingJobDiagnostic> diagnostics = new ArrayList<>();
+        for (PendingJobInfo jobInfo : selectedJobs) {
+            PendingJobDiagnostic diagnostic = jobInfo.getLastSnapshot();
+            if (diagnostic == null) {
+                diagnostic =
+                        PendingDiagnosticsCollector.collectJobDiagnostic(
+                                jobInfo, tags, resourceManager);
+                if (diagnostic != null) {
+                    diagnostic.setCheckCount(jobInfo.getCheckTimes());
+                }
+            }
+            if (diagnostic != null) {
+                diagnostics.add(diagnostic);
+            }
+        }
+
+        PendingJobsResponse response = new PendingJobsResponse();
+        response.setPendingJobs(diagnostics);
+        response.setClusterSnapshot(
+                
PendingDiagnosticsCollector.collectClusterSnapshot(resourceManager, tags));
+        response.setQueueSummary(buildQueueSummary(allPendingJobs, 
diagnostics));
+        return response;
+    }
+
+    private PendingQueueSummary buildQueueSummary(
+            Collection<PendingJobInfo> pendingJobs, List<PendingJobDiagnostic> 
diagnostics) {
+        PendingQueueSummary summary = new PendingQueueSummary();
+        summary.setSize(pendingJobQueue.size());
+        summary.setScheduleStrategy(scheduleStrategy.name());
+        summary.setLackingTaskGroups(
+                
diagnostics.stream().mapToInt(PendingJobDiagnostic::getLackingTaskGroups).sum());

Review Comment:
   The `buildQueueSummary` method receives `pendingJobs` collection and 
`diagnostics` list as parameters (lines 1137-1138). The `lackingTaskGroups` is 
computed from `diagnostics` on line 1143, but if `diagnostics` is a filtered 
subset (e.g., when `limit` is applied), this count won't reflect the total 
lacking task groups across all pending jobs. Consider computing this from the 
full `pendingJobs` collection or clarifying the semantics in documentation.
   ```suggestion
           // Compute lackingTaskGroups from all pendingJobs, not just 
diagnostics
           summary.setLackingTaskGroups(
                   pendingJobs.stream()
                           .mapToInt(jobInfo -> {
                               PendingJobDiagnostic diag = 
jobInfo.getLastSnapshot();
                               return diag != null ? 
diag.getLackingTaskGroups() : 0;
                           })
                           .sum());
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1074,6 +1091,72 @@ public ConnectorPackageService 
getConnectorPackageService() {
         return connectorPackageService;
     }
 
+    public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long 
jobId, int limit) {
+        Collection<PendingJobInfo> allPendingJobs =
+                new ArrayList<>(pendingJobQueue.getJobIdMap().values());
+
+        List<PendingJobInfo> selectedJobs = new ArrayList<>();
+        if (jobId != null) {
+            PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId);
+            if (pendingJobInfo != null) {
+                selectedJobs.add(pendingJobInfo);
+            }
+        } else {
+            selectedJobs.addAll(allPendingJobs);
+            
selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp));
+            if (limit > 0 && selectedJobs.size() > limit) {
+                selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit));
+            }
+        }
+
+        ResourceManager resourceManager = getResourceManager();
+        List<PendingJobDiagnostic> diagnostics = new ArrayList<>();
+        for (PendingJobInfo jobInfo : selectedJobs) {
+            PendingJobDiagnostic diagnostic = jobInfo.getLastSnapshot();
+            if (diagnostic == null) {
+                diagnostic =
+                        PendingDiagnosticsCollector.collectJobDiagnostic(
+                                jobInfo, tags, resourceManager);
+                if (diagnostic != null) {
+                    diagnostic.setCheckCount(jobInfo.getCheckTimes());

Review Comment:
   On line 1121, `diagnostic.setCheckCount(jobInfo.getCheckTimes())` is called 
when a new diagnostic is collected, but this duplicates the logic from 
`recordSnapshot` in PendingJobInfo (line 76). If the snapshot is later recorded 
via `recordSnapshot`, the checkCount will be incremented again, leading to 
inconsistency. Consider either removing this line or ensuring the diagnostic is 
not recorded again.
   ```suggestion
                   // Removed duplicate setCheckCount to avoid inconsistency; 
handled in recordSnapshot.
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.PendingJobInfo;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import 
org.apache.seatunnel.engine.server.resourcemanager.resource.SystemLoadInfo;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+@Slf4j
+public final class PendingDiagnosticsCollector {
+
+    private static final String REASON_WAITING = "WAITING_SLOT_ASSIGNMENT";
+    private static final String REASON_RESOURCE_NOT_ENOUGH = 
"RESOURCE_NOT_ENOUGH";
+    private static final String REASON_REQUEST_FAILED = "REQUEST_FAILED";
+    private static final String REASON_REQUEST_CANCELLED = "REQUEST_CANCELLED";
+
+    private PendingDiagnosticsCollector() {}
+
+    public static PendingJobDiagnostic collectJobDiagnostic(
+            PendingJobInfo pendingJobInfo,
+            Map<String, String> tagFilter,
+            ResourceManager resourceManager) {
+        if (pendingJobInfo == null) {
+            return null;
+        }
+        JobMaster jobMaster = pendingJobInfo.getJobMaster();
+        PendingJobDiagnostic diagnostic = new PendingJobDiagnostic();
+        diagnostic.setJobId(jobMaster.getJobId());
+        
diagnostic.setJobName(jobMaster.getJobImmutableInformation().getJobName());
+        
diagnostic.setPendingSourceState(pendingJobInfo.getPendingSourceState());
+        diagnostic.setJobStatus(jobMaster.getJobStatus());
+        diagnostic.setEnqueueTimestamp(pendingJobInfo.getEnqueueTimestamp());
+        diagnostic.setCheckTime(System.currentTimeMillis());
+        diagnostic.setWaitDurationMs(
+                diagnostic.getCheckTime() - 
pendingJobInfo.getEnqueueTimestamp());
+        diagnostic.setTagFilter(
+                tagFilter == null ? Collections.emptyMap() : new 
HashMap<>(tagFilter));
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> requestFutures =
+                Optional.ofNullable(jobMaster.getPhysicalPlan())
+                        .map(PhysicalPlan::getPreApplyResourceFutures)
+                        .map(HashMap::new)
+                        .orElseGet(HashMap::new);
+
+        buildPipelineDiagnostics(jobMaster, requestFutures, diagnostic);
+        diagnostic.setTotalTaskGroups(
+                diagnostic.getPipelines().stream()
+                        
.mapToInt(PendingPipelineDiagnostic::getTotalTaskGroups)
+                        .sum());
+        diagnostic.setAllocatedTaskGroups(
+                diagnostic.getPipelines().stream()
+                        
.mapToInt(PendingPipelineDiagnostic::getAllocatedTaskGroups)
+                        .sum());
+        diagnostic.setLackingTaskGroups(
+                diagnostic.getPipelines().stream()
+                        
.mapToInt(PendingPipelineDiagnostic::getLackingTaskGroups)
+                        .sum());
+
+        updateFailureReason(diagnostic);
+        diagnostic.setBlockingJobIds(collectBlockingJobs(resourceManager, 
jobMaster.getJobId()));
+
+        return diagnostic;
+    }
+
+    private static void buildPipelineDiagnostics(
+            JobMaster jobMaster,
+            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
requestFutures,
+            PendingJobDiagnostic diagnostic) {
+        PhysicalPlan plan = jobMaster.getPhysicalPlan();
+        if (plan == null) {
+            diagnostic.setFailureReason(REASON_WAITING);
+            diagnostic.setFailureMessage("Job master not initialized");
+            return;
+        }
+        for (SubPlan subPlan : plan.getPipelineList()) {
+            PendingPipelineDiagnostic pipelineDiagnostic = new 
PendingPipelineDiagnostic();
+            pipelineDiagnostic.setPipelineId(subPlan.getPipelineId());
+            pipelineDiagnostic.setPipelineName(subPlan.getPipelineFullName());
+
+            List<PhysicalVertex> vertices = new ArrayList<>();
+            vertices.addAll(subPlan.getCoordinatorVertexList());
+            vertices.addAll(subPlan.getPhysicalVertexList());
+
+            int allocated = 0;
+            int lacking = 0;
+            for (PhysicalVertex vertex : vertices) {
+                TaskGroupLocation location = vertex.getTaskGroupLocation();
+                PendingTaskGroupDiagnostic taskDiagnostic =
+                        buildTaskDiagnostic(
+                                location, vertex.getTaskFullName(), 
requestFutures.get(location));
+                
pipelineDiagnostic.getTaskGroupDiagnostics().add(taskDiagnostic);
+                if (taskDiagnostic.isAllocated()) {
+                    allocated++;
+                } else {
+                    lacking++;
+                    
diagnostic.getLackingTaskGroupDiagnostics().add(taskDiagnostic);
+                }
+            }
+
+            pipelineDiagnostic.setTotalTaskGroups(vertices.size());
+            pipelineDiagnostic.setAllocatedTaskGroups(allocated);
+            pipelineDiagnostic.setLackingTaskGroups(lacking);
+            diagnostic.getPipelines().add(pipelineDiagnostic);
+        }
+    }
+
+    private static PendingTaskGroupDiagnostic buildTaskDiagnostic(
+            TaskGroupLocation location,
+            String taskFullName,
+            CompletableFuture<SlotProfile> future) {
+        PendingTaskGroupDiagnostic diagnostic = new 
PendingTaskGroupDiagnostic();
+        diagnostic.setTaskGroupLocation(location);
+        diagnostic.setTaskFullName(taskFullName);
+
+        if (future == null) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH);
+            diagnostic.setFailureMessage("Slot request future not created");
+            return diagnostic;
+        }
+
+        if (future.isCancelled()) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_REQUEST_CANCELLED);
+            diagnostic.setFailureMessage("Slot request cancelled by resource 
manager");
+            return diagnostic;
+        }
+
+        if (!future.isDone()) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_WAITING);
+            diagnostic.setFailureMessage("Slot request still pending");
+            return diagnostic;
+        }
+        try {
+            SlotProfile slotProfile = future.join();
+            if (slotProfile != null) {
+                diagnostic.setAllocated(true);
+                return diagnostic;
+            }
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH);
+            diagnostic.setFailureMessage("No available slot profile");
+        } catch (CompletionException e) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_REQUEST_FAILED);
+            diagnostic.setFailureMessage(ExceptionUtils.getMessage(e));
+        }
+        return diagnostic;
+    }
+
+    private static void updateFailureReason(PendingJobDiagnostic diagnostic) {
+        if (diagnostic.getLackingTaskGroupDiagnostics().isEmpty()) {
+            if (diagnostic.getFailureReason() == null) {
+                diagnostic.setFailureReason(REASON_WAITING);
+                diagnostic.setFailureMessage("Job is waiting for scheduler to 
retry");
+            }
+            return;
+        }
+
+        Map<String, Long> reasonCounter =
+                diagnostic.getLackingTaskGroupDiagnostics().stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        
PendingTaskGroupDiagnostic::getFailureReason,
+                                        Collectors.counting()));
+        String dominantReason =
+                reasonCounter.entrySet().stream()
+                        .max(Map.Entry.comparingByValue())
+                        .map(Map.Entry::getKey)
+                        .orElse(REASON_RESOURCE_NOT_ENOUGH);
+        diagnostic.setFailureReason(dominantReason);
+        diagnostic.setFailureMessage(
+                diagnostic.getLackingTaskGroupDiagnostics().stream()
+                        .filter(diag -> 
dominantReason.equals(diag.getFailureReason()))
+                        .map(PendingTaskGroupDiagnostic::getFailureMessage)
+                        .filter(message -> message != null && 
!message.isEmpty())
+                        .distinct()
+                        .collect(Collectors.joining("; ")));
+    }
+
+    private static List<Long> collectBlockingJobs(ResourceManager 
resourceManager, long jobId) {
+        if (resourceManager == null) {
+            return Collections.emptyList();
+        }
+        List<SlotProfile> assignedSlots = Collections.emptyList();
+        try {
+            assignedSlots = 
resourceManager.getAssignedSlots(Collections.emptyMap());
+        } catch (Exception e) {
+            log.warn("Collect assigned slots failed: {}", 
ExceptionUtils.getMessage(e));
+        }
+        Set<Long> blocking = new HashSet<>();
+        for (SlotProfile slotProfile : assignedSlots) {
+            long ownerId = slotProfile.getOwnerJobID();
+            if (ownerId > 0 && ownerId != jobId) {
+                blocking.add(ownerId);
+            }
+        }
+        return new ArrayList<>(blocking);
+    }
+
+    public static PendingClusterSnapshot collectClusterSnapshot(
+            ResourceManager resourceManager, Map<String, String> tagFilter) {
+        PendingClusterSnapshot snapshot = new PendingClusterSnapshot();
+        if (resourceManager == null) {
+            return snapshot;
+        }
+        Map<String, String> tags =
+                tagFilter == null ? Collections.emptyMap() : new 
HashMap<>(tagFilter);
+        List<SlotProfile> assignedSlots = Collections.emptyList();
+        List<SlotProfile> unassignedSlots = Collections.emptyList();
+        try {
+            assignedSlots = resourceManager.getAssignedSlots(tags);
+            unassignedSlots = resourceManager.getUnassignedSlots(tags);
+        } catch (Exception e) {
+            log.warn("Collect slots info failed: {}", 
ExceptionUtils.getMessage(e));
+        }
+        snapshot.setAssignedSlots(assignedSlots.size());
+        snapshot.setFreeSlots(unassignedSlots.size());
+        snapshot.setTotalSlots(assignedSlots.size() + unassignedSlots.size());
+        try {
+            snapshot.setWorkerCount(resourceManager.workerCount(tags));
+        } catch (Exception e) {
+            log.warn("Collect worker count failed: {}", 
ExceptionUtils.getMessage(e));
+        }
+        snapshot.setWorkers(buildWorkerSnapshots(resourceManager, tags));
+        return snapshot;
+    }
+
+    private static List<WorkerResourceDiagnostic> buildWorkerSnapshots(
+            ResourceManager resourceManager, Map<String, String> tagFilter) {
+        if (resourceManager == null) {
+            return Collections.emptyList();
+        }
+        Map<Address, WorkerProfile> registerWorker =
+                Optional.ofNullable(resourceManager.getRegisterWorker())
+                        .map(HashMap::new)
+                        .orElseGet(HashMap::new);
+        return registerWorker.values().stream()
+                .map(worker -> convertWorker(worker, tagFilter))
+                .collect(Collectors.toList());
+    }
+
+    private static WorkerResourceDiagnostic convertWorker(
+            WorkerProfile workerProfile, Map<String, String> tagFilter) {
+        WorkerResourceDiagnostic diagnostic = new WorkerResourceDiagnostic();
+        if (workerProfile == null) {
+            return diagnostic;
+        }
+        Address address = workerProfile.getAddress();
+        diagnostic.setAddress(address == null ? "UNKNOWN" : 
address.toString());
+        if (workerProfile.getAttributes() != null) {
+            diagnostic.setTags(new HashMap<>(workerProfile.getAttributes()));
+        } else {
+            diagnostic.setTags(Collections.emptyMap());

Review Comment:
   [nitpick] The method `convertWorker` always sets the worker tags from 
`workerProfile.getAttributes()` without considering the `tagFilter` parameter 
that is passed in. If the intent is to filter workers by tags, this filtering 
is not implemented. If filtering is done elsewhere, the `tagFilter` parameter 
here is unused and should be removed for clarity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to