thirdparty-core opened a new issue, #14688: URL: https://github.com/apache/dolphinscheduler/issues/14688
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/dolphinscheduler/issues?q=is%3Aissue) and found no similar issues.

### What happened

In `WorkflowExecuteRunnable#submitStandByTask`, when the `taskInstance` peeked from `readyToSubmitTaskQueue` has a `dependResult` of `WAITING`, the loop can no longer evaluate the other task instances in `readyToSubmitTaskQueue`. `peek` does not remove the task instance from the queue, and no branch of the loop removes a `WAITING` task, so every remaining iteration peeks the same task again. When a `sub_process` process instance is being handled, this means that in some cases the sub_process's child tasks never run, while the sub_process itself stays in the running state indefinitely.

The code is as follows:

```java
int length = readyToSubmitTaskQueue.size();
for (int i = 0; i < length; i++) {
    TaskInstance task = readyToSubmitTaskQueue.peek();
    if (task == null) {
        continue;
    }
    // stop tasks which is retrying if forced success happens
    if (task.taskCanRetry()) {
        TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
        if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
            task.setState(retryTask.getState());
            logger.info("task: {} has been forced success, put it into complete task list and stop retrying",
                    task.getName());
            removeTaskFromStandbyList(task);
            completeTaskMap.put(task.getTaskCode(), task.getId());
            taskInstanceMap.put(task.getId(), task);
            submitPostNode(Long.toString(task.getTaskCode()));
            continue;
        }
    }
    //init varPool only this task is the first time running
    if (task.isFirstRun()) {
        //get pre task ,get all the task varPool to this task
        Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
        getPreVarPool(task, preTask);
    }
    DependResult dependResult = getDependResultForTask(task);
    if (DependResult.SUCCESS == dependResult) {
        Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
        if (!taskInstanceOptional.isPresent()) {
            this.taskFailedSubmit = true;
            // Remove and add to complete map and error map
            if (!removeTaskFromStandbyList(task)) {
                logger.error(
                        "Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}",
                        processInstance.getId(), task.getTaskCode());
            }
            completeTaskMap.put(task.getTaskCode(), task.getId());
            taskInstanceMap.put(task.getId(), task);
            errorTaskMap.put(task.getTaskCode(), task.getId());
            activeTaskProcessorMaps.remove(task.getTaskCode());
            logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}",
                    task.getProcessInstanceId(), task.getId(), task.getTaskCode());
        } else {
            removeTaskFromStandbyList(task);
        }
    } else if (DependResult.FAILED == dependResult) {
        // if the dependency fails, the current node is not submitted and the state changes to failure.
        dependFailedTaskMap.put(task.getTaskCode(), task.getId());
        removeTaskFromStandbyList(task);
        logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(),
                dependResult);
    } else if (DependResult.NON_EXEC == dependResult) {
        // for some reasons(depend task pause/stop) this task would not be submit
        removeTaskFromStandbyList(task);
        logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}",
                task.getId(), dependResult);
    }
}
```

So isn't the `for`-loop logic here flawed?

### What you expected to happen

A task whose dependency result is `WAITING` should not block the evaluation of the other task instances in `readyToSubmitTaskQueue`.

### How to reproduce

no

### Anything else

_No response_

### Version

3.1.x

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
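Since the report gives no reproduction steps, the following minimal, self-contained Java sketch isolates the queue pattern behind the `submitStandByTask` loop quoted in this issue. It is not DolphinScheduler code: the queue holds plain task names in an `ArrayDeque`, `DependResult` is a hypothetical two-value enum, `queue.remove(task)` stands in for `removeTaskFromStandbyList`, and `peekLoop`/`snapshotLoop` are illustrative names. `peekLoop` keeps the issue's shape (peek the head, remove it only on a final result), so a `WAITING` head is re-examined on every iteration and the ready tasks behind it are never reached. `snapshotLoop` is one possible alternative, assumed here for illustration and not taken from the project: walk a copy of the queue so that a waiting task is skipped instead of blocking the rest.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/**
 * Minimal sketch of the peek-based loop described in the issue.
 * Hypothetical stand-ins: tasks are plain names and DependResult is a two-value enum;
 * neither is DolphinScheduler's real TaskInstance or DependResult.
 */
class PeekLoopDemo {

    enum DependResult { SUCCESS, WAITING }

    // Same shape as the loop in the issue: peek() the head and remove it only on a final result.
    static List<String> peekLoop(Queue<String> queue, Map<String, DependResult> deps) {
        List<String> submitted = new ArrayList<>();
        int length = queue.size();
        for (int i = 0; i < length; i++) {
            String task = queue.peek(); // always returns the current head, never removes it
            if (task == null) {
                continue;
            }
            if (deps.get(task) == DependResult.SUCCESS) {
                submitted.add(task);
                queue.remove(task); // only a submitted task leaves the queue
            }
            // WAITING: nothing is removed, so the next iteration peeks the same head again
        }
        return submitted;
    }

    // One possible alternative (an assumption, not the project's fix): walk a snapshot of the queue.
    static List<String> snapshotLoop(Queue<String> queue, Map<String, DependResult> deps) {
        List<String> submitted = new ArrayList<>();
        // Copy first so removing from the real queue cannot disturb the iteration
        for (String task : new ArrayList<>(queue)) {
            if (deps.get(task) == DependResult.SUCCESS) {
                submitted.add(task);
                queue.remove(task);
            }
            // WAITING: leave it queued for a later scheduling round and move on to the next task
        }
        return submitted;
    }

    public static void main(String[] args) {
        Map<String, DependResult> deps = Map.of(
                "A", DependResult.WAITING,
                "B", DependResult.SUCCESS,
                "C", DependResult.SUCCESS);

        Queue<String> blocked = new ArrayDeque<>(List.of("A", "B", "C"));
        // prints "[] remaining=[A, B, C]": B and C are never reached
        System.out.println(peekLoop(blocked, deps) + " remaining=" + blocked);

        Queue<String> unblocked = new ArrayDeque<>(List.of("A", "B", "C"));
        // prints "[B, C] remaining=[A]": A keeps waiting without blocking B and C
        System.out.println(snapshotLoop(unblocked, deps) + " remaining=" + unblocked);
    }
}
```

The snapshot is taken in the queue's iteration order, which for an `ArrayDeque` is FIFO order; if the real queue orders tasks differently (for example by priority), the traversal order of the copy would need to be checked against that ordering.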
