github-actions[bot] commented on issue #14688: URL: https://github.com/apache/dolphinscheduler/issues/14688#issuecomment-1661683400
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/dolphinscheduler/issues?q=is%3Aissue) and found no similar issues. ### What happened In the `WorkflowExecuteRunnable#submitStandByTask` method, when the dependResult of the taskInstance instance peeked out from the readyToSubmitTaskQueue queue is WAITING, since peek cannot remove the taskInstance from the readyToSubmitTaskQueue, it will not be able to continue to peek readyToSubmitTaskQueue to make judgments on other taskInstances. When processing the processInstance of sub_process, in some cases, the subtasks of sub_process have not been running, and the state of sub_process has been in the running state. code show as below: ```java int length = readyToSubmitTaskQueue. size(); for (int i = 0; i < length; i++) { TaskInstance task = readyToSubmitTaskQueue. peek(); if (task == null) { continue; } // stop tasks which is retrying if forced success happens if (task. taskCanRetry()) { TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); if (retryTask != null && retryTask. getState(). equals(ExecutionStatus. FORCED_SUCCESS)) { task.setState(retryTask.getState()); logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task. getName()); removeTaskFromStandbyList(task); completeTaskMap.put(task.getTaskCode(), task.getId()); taskInstanceMap.put(task.getId(), task); submitPostNode(Long. toString(task. getTaskCode())); continue; } } //init varPool only this task is the first time running if (task. isFirstRun()) { //get pre task , get all the task varPool to this task Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode())); getPreVarPool(task, preTask); } DependResult dependResult = getDependResultForTask(task); if (DependResult. SUCCESS == dependResult) { Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task); if (!taskInstanceOptional. isPresent()) { this.taskFailedSubmit = true; // Remove and add to complete map and error map if (!removeTaskFromStandbyList(task)) { logger. error( "Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}", processInstance. getId(), task. getTaskCode()); } completeTaskMap.put(task.getTaskCode(), task.getId()); taskInstanceMap.put(task.getId(), task); errorTaskMap.put(task.getTaskCode(), task.getId()); activeTaskProcessorMaps. remove(task. getTaskCode()); logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}", task.getProcessInstanceId(), task. getId(), task. getTaskCode()); } else { removeTaskFromStandbyList(task); } } else if (DependResult. FAILED == dependResult) { // if the dependency fails, the current node is not submitted and the state changes to failure. dependFailedTaskMap.put(task.getTaskCode(), task.getId()); removeTaskFromStandbyList(task); logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task. getId(), dependResult); } else if (DependResult. NON_EXEC == dependResult) { // for some reasons(depend task pause/stop) this task would not be submit removeTaskFromStandbyList(task); logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task. getId(), dependResult); } } ``` So, is it unreasonable to use the logic of the for loop in this place? ### What you expected to happen Do not block the judgment of taskInstance in the readyToSubmitTaskQueue queue ### How to reproduce no ### Anything else _No response_ ### Version 3.1.x ### Are you willing to submit PR? - [X] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
