Copilot commented on code in PR #10078: URL: https://github.com/apache/seatunnel/pull/10078#discussion_r2541317358
########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java: ########## @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.diagnostic; + +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture; +import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan; +import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex; +import org.apache.seatunnel.engine.server.dag.physical.SubPlan; +import org.apache.seatunnel.engine.server.execution.PendingJobInfo; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.master.JobMaster; +import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SystemLoadInfo; +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; + +import com.hazelcast.cluster.Address; +import lombok.extern.slf4j.Slf4j; + 
+import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +@Slf4j +public final class PendingDiagnosticsCollector { + + private static final String REASON_WAITING = "WAITING_SLOT_ASSIGNMENT"; + private static final String REASON_RESOURCE_NOT_ENOUGH = "RESOURCE_NOT_ENOUGH"; + private static final String REASON_REQUEST_FAILED = "REQUEST_FAILED"; + private static final String REASON_REQUEST_CANCELLED = "REQUEST_CANCELLED"; + + private PendingDiagnosticsCollector() {} + + public static PendingJobDiagnostic collectJobDiagnostic( + PendingJobInfo pendingJobInfo, + Map<String, String> tagFilter, + ResourceManager resourceManager) { + if (pendingJobInfo == null) { + return null; + } + JobMaster jobMaster = pendingJobInfo.getJobMaster(); + PendingJobDiagnostic diagnostic = new PendingJobDiagnostic(); + diagnostic.setJobId(jobMaster.getJobId()); + diagnostic.setJobName(jobMaster.getJobImmutableInformation().getJobName()); + diagnostic.setPendingSourceState(pendingJobInfo.getPendingSourceState()); + diagnostic.setJobStatus(jobMaster.getJobStatus()); + diagnostic.setEnqueueTimestamp(pendingJobInfo.getEnqueueTimestamp()); + diagnostic.setCheckTime(System.currentTimeMillis()); + diagnostic.setWaitDurationMs( + diagnostic.getCheckTime() - pendingJobInfo.getEnqueueTimestamp()); + diagnostic.setTagFilter( + tagFilter == null ? 
Collections.emptyMap() : new HashMap<>(tagFilter)); + Map<TaskGroupLocation, CompletableFuture<SlotProfile>> requestFutures = + Optional.ofNullable(jobMaster.getPhysicalPlan()) + .map(PhysicalPlan::getPreApplyResourceFutures) + .map(HashMap::new) + .orElseGet(HashMap::new); + + buildPipelineDiagnostics(jobMaster, requestFutures, diagnostic); + diagnostic.setTotalTaskGroups( + diagnostic.getPipelines().stream() + .mapToInt(PendingPipelineDiagnostic::getTotalTaskGroups) + .sum()); + diagnostic.setAllocatedTaskGroups( + diagnostic.getPipelines().stream() + .mapToInt(PendingPipelineDiagnostic::getAllocatedTaskGroups) + .sum()); + diagnostic.setLackingTaskGroups( + diagnostic.getPipelines().stream() + .mapToInt(PendingPipelineDiagnostic::getLackingTaskGroups) + .sum()); + + updateFailureReason(diagnostic); + diagnostic.setBlockingJobIds(collectBlockingJobs(resourceManager, jobMaster.getJobId())); + + return diagnostic; + } + + private static void buildPipelineDiagnostics( + JobMaster jobMaster, + Map<TaskGroupLocation, CompletableFuture<SlotProfile>> requestFutures, + PendingJobDiagnostic diagnostic) { + PhysicalPlan plan = jobMaster.getPhysicalPlan(); + if (plan == null) { + diagnostic.setFailureReason(REASON_WAITING); + diagnostic.setFailureMessage("Job master not initialized"); + return; + } + for (SubPlan subPlan : plan.getPipelineList()) { + PendingPipelineDiagnostic pipelineDiagnostic = new PendingPipelineDiagnostic(); + pipelineDiagnostic.setPipelineId(subPlan.getPipelineId()); + pipelineDiagnostic.setPipelineName(subPlan.getPipelineFullName()); + + List<PhysicalVertex> vertices = new ArrayList<>(); + vertices.addAll(subPlan.getCoordinatorVertexList()); + vertices.addAll(subPlan.getPhysicalVertexList()); + + int allocated = 0; + int lacking = 0; + for (PhysicalVertex vertex : vertices) { + TaskGroupLocation location = vertex.getTaskGroupLocation(); + PendingTaskGroupDiagnostic taskDiagnostic = + buildTaskDiagnostic( + location, vertex.getTaskFullName(), 
requestFutures.get(location)); + pipelineDiagnostic.getTaskGroupDiagnostics().add(taskDiagnostic); + if (taskDiagnostic.isAllocated()) { + allocated++; + } else { + lacking++; + diagnostic.getLackingTaskGroupDiagnostics().add(taskDiagnostic); + } + } + + pipelineDiagnostic.setTotalTaskGroups(vertices.size()); + pipelineDiagnostic.setAllocatedTaskGroups(allocated); + pipelineDiagnostic.setLackingTaskGroups(lacking); + diagnostic.getPipelines().add(pipelineDiagnostic); + } + } + + private static PendingTaskGroupDiagnostic buildTaskDiagnostic( + TaskGroupLocation location, + String taskFullName, + CompletableFuture<SlotProfile> future) { + PendingTaskGroupDiagnostic diagnostic = new PendingTaskGroupDiagnostic(); + diagnostic.setTaskGroupLocation(location); + diagnostic.setTaskFullName(taskFullName); + + if (future == null) { + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH); + diagnostic.setFailureMessage("Slot request future not created"); + return diagnostic; + } + + if (future.isCancelled()) { + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_REQUEST_CANCELLED); + diagnostic.setFailureMessage("Slot request cancelled by resource manager"); + return diagnostic; + } + + if (!future.isDone()) { + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_WAITING); + diagnostic.setFailureMessage("Slot request still pending"); + return diagnostic; + } + try { + SlotProfile slotProfile = future.join(); + if (slotProfile != null) { + diagnostic.setAllocated(true); + return diagnostic; + } + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH); + diagnostic.setFailureMessage("No available slot profile"); + } catch (CompletionException e) { + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_REQUEST_FAILED); + diagnostic.setFailureMessage(ExceptionUtils.getMessage(e)); + } + return diagnostic; + } + + private static void 
updateFailureReason(PendingJobDiagnostic diagnostic) { + if (diagnostic.getLackingTaskGroupDiagnostics().isEmpty()) { + if (diagnostic.getFailureReason() == null) { + diagnostic.setFailureReason(REASON_WAITING); + diagnostic.setFailureMessage("Job is waiting for scheduler to retry"); + } + return; + } + + Map<String, Long> reasonCounter = + diagnostic.getLackingTaskGroupDiagnostics().stream() + .collect( + Collectors.groupingBy( + PendingTaskGroupDiagnostic::getFailureReason, + Collectors.counting())); + String dominantReason = + reasonCounter.entrySet().stream() + .max(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .orElse(REASON_RESOURCE_NOT_ENOUGH); + diagnostic.setFailureReason(dominantReason); + diagnostic.setFailureMessage( + diagnostic.getLackingTaskGroupDiagnostics().stream() + .filter(diag -> dominantReason.equals(diag.getFailureReason())) + .map(PendingTaskGroupDiagnostic::getFailureMessage) + .filter(message -> message != null && !message.isEmpty()) + .distinct() + .collect(Collectors.joining("; "))); + } + + private static List<Long> collectBlockingJobs(ResourceManager resourceManager, long jobId) { + if (resourceManager == null) { + return Collections.emptyList(); + } + List<SlotProfile> assignedSlots = Collections.emptyList(); + try { + assignedSlots = resourceManager.getAssignedSlots(Collections.emptyMap()); Review Comment: The method collects blocking job IDs by getting all assigned slots with an empty tag filter on line 225, but the pending job may have specific tag requirements. This could report jobs as "blocking" even when they use slots with different tags that wouldn't be available to the pending job anyway. Consider passing the job's `tagFilter` to `getAssignedSlots()` to only report truly blocking jobs. 
```suggestion private static List<Long> collectBlockingJobs(ResourceManager resourceManager, long jobId, Map<String, String> tagFilter) { if (resourceManager == null) { return Collections.emptyList(); } Map<String, String> tags = tagFilter == null ? Collections.emptyMap() : new HashMap<>(tagFilter); List<SlotProfile> assignedSlots = Collections.emptyList(); try { assignedSlots = resourceManager.getAssignedSlots(tags); ``` ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.server.rest.servlet; + +import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse; +import org.apache.seatunnel.engine.server.rest.RestConstant; +import org.apache.seatunnel.engine.server.rest.service.PendingJobsService; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.hazelcast.spi.impl.NodeEngineImpl; +import lombok.extern.slf4j.Slf4j; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +@Slf4j +public class PendingJobsServlet extends BaseServlet { + + private final PendingJobsService pendingJobsService; + private static final Set<String> TIMESTAMP_FIELDS = + new HashSet<>( + Arrays.asList( + "oldestEnqueueTimestamp", + "newestEnqueueTimestamp", + "enqueueTimestamp", + "checkTime")); + private static final DateTimeFormatter PRETTY_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); + private static final Gson PRETTY_GSON = new GsonBuilder().setPrettyPrinting().create(); + + public PendingJobsServlet(NodeEngineImpl nodeEngine) { + super(nodeEngine); + this.pendingJobsService = new PendingJobsService(nodeEngine); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + + Map<String, String> params = new HashMap<>(getParameterMap(req)); + Long jobId = null; + int limit = 0; + boolean pretty = false; + if (params.containsKey(RestConstant.JOB_ID)) { + try { + jobId = 
Long.parseLong(params.remove(RestConstant.JOB_ID)); + } catch (NumberFormatException e) { + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid jobId"); + return; + } + } + + if (params.containsKey(RestConstant.LIMIT)) { + try { + limit = Integer.parseInt(params.remove(RestConstant.LIMIT)); + } catch (NumberFormatException e) { + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid limit"); + return; + } + } + + if (params.containsKey(RestConstant.PRETTY)) { + pretty = Boolean.parseBoolean(params.remove(RestConstant.PRETTY)); + } + + PendingJobsResponse response = pendingJobsService.getPendingJobs(params, jobId, limit); + if (pretty) { + writePrettyResponse(resp, response); + } else { + writeJson(resp, response); + } + } + + private void writePrettyResponse(HttpServletResponse resp, PendingJobsResponse response) + throws IOException { + JsonElement tree = PRETTY_GSON.toJsonTree(response); + formatTimestampFields(tree); + resp.setCharacterEncoding("UTF-8"); + resp.setContentType("application/json; charset=UTF-8"); + resp.getWriter().write(PRETTY_GSON.toJson(tree)); + } + + private void formatTimestampFields(JsonElement element) { + if (element == null || element.isJsonNull()) { + return; + } + if (element.isJsonObject()) { + JsonObject object = element.getAsJsonObject(); + for (Map.Entry<String, JsonElement> entry : object.entrySet()) { + JsonElement value = entry.getValue(); + if (shouldFormatTimestamp(entry.getKey(), value)) { + long timestamp = value.getAsLong(); + object.addProperty(entry.getKey(), formatTimestamp(timestamp)); + } else { + formatTimestampFields(value); + } + } + } else if (element.isJsonArray()) { + JsonArray array = element.getAsJsonArray(); + for (JsonElement child : array) { + formatTimestampFields(child); + } + } + } + + private boolean shouldFormatTimestamp(String key, JsonElement element) { + if (!TIMESTAMP_FIELDS.contains(key) || element == null) { Review Comment: [nitpick] The method `shouldFormatTimestamp` returns `false` 
if `element` is null on line 138, but this check is redundant since line 141 already checks `!element.isJsonPrimitive()`, and line 115 guards against null elements at the call site. Consider simplifying by removing the null check on line 138. ```suggestion if (!TIMESTAMP_FIELDS.contains(key)) { ``` ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java: ########## @@ -1074,6 +1091,72 @@ public ConnectorPackageService getConnectorPackageService() { return connectorPackageService; } + public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long jobId, int limit) { + Collection<PendingJobInfo> allPendingJobs = + new ArrayList<>(pendingJobQueue.getJobIdMap().values()); + + List<PendingJobInfo> selectedJobs = new ArrayList<>(); + if (jobId != null) { + PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId); + if (pendingJobInfo != null) { + selectedJobs.add(pendingJobInfo); + } + } else { + selectedJobs.addAll(allPendingJobs); + selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp)); + if (limit > 0 && selectedJobs.size() > limit) { + selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit)); + } Review Comment: In line 1108, when `limit > 0` and the list size exceeds the limit, a new ArrayList is created with `subList(0, limit)`. However, if `limit` is negative, it's treated as "no limit" per the check on line 1107. The API documentation should clarify this behavior, as negative values for `limit` are unconventional. Consider rejecting negative values or documenting them explicitly. ########## seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollectorTest.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.diagnostic; + +import org.apache.seatunnel.engine.common.job.JobStatus; +import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan; +import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex; +import org.apache.seatunnel.engine.server.dag.physical.SubPlan; +import org.apache.seatunnel.engine.server.execution.PendingJobInfo; +import org.apache.seatunnel.engine.server.execution.PendingSourceState; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.master.JobMaster; +import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class PendingDiagnosticsCollectorTest { + + @Test + public void testCollectJobDiagnosticWithFailures() { + JobMaster jobMaster = Mockito.mock(JobMaster.class); + 
Mockito.when(jobMaster.getJobId()).thenReturn(1000L); + JobImmutableInformation jobImmutableInformation = + Mockito.mock(JobImmutableInformation.class); + Mockito.when(jobImmutableInformation.getJobName()).thenReturn("test_job"); + Mockito.when(jobMaster.getJobImmutableInformation()).thenReturn(jobImmutableInformation); + Mockito.when(jobMaster.getJobStatus()).thenReturn(JobStatus.PENDING); + + PhysicalPlan physicalPlan = Mockito.mock(PhysicalPlan.class); + Mockito.when(jobMaster.getPhysicalPlan()).thenReturn(physicalPlan); + + SubPlan subPlan = Mockito.mock(SubPlan.class); + Mockito.when(subPlan.getPipelineId()).thenReturn(1); + Mockito.when(subPlan.getPipelineFullName()).thenReturn("pipeline-1"); + + PhysicalVertex vertexSuccess = Mockito.mock(PhysicalVertex.class); + TaskGroupLocation locationSuccess = new TaskGroupLocation(1000L, 1, 1L); + Mockito.when(vertexSuccess.getTaskGroupLocation()).thenReturn(locationSuccess); + Mockito.when(vertexSuccess.getTaskFullName()).thenReturn("task-success"); + + PhysicalVertex vertexFailA = Mockito.mock(PhysicalVertex.class); + TaskGroupLocation locationFailA = new TaskGroupLocation(1000L, 1, 2L); + Mockito.when(vertexFailA.getTaskGroupLocation()).thenReturn(locationFailA); + Mockito.when(vertexFailA.getTaskFullName()).thenReturn("task-fail-a"); + + PhysicalVertex vertexFailB = Mockito.mock(PhysicalVertex.class); + TaskGroupLocation locationFailB = new TaskGroupLocation(1000L, 1, 3L); + Mockito.when(vertexFailB.getTaskGroupLocation()).thenReturn(locationFailB); + Mockito.when(vertexFailB.getTaskFullName()).thenReturn("task-fail-b"); + + Mockito.when(subPlan.getCoordinatorVertexList()).thenReturn(Collections.emptyList()); + Mockito.when(subPlan.getPhysicalVertexList()) + .thenReturn(Arrays.asList(vertexSuccess, vertexFailA, vertexFailB)); + Mockito.when(physicalPlan.getPipelineList()).thenReturn(Collections.singletonList(subPlan)); + + Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new HashMap<>(); + 
CompletableFuture<SlotProfile> successFuture = + CompletableFuture.completedFuture(Mockito.mock(SlotProfile.class)); + futures.put(locationSuccess, successFuture); + + CompletableFuture<SlotProfile> failFutureA = new CompletableFuture<>(); + failFutureA.completeExceptionally(new RuntimeException("no slot available")); + futures.put(locationFailA, failFutureA); + + CompletableFuture<SlotProfile> failFutureB = new CompletableFuture<>(); + failFutureB.completeExceptionally(new RuntimeException("worker busy")); + futures.put(locationFailB, failFutureB); + + Mockito.when(physicalPlan.getPreApplyResourceFutures()).thenReturn(futures); + + PendingJobInfo pendingJobInfo = new PendingJobInfo(PendingSourceState.SUBMIT, jobMaster); + + ResourceManager resourceManager = Mockito.mock(ResourceManager.class); + SlotProfile blockingSlot = Mockito.mock(SlotProfile.class); + Mockito.when(blockingSlot.getOwnerJobID()).thenReturn(2000L); + Mockito.when(resourceManager.getAssignedSlots(Mockito.anyMap())) + .thenReturn(Collections.singletonList(blockingSlot)); + + PendingJobDiagnostic diagnostic = + PendingDiagnosticsCollector.collectJobDiagnostic( + pendingJobInfo, Collections.emptyMap(), resourceManager); + Review Comment: [nitpick] In the test, the mock returns "test_job" for `getJobName()` on line 50, and the assertion on line 108 checks that `lackingTaskGroups` equals 2. However, the test setup on lines 82-92 creates 3 futures: 1 success and 2 failures. This means all 3 task groups are checked, but only 2 should be marked as "lacking" (the failed ones). The assertion is correct but would benefit from a comment explaining that successful allocations are not counted as "lacking". ```suggestion // Only the failed task group allocations are counted as "lacking". // Of the 3 task groups, 1 allocation succeeded and 2 failed, so we expect 2 lacking task groups.
``` ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.rest.servlet; + +import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse; +import org.apache.seatunnel.engine.server.rest.RestConstant; +import org.apache.seatunnel.engine.server.rest.service.PendingJobsService; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.hazelcast.spi.impl.NodeEngineImpl; +import lombok.extern.slf4j.Slf4j; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +@Slf4j +public class PendingJobsServlet extends 
BaseServlet { + + private final PendingJobsService pendingJobsService; + private static final Set<String> TIMESTAMP_FIELDS = + new HashSet<>( + Arrays.asList( + "oldestEnqueueTimestamp", + "newestEnqueueTimestamp", + "enqueueTimestamp", + "checkTime")); + private static final DateTimeFormatter PRETTY_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); + private static final Gson PRETTY_GSON = new GsonBuilder().setPrettyPrinting().create(); + + public PendingJobsServlet(NodeEngineImpl nodeEngine) { + super(nodeEngine); + this.pendingJobsService = new PendingJobsService(nodeEngine); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + + Map<String, String> params = new HashMap<>(getParameterMap(req)); + Long jobId = null; + int limit = 0; + boolean pretty = false; + if (params.containsKey(RestConstant.JOB_ID)) { + try { + jobId = Long.parseLong(params.remove(RestConstant.JOB_ID)); + } catch (NumberFormatException e) { + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid jobId"); + return; + } + } + + if (params.containsKey(RestConstant.LIMIT)) { + try { + limit = Integer.parseInt(params.remove(RestConstant.LIMIT)); + } catch (NumberFormatException e) { + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid limit"); + return; + } + } Review Comment: The `limit` parameter allows a value of 0, which would result in no jobs being returned even if jobs are pending. Consider adding validation to ensure `limit` is either negative (no limit) or a positive integer. For example, reject `limit=0` with a `400 BAD_REQUEST` or treat it as "no limit". ########## docs/en/seatunnel-engine/rest-api-v2.md: ########## @@ -956,4 +1109,4 @@ To get the metrics, you need to open `Telemetry` first, or you will get an empty More information about `Telemetry` can be found in the [Telemetry](telemetry.md) documentation. 
-</details> \ No newline at end of file +</details> Review Comment: The English documentation has incorrect indentation on line 1112, where `</details>` is misaligned with extra leading whitespace. This should be at the same indentation level as the opening tag to maintain consistency with other sections. ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java: ########## @@ -40,4 +49,30 @@ public JobMaster getJobMaster() { public Long getJobId() { return jobMaster.getJobId(); } + + public long getEnqueueTimestamp() { + return enqueueTimestamp; + } + + public long getLastCheckTime() { + return lastCheckTime; + } + + public int getCheckTimes() { + return checkTimes.get(); + } + + public PendingJobDiagnostic getLastSnapshot() { + return lastSnapshot; + } + + public void recordSnapshot(PendingJobDiagnostic snapshot) { + if (snapshot == null) { + return; + } + this.lastSnapshot = snapshot; + this.lastCheckTime = snapshot.getCheckTime(); + int current = this.checkTimes.incrementAndGet(); + snapshot.setCheckCount(current); + } Review Comment: The `recordSnapshot` method should validate that the snapshot's `checkTime` is not earlier than the current `lastCheckTime` to prevent recording outdated snapshots. Consider adding a check: `if (snapshot.getCheckTime() < this.lastCheckTime) return;` before updating the fields. ########## docs/zh/seatunnel-engine/rest-api-v2.md: ########## @@ -945,4 +1096,4 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@" 更多关于`Telemetry`的信息可以在[Telemetry](telemetry.md)文档中找到。 -</details> \ No newline at end of file +</details> Review Comment: The Chinese documentation has incorrect indentation on line 1099, where `</details>` is misaligned. This should be at the same indentation level as the opening tag to maintain consistency with other sections. 
########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java: ########## @@ -1074,6 +1091,72 @@ public ConnectorPackageService getConnectorPackageService() { return connectorPackageService; } + public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long jobId, int limit) { + Collection<PendingJobInfo> allPendingJobs = + new ArrayList<>(pendingJobQueue.getJobIdMap().values()); + + List<PendingJobInfo> selectedJobs = new ArrayList<>(); + if (jobId != null) { + PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId); + if (pendingJobInfo != null) { + selectedJobs.add(pendingJobInfo); + } + } else { + selectedJobs.addAll(allPendingJobs); + selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp)); + if (limit > 0 && selectedJobs.size() > limit) { + selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit)); + } + } Review Comment: The documentation states that `limit` and `jobId` parameters are mutually exclusive (line 132 in Chinese version, line 136 in English version), but the servlet implementation doesn't enforce this. If both are provided, `jobId` takes precedence and `limit` is silently ignored. Consider either enforcing mutual exclusivity with a 400 error or updating the documentation to reflect the actual precedence behavior. 
########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java: ########## @@ -40,4 +49,30 @@ public JobMaster getJobMaster() { public Long getJobId() { return jobMaster.getJobId(); } + + public long getEnqueueTimestamp() { + return enqueueTimestamp; + } + + public long getLastCheckTime() { + return lastCheckTime; + } + + public int getCheckTimes() { + return checkTimes.get(); + } + + public PendingJobDiagnostic getLastSnapshot() { + return lastSnapshot; + } + + public void recordSnapshot(PendingJobDiagnostic snapshot) { + if (snapshot == null) { + return; + } + this.lastSnapshot = snapshot; + this.lastCheckTime = snapshot.getCheckTime(); + int current = this.checkTimes.incrementAndGet(); + snapshot.setCheckCount(current); + } Review Comment: The `lastSnapshot` field is marked as `volatile` on line 32, but the `recordSnapshot` method (lines 69-77) performs multiple non-atomic operations: reading `checkTimes`, incrementing it, and updating both `lastSnapshot` and `lastCheckTime`. If multiple threads call `recordSnapshot` concurrently, race conditions could occur. Consider synchronizing the method or using atomic operations to ensure thread safety. ########## seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PendingJobsRestIT.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy; +import org.apache.seatunnel.engine.common.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +import org.apache.seatunnel.engine.server.rest.RestConstant; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import io.restassured.response.Response; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static io.restassured.RestAssured.given; + +@Slf4j +public class PendingJobsRestIT { + + private static final String HOST = "http://localhost:"; + private static final String JOB_FILE = 
"pending_jobs_streaming.conf"; + + private HazelcastInstanceImpl node; + private SeaTunnelClient engineClient; + private SeaTunnelConfig seaTunnelConfig; + private final List<ClientJobProxy> submittedJobs = new ArrayList<>(); + private int httpPort; + + @BeforeEach + void setUp() throws Exception { + String testClusterName = TestUtils.getClusterName("PendingJobsRestIT"); + seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName); + seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false); + seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(2); + seaTunnelConfig.getEngineConfig().setScheduleStrategy(ScheduleStrategy.WAIT); + seaTunnelConfig.getEngineConfig().getHttpConfig().setEnabled(true); + seaTunnelConfig.getEngineConfig().getHttpConfig().setEnableDynamicPort(false); + seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(18082); + seaTunnelConfig.getEngineConfig().getHttpConfig().setContextPath("/seatunnel"); + httpPort = seaTunnelConfig.getEngineConfig().getHttpConfig().getPort(); + + node = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + Review Comment: [nitpick] The integration test uses a hardcoded port 18082 on line 74, but if this port is already in use, the test will fail. Consider using port 0 to let the system assign an available port, or use a test utility to find a free port. The httpPort is already stored in a variable on line 76, so the test could be made more robust. 
```suggestion seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(0); // Use 0 to auto-assign a free port seaTunnelConfig.getEngineConfig().getHttpConfig().setContextPath("/seatunnel"); // httpPort will be set after server starts node = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); // Retrieve the actual port assigned by the server httpPort = node.getJettyServer().getURI().getPort(); ``` ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java: ########## @@ -1074,6 +1091,72 @@ public ConnectorPackageService getConnectorPackageService() { return connectorPackageService; } + public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long jobId, int limit) { + Collection<PendingJobInfo> allPendingJobs = + new ArrayList<>(pendingJobQueue.getJobIdMap().values()); + + List<PendingJobInfo> selectedJobs = new ArrayList<>(); + if (jobId != null) { + PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId); + if (pendingJobInfo != null) { + selectedJobs.add(pendingJobInfo); + } + } else { + selectedJobs.addAll(allPendingJobs); + selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp)); + if (limit > 0 && selectedJobs.size() > limit) { + selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit)); + } + } + + ResourceManager resourceManager = getResourceManager(); + List<PendingJobDiagnostic> diagnostics = new ArrayList<>(); + for (PendingJobInfo jobInfo : selectedJobs) { + PendingJobDiagnostic diagnostic = jobInfo.getLastSnapshot(); + if (diagnostic == null) { + diagnostic = + PendingDiagnosticsCollector.collectJobDiagnostic( + jobInfo, tags, resourceManager); + if (diagnostic != null) { + diagnostic.setCheckCount(jobInfo.getCheckTimes()); + } + } + if (diagnostic != null) { + diagnostics.add(diagnostic); + } + } + + PendingJobsResponse response = new PendingJobsResponse(); + response.setPendingJobs(diagnostics); + 
response.setClusterSnapshot( + PendingDiagnosticsCollector.collectClusterSnapshot(resourceManager, tags)); + response.setQueueSummary(buildQueueSummary(allPendingJobs, diagnostics)); + return response; + } + + private PendingQueueSummary buildQueueSummary( + Collection<PendingJobInfo> pendingJobs, List<PendingJobDiagnostic> diagnostics) { + PendingQueueSummary summary = new PendingQueueSummary(); + summary.setSize(pendingJobQueue.size()); + summary.setScheduleStrategy(scheduleStrategy.name()); + summary.setLackingTaskGroups( + diagnostics.stream().mapToInt(PendingJobDiagnostic::getLackingTaskGroups).sum()); Review Comment: The `buildQueueSummary` method receives the `pendingJobs` collection and the `diagnostics` list as parameters (lines 1137-1138). The `lackingTaskGroups` count is computed from `diagnostics` on line 1143, but if `diagnostics` is a filtered subset (e.g., when `limit` is applied), this count won't reflect the total lacking task groups across all pending jobs. Consider computing this from the full `pendingJobs` collection or clarifying the semantics in documentation. ```suggestion // Compute lackingTaskGroups from all pendingJobs, not just diagnostics summary.setLackingTaskGroups( pendingJobs.stream() .mapToInt(jobInfo -> { PendingJobDiagnostic diag = jobInfo.getLastSnapshot(); return diag != null ? 
diag.getLackingTaskGroups() : 0; }) .sum()); ``` ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java: ########## @@ -1074,6 +1091,72 @@ public ConnectorPackageService getConnectorPackageService() { return connectorPackageService; } + public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long jobId, int limit) { + Collection<PendingJobInfo> allPendingJobs = + new ArrayList<>(pendingJobQueue.getJobIdMap().values()); + + List<PendingJobInfo> selectedJobs = new ArrayList<>(); + if (jobId != null) { + PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId); + if (pendingJobInfo != null) { + selectedJobs.add(pendingJobInfo); + } + } else { + selectedJobs.addAll(allPendingJobs); + selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp)); + if (limit > 0 && selectedJobs.size() > limit) { + selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit)); + } + } + + ResourceManager resourceManager = getResourceManager(); + List<PendingJobDiagnostic> diagnostics = new ArrayList<>(); + for (PendingJobInfo jobInfo : selectedJobs) { + PendingJobDiagnostic diagnostic = jobInfo.getLastSnapshot(); + if (diagnostic == null) { + diagnostic = + PendingDiagnosticsCollector.collectJobDiagnostic( + jobInfo, tags, resourceManager); + if (diagnostic != null) { + diagnostic.setCheckCount(jobInfo.getCheckTimes()); Review Comment: On line 1121, `diagnostic.setCheckCount(jobInfo.getCheckTimes())` is called when a new diagnostic is collected, but this duplicates the logic from `recordSnapshot` in PendingJobInfo (line 76). If the snapshot is later recorded via `recordSnapshot`, the checkCount will be incremented again, leading to inconsistency. Consider either removing this line or ensuring the diagnostic is not recorded again. ```suggestion // Removed duplicate setCheckCount to avoid inconsistency; handled in recordSnapshot. 
``` ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java: ########## @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.diagnostic; + +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture; +import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan; +import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex; +import org.apache.seatunnel.engine.server.dag.physical.SubPlan; +import org.apache.seatunnel.engine.server.execution.PendingJobInfo; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.master.JobMaster; +import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SystemLoadInfo; +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; + +import com.hazelcast.cluster.Address; +import lombok.extern.slf4j.Slf4j; + 
+import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +@Slf4j +public final class PendingDiagnosticsCollector { + + private static final String REASON_WAITING = "WAITING_SLOT_ASSIGNMENT"; + private static final String REASON_RESOURCE_NOT_ENOUGH = "RESOURCE_NOT_ENOUGH"; + private static final String REASON_REQUEST_FAILED = "REQUEST_FAILED"; + private static final String REASON_REQUEST_CANCELLED = "REQUEST_CANCELLED"; + + private PendingDiagnosticsCollector() {} + + public static PendingJobDiagnostic collectJobDiagnostic( + PendingJobInfo pendingJobInfo, + Map<String, String> tagFilter, + ResourceManager resourceManager) { + if (pendingJobInfo == null) { + return null; + } + JobMaster jobMaster = pendingJobInfo.getJobMaster(); + PendingJobDiagnostic diagnostic = new PendingJobDiagnostic(); + diagnostic.setJobId(jobMaster.getJobId()); + diagnostic.setJobName(jobMaster.getJobImmutableInformation().getJobName()); + diagnostic.setPendingSourceState(pendingJobInfo.getPendingSourceState()); + diagnostic.setJobStatus(jobMaster.getJobStatus()); + diagnostic.setEnqueueTimestamp(pendingJobInfo.getEnqueueTimestamp()); + diagnostic.setCheckTime(System.currentTimeMillis()); + diagnostic.setWaitDurationMs( + diagnostic.getCheckTime() - pendingJobInfo.getEnqueueTimestamp()); + diagnostic.setTagFilter( + tagFilter == null ? 
Collections.emptyMap() : new HashMap<>(tagFilter)); + Map<TaskGroupLocation, CompletableFuture<SlotProfile>> requestFutures = + Optional.ofNullable(jobMaster.getPhysicalPlan()) + .map(PhysicalPlan::getPreApplyResourceFutures) + .map(HashMap::new) + .orElseGet(HashMap::new); + + buildPipelineDiagnostics(jobMaster, requestFutures, diagnostic); + diagnostic.setTotalTaskGroups( + diagnostic.getPipelines().stream() + .mapToInt(PendingPipelineDiagnostic::getTotalTaskGroups) + .sum()); + diagnostic.setAllocatedTaskGroups( + diagnostic.getPipelines().stream() + .mapToInt(PendingPipelineDiagnostic::getAllocatedTaskGroups) + .sum()); + diagnostic.setLackingTaskGroups( + diagnostic.getPipelines().stream() + .mapToInt(PendingPipelineDiagnostic::getLackingTaskGroups) + .sum()); + + updateFailureReason(diagnostic); + diagnostic.setBlockingJobIds(collectBlockingJobs(resourceManager, jobMaster.getJobId())); + + return diagnostic; + } + + private static void buildPipelineDiagnostics( + JobMaster jobMaster, + Map<TaskGroupLocation, CompletableFuture<SlotProfile>> requestFutures, + PendingJobDiagnostic diagnostic) { + PhysicalPlan plan = jobMaster.getPhysicalPlan(); + if (plan == null) { + diagnostic.setFailureReason(REASON_WAITING); + diagnostic.setFailureMessage("Job master not initialized"); + return; + } + for (SubPlan subPlan : plan.getPipelineList()) { + PendingPipelineDiagnostic pipelineDiagnostic = new PendingPipelineDiagnostic(); + pipelineDiagnostic.setPipelineId(subPlan.getPipelineId()); + pipelineDiagnostic.setPipelineName(subPlan.getPipelineFullName()); + + List<PhysicalVertex> vertices = new ArrayList<>(); + vertices.addAll(subPlan.getCoordinatorVertexList()); + vertices.addAll(subPlan.getPhysicalVertexList()); + + int allocated = 0; + int lacking = 0; + for (PhysicalVertex vertex : vertices) { + TaskGroupLocation location = vertex.getTaskGroupLocation(); + PendingTaskGroupDiagnostic taskDiagnostic = + buildTaskDiagnostic( + location, vertex.getTaskFullName(), 
requestFutures.get(location)); + pipelineDiagnostic.getTaskGroupDiagnostics().add(taskDiagnostic); + if (taskDiagnostic.isAllocated()) { + allocated++; + } else { + lacking++; + diagnostic.getLackingTaskGroupDiagnostics().add(taskDiagnostic); + } + } + + pipelineDiagnostic.setTotalTaskGroups(vertices.size()); + pipelineDiagnostic.setAllocatedTaskGroups(allocated); + pipelineDiagnostic.setLackingTaskGroups(lacking); + diagnostic.getPipelines().add(pipelineDiagnostic); + } + } + + private static PendingTaskGroupDiagnostic buildTaskDiagnostic( + TaskGroupLocation location, + String taskFullName, + CompletableFuture<SlotProfile> future) { + PendingTaskGroupDiagnostic diagnostic = new PendingTaskGroupDiagnostic(); + diagnostic.setTaskGroupLocation(location); + diagnostic.setTaskFullName(taskFullName); + + if (future == null) { + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH); + diagnostic.setFailureMessage("Slot request future not created"); + return diagnostic; + } + + if (future.isCancelled()) { + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_REQUEST_CANCELLED); + diagnostic.setFailureMessage("Slot request cancelled by resource manager"); + return diagnostic; + } + + if (!future.isDone()) { + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_WAITING); + diagnostic.setFailureMessage("Slot request still pending"); + return diagnostic; + } + try { + SlotProfile slotProfile = future.join(); + if (slotProfile != null) { + diagnostic.setAllocated(true); + return diagnostic; + } + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH); + diagnostic.setFailureMessage("No available slot profile"); + } catch (CompletionException e) { + diagnostic.setAllocated(false); + diagnostic.setFailureReason(REASON_REQUEST_FAILED); + diagnostic.setFailureMessage(ExceptionUtils.getMessage(e)); + } + return diagnostic; + } + + private static void 
updateFailureReason(PendingJobDiagnostic diagnostic) { + if (diagnostic.getLackingTaskGroupDiagnostics().isEmpty()) { + if (diagnostic.getFailureReason() == null) { + diagnostic.setFailureReason(REASON_WAITING); + diagnostic.setFailureMessage("Job is waiting for scheduler to retry"); + } + return; + } + + Map<String, Long> reasonCounter = + diagnostic.getLackingTaskGroupDiagnostics().stream() + .collect( + Collectors.groupingBy( + PendingTaskGroupDiagnostic::getFailureReason, + Collectors.counting())); + String dominantReason = + reasonCounter.entrySet().stream() + .max(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .orElse(REASON_RESOURCE_NOT_ENOUGH); + diagnostic.setFailureReason(dominantReason); + diagnostic.setFailureMessage( + diagnostic.getLackingTaskGroupDiagnostics().stream() + .filter(diag -> dominantReason.equals(diag.getFailureReason())) + .map(PendingTaskGroupDiagnostic::getFailureMessage) + .filter(message -> message != null && !message.isEmpty()) + .distinct() + .collect(Collectors.joining("; "))); + } + + private static List<Long> collectBlockingJobs(ResourceManager resourceManager, long jobId) { + if (resourceManager == null) { + return Collections.emptyList(); + } + List<SlotProfile> assignedSlots = Collections.emptyList(); + try { + assignedSlots = resourceManager.getAssignedSlots(Collections.emptyMap()); + } catch (Exception e) { + log.warn("Collect assigned slots failed: {}", ExceptionUtils.getMessage(e)); + } + Set<Long> blocking = new HashSet<>(); + for (SlotProfile slotProfile : assignedSlots) { + long ownerId = slotProfile.getOwnerJobID(); + if (ownerId > 0 && ownerId != jobId) { + blocking.add(ownerId); + } + } + return new ArrayList<>(blocking); + } + + public static PendingClusterSnapshot collectClusterSnapshot( + ResourceManager resourceManager, Map<String, String> tagFilter) { + PendingClusterSnapshot snapshot = new PendingClusterSnapshot(); + if (resourceManager == null) { + return snapshot; + } + Map<String, String> tags = 
+ tagFilter == null ? Collections.emptyMap() : new HashMap<>(tagFilter); + List<SlotProfile> assignedSlots = Collections.emptyList(); + List<SlotProfile> unassignedSlots = Collections.emptyList(); + try { + assignedSlots = resourceManager.getAssignedSlots(tags); + unassignedSlots = resourceManager.getUnassignedSlots(tags); + } catch (Exception e) { + log.warn("Collect slots info failed: {}", ExceptionUtils.getMessage(e)); + } + snapshot.setAssignedSlots(assignedSlots.size()); + snapshot.setFreeSlots(unassignedSlots.size()); + snapshot.setTotalSlots(assignedSlots.size() + unassignedSlots.size()); + try { + snapshot.setWorkerCount(resourceManager.workerCount(tags)); + } catch (Exception e) { + log.warn("Collect worker count failed: {}", ExceptionUtils.getMessage(e)); + } + snapshot.setWorkers(buildWorkerSnapshots(resourceManager, tags)); + return snapshot; + } + + private static List<WorkerResourceDiagnostic> buildWorkerSnapshots( + ResourceManager resourceManager, Map<String, String> tagFilter) { + if (resourceManager == null) { + return Collections.emptyList(); + } + Map<Address, WorkerProfile> registerWorker = + Optional.ofNullable(resourceManager.getRegisterWorker()) + .map(HashMap::new) + .orElseGet(HashMap::new); + return registerWorker.values().stream() + .map(worker -> convertWorker(worker, tagFilter)) + .collect(Collectors.toList()); + } + + private static WorkerResourceDiagnostic convertWorker( + WorkerProfile workerProfile, Map<String, String> tagFilter) { + WorkerResourceDiagnostic diagnostic = new WorkerResourceDiagnostic(); + if (workerProfile == null) { + return diagnostic; + } + Address address = workerProfile.getAddress(); + diagnostic.setAddress(address == null ? 
"UNKNOWN" : address.toString()); + if (workerProfile.getAttributes() != null) { + diagnostic.setTags(new HashMap<>(workerProfile.getAttributes())); + } else { + diagnostic.setTags(Collections.emptyMap()); Review Comment: [nitpick] The method `convertWorker` always sets the worker tags from `workerProfile.getAttributes()` without considering the `tagFilter` parameter that is passed in. If the intent is to filter workers by tags, this filtering is not implemented. If filtering is done elsewhere, the `tagFilter` parameter here is unused and should be removed for clarity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
