github-actions[bot] commented on issue #14688:
URL: 
https://github.com/apache/dolphinscheduler/issues/14688#issuecomment-1661683400

   ### Search before asking
   
   - [X] I have searched the [issues](https://github.com/apache/dolphinscheduler/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### What happened
   
   In `WorkflowExecuteRunnable#submitStandByTask`, the loop reads task instances from `readyToSubmitTaskQueue` with `peek()`. When the `dependResult` of the peeked taskInstance is `WAITING`, no branch removes it from the queue, and because `peek()` does not remove the head either, every remaining iteration peeks the same taskInstance. The other taskInstances in `readyToSubmitTaskQueue` are never evaluated. When the processInstance of a sub_process is handled, this can in some cases leave the subtasks of the sub_process never started while the sub_process itself stays in the running state.
   The code is shown below:
   ```java
           int length = readyToSubmitTaskQueue.size();
           for (int i = 0; i < length; i++) {
               TaskInstance task = readyToSubmitTaskQueue.peek();
               if (task == null) {
                   continue;
               }
               // stop tasks which is retrying if forced success happens
               if (task.taskCanRetry()) {
                   TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
                   if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
                       task.setState(retryTask.getState());
                       logger.info("task: {} has been forced success, put it into complete task list and stop retrying",
                                   task.getName());
                       removeTaskFromStandbyList(task);
                       completeTaskMap.put(task.getTaskCode(), task.getId());
                       taskInstanceMap.put(task.getId(), task);
                       submitPostNode(Long.toString(task.getTaskCode()));
                       continue;
                   }
               }
               //init varPool only this task is the first time running
               if (task.isFirstRun()) {
                   //get pre task , get all the task varPool to this task
                   Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
                   getPreVarPool(task, preTask);
               }
               DependResult dependResult = getDependResultForTask(task);
               if (DependResult.SUCCESS == dependResult) {
                   Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
                   if (!taskInstanceOptional.isPresent()) {
                       this.taskFailedSubmit = true;
                       // Remove and add to complete map and error map
                       if (!removeTaskFromStandbyList(task)) {
                           logger.error(
                               "Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}",
                               processInstance.getId(),
                               task.getTaskCode());
                       }
                       completeTaskMap.put(task.getTaskCode(), task.getId());
                       taskInstanceMap.put(task.getId(), task);
                       errorTaskMap.put(task.getTaskCode(), task.getId());
                       activeTaskProcessorMaps.remove(task.getTaskCode());
                       logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}",
                                    task.getProcessInstanceId(),
                                    task.getId(),
                                    task.getTaskCode());
                   } else {
                       removeTaskFromStandbyList(task);
                   }
               } else if (DependResult.FAILED == dependResult) {
                   // if the dependency fails, the current node is not submitted and the state changes to failure.
                   dependFailedTaskMap.put(task.getTaskCode(), task.getId());
                   removeTaskFromStandbyList(task);
                   logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}",
                               task.getId(),
                               dependResult);
               } else if (DependResult.NON_EXEC == dependResult) {
                   // for some reasons(depend task pause/stop) this task would not be submit
                   removeTaskFromStandbyList(task);
                   logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}",
                               task.getId(),
                               dependResult);
               }
           }
   ```
   So, is the `for` loop logic unreasonable here?
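
To make the blocking behavior concrete, here is a minimal, self-contained sketch of the same loop shape. It is not DolphinScheduler code: `Task`, `Result`, `sampleQueue` and `inspectWithPeek` are hypothetical stand-ins, and the sketch assumes the ready queue behaves like a `java.util.PriorityQueue`, where `peek()` returns the head without removing it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class PeekBlockingDemo {

    /** Hypothetical stand-in for DependResult; only the two values needed here. */
    enum Result { SUCCESS, WAITING }

    /** Hypothetical stand-in for TaskInstance. */
    static final class Task {
        final String name;
        final int priority;
        final Result result;

        Task(String name, int priority, Result result) {
            this.name = name;
            this.priority = priority;
            this.result = result;
        }
    }

    /**
     * Same loop shape as in the issue: peek size() times and remove a task
     * only when its result is SUCCESS. Returns the names inspected, in order.
     */
    static List<String> inspectWithPeek(PriorityQueue<Task> queue) {
        List<String> inspected = new ArrayList<>();
        int length = queue.size();
        for (int i = 0; i < length; i++) {
            Task task = queue.peek();
            if (task == null) {
                continue;
            }
            inspected.add(task.name);
            if (task.result == Result.SUCCESS) {
                queue.remove(task);
            }
            // WAITING: nothing is removed, so the next peek() returns the same head.
        }
        return inspected;
    }

    /** Builds a queue whose head (lowest priority value) is a WAITING task. */
    static PriorityQueue<Task> sampleQueue() {
        PriorityQueue<Task> queue =
                new PriorityQueue<>(Comparator.comparingInt((Task t) -> t.priority));
        queue.add(new Task("A", 0, Result.WAITING));
        queue.add(new Task("B", 1, Result.SUCCESS));
        queue.add(new Task("C", 2, Result.SUCCESS));
        return queue;
    }

    public static void main(String[] args) {
        PriorityQueue<Task> queue = sampleQueue();
        System.out.println(inspectWithPeek(queue)); // prints [A, A, A]
        System.out.println(queue.size());           // prints 3
    }
}
```

In this sketch, B and C are ready but are never inspected, because the waiting task A stays at the head for all three iterations.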
   
   ### What you expected to happen
   
   A taskInstance whose dependResult is `WAITING` should not block the evaluation of the other taskInstances in `readyToSubmitTaskQueue`.
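
One possible way to get that behavior, shown only as a sketch and not as the project's actual fix, is to drain the queue with `poll()`, act on every ready task, and put the waiting ones back afterwards. `Task`, `sampleQueue` and `submitReadyTasks` are hypothetical names, and the sketch again assumes a plain `java.util.PriorityQueue` accessed from a single thread.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class NonBlockingDrainDemo {

    /** Hypothetical stand-in for TaskInstance; ready == false models WAITING. */
    static final class Task {
        final String name;
        final int priority;
        final boolean ready;

        Task(String name, int priority, boolean ready) {
            this.name = name;
            this.priority = priority;
            this.ready = ready;
        }
    }

    /**
     * Visits every queued task exactly once. Ready tasks are "submitted"
     * (recorded in the returned list); waiting tasks are re-queued at the end,
     * so a waiting head no longer hides the tasks behind it.
     */
    static List<String> submitReadyTasks(PriorityQueue<Task> queue) {
        List<String> submitted = new ArrayList<>();
        List<Task> stillWaiting = new ArrayList<>();
        Task task;
        while ((task = queue.poll()) != null) {
            if (task.ready) {
                submitted.add(task.name);
            } else {
                stillWaiting.add(task);
            }
        }
        queue.addAll(stillWaiting);
        return submitted;
    }

    /** Builds a queue whose head (lowest priority value) is a waiting task. */
    static PriorityQueue<Task> sampleQueue() {
        PriorityQueue<Task> queue =
                new PriorityQueue<>(Comparator.comparingInt((Task t) -> t.priority));
        queue.add(new Task("A", 0, false));
        queue.add(new Task("B", 1, true));
        queue.add(new Task("C", 2, true));
        return queue;
    }

    public static void main(String[] args) {
        PriorityQueue<Task> queue = sampleQueue();
        System.out.println(submitReadyTasks(queue)); // prints [B, C]
        System.out.println(queue.size());            // prints 1
    }
}
```

If the real queue is shared between threads, draining and re-adding would need the same synchronization that the existing code relies on; that aspect is outside this sketch.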
   
   ### How to reproduce
   
   no
   
   ### Anything else
   
   _No response_
   
   ### Version
   
   3.1.x
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
