lxorc opened a new issue, #14496: URL: https://github.com/apache/dolphinscheduler/issues/14496
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/dolphinscheduler/issues?q=is%3Aissue) and found no similar issues.

### What happened

```
[INFO] 2023-07-10 17:29:10.423 org.apache.dolphinscheduler.service.process.ProcessServiceImpl:[1333] - [WorkflowInstance-283163][TaskInstance-0] - Start save taskInstance to database : app_retailers_online_secondary_industry_cube_all_copy_20221018142200875, processInstance id:283163, state: WorkflowExecutionStatus{code=4, desc='ready stop'}
[ERROR] 2023-07-10 17:29:10.423 org.apache.dolphinscheduler.service.process.ProcessServiceImpl:[1307] - [WorkflowInstance-283164][TaskInstance-0] - task commit to db failed , taskCode: 7196556024840 has already retry 3 times, please check the database
[WARN] 2023-07-10 17:29:10.423 org.apache.dolphinscheduler.service.process.ProcessServiceImpl:[1599] - [WorkflowInstance-283163][TaskInstance-0] - processInstance: 283163 state was: WorkflowExecutionStatus{code=4, desc='ready stop'}, skip submit this task, taskCode: 7250396272006
[ERROR] 2023-07-10 17:29:10.423 org.apache.dolphinscheduler.service.process.ProcessServiceImpl:[1340] - [WorkflowInstance-283163][TaskInstance-0] - Save taskInstance to db error, task name:app_retailers_online_secondary_industry_cube_all_copy_20221018142200875, process id:283163 state: WorkflowExecutionStatus{code=4, desc='ready stop'}
[ERROR] 2023-07-10 17:29:10.423 org.apache.dolphinscheduler.service.process.ProcessServiceImpl:[1307] - [WorkflowInstance-283163][TaskInstance-0] - task commit to db failed , taskCode: 7250396272006 has already retry 3 times, please check the database
```

```java
public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
    logger.info("Start save taskInstance to database : {}, processInstance id:{}, state: {}",
            taskInstance.getName(),
            taskInstance.getProcessInstanceId(),
            processInstance.getState()); // state is READY_STOP
    // submit to db
    TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); // start submit task instance to db
    if (task == null) { // this branch is taken
        logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ",
                taskInstance.getName(),
                taskInstance.getProcessInstance().getId(),
                processInstance.getState());
        return null; // and null is returned
    }
```

```java
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
    WorkflowExecutionStatus processInstanceState = processInstance.getState();
    if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) {
        // returns null because the workflow is in READY_STOP
        logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}",
                processInstance.getId(),
                processInstanceState,
                taskInstance.getTaskCode());
        return null;
    }
```

```java
public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance,
                                        int commitRetryTimes, long commitInterval) {
    int retryTimes = 1;
    TaskInstance task = null;
    while (retryTimes <= commitRetryTimes) {
        try {
            // submit task to db
            // Only want to use transaction here
            task = submitTask(processInstance, taskInstance);
            if (task != null && task.getId() != null) {
                break;
            }
            // retries again; the max retry count for task submit is 5
            logger.error(
                    "task commit to db failed , taskCode: {} has already retry {} times, please check the database",
                    taskInstance.getTaskCode(),
                    retryTimes);
            Thread.sleep(commitInterval);
        } catch (Exception e) {
            logger.error("task commit to db failed", e);
        } finally {
            retryTimes += 1;
        }
    }
    return task; // returns null after 5 attempts
}
```

```java
@Override
public boolean submitTask() {
    this.taskInstance =
            processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
    if (this.taskInstance == null) {
        return false; // returns false
    }
    this.setTaskExecutionLogger();
    logger.info("switch task submit success");
    return true;
}
```

```java
private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
    try {
        // package task instance before submit
        processService.packageTaskInstance(taskInstance, processInstance);

        ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
        taskProcessor.init(taskInstance, processInstance);

        if (taskInstance.getState().isRunning()
                && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
            notifyProcessHostUpdate(taskInstance);
        }

        boolean submit = taskProcessor.action(TaskAction.SUBMIT);
        if (!submit) {
            logger.error("Submit standby task failed!, taskCode: {}, taskName: {}",
                    taskInstance.getTaskCode(),
                    taskInstance.getName());
            return Optional.empty(); // the returned Optional<TaskInstance> is empty
        }
```

```java
public void submitStandByTask() throws StateEventHandleException {
    ...
    if (!taskInstanceOptional.isPresent()) {
        this.taskFailedSubmit = true;
        // Remove and add to complete map and error map
        if (!removeTaskFromStandbyList(task)) {
            logger.error(
                    "Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}",
                    processInstance.getId(),
                    task.getTaskCode());
        }
        completeTaskMap.put(task.getTaskCode(), task.getId()); // task.getId() is null here, so put() throws NullPointerException
        taskInstanceMap.put(task.getId(), task);
        ...
```

```shell
[ERROR] 2023-07-10 17:52:59.909 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable:[950] - [WorkflowInstance-283162][TaskInstance-0] - Submit standby task failed!, taskCode: 7250411457542, taskName: app_retailers_online_secondary_industry_cube_all_copy_20221013173133944_copy_20221018142359512
[ERROR] 2023-07-10 17:52:59.910 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable:[709] - [WorkflowInstance-0][TaskInstance-0] - Start workflow error
java.lang.NullPointerException: null
	at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
	at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
	at org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable.submitStandByTask(WorkflowExecuteRunnable.java:1864)
	at org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable.submitPostNode(WorkflowExecuteRunnable.java:1370)
	at org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable.call(WorkflowExecuteRunnable.java:703)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### What you expected to happen

The WorkflowExecutionStatus should change to KILL.

### How to reproduce

Stop a workflow.

### Anything else

_No response_

### Version

3.1.x

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
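
For reference, the failing `put` in the stack trace is consistent with `java.util.concurrent.ConcurrentHashMap` rejecting null keys and null values. The sketch below reproduces that behavior in isolation and shows one possible null guard. `NullIdPutSketch`, `Task`, `putUnguarded` and `putGuarded` are hypothetical names made up for illustration, not DolphinScheduler code, and the sketch assumes `completeTaskMap` is a `ConcurrentHashMap<Long, Integer>` and that the task id is still null because the task was never saved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NullIdPutSketch {

    // Hypothetical stand-in for TaskInstance: only the two fields used by the failing put().
    static final class Task {
        final long taskCode;
        final Integer id; // assumed to stay null until the task has been saved to the database

        Task(long taskCode, Integer id) {
            this.taskCode = taskCode;
            this.id = id;
        }
    }

    // Mirrors the shape of completeTaskMap.put(task.getTaskCode(), task.getId()).
    // Returns false if the put threw a NullPointerException.
    static boolean putUnguarded(Map<Long, Integer> completeTaskMap, Task task) {
        try {
            completeTaskMap.put(task.taskCode, task.id);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    // One possible guard (an assumption, not the project's fix): skip the put when no id exists.
    static boolean putGuarded(Map<Long, Integer> completeTaskMap, Task task) {
        if (task.id == null) {
            return false;
        }
        completeTaskMap.put(task.taskCode, task.id);
        return true;
    }

    public static void main(String[] args) {
        Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
        Task unsaved = new Task(7250396272006L, null); // task code taken from the log above
        Task saved = new Task(7196556024840L, 42);     // 42 is an arbitrary illustrative id

        System.out.println("unguarded put with null id succeeded: " + putUnguarded(completeTaskMap, unsaved));
        System.out.println("guarded put with null id succeeded: " + putGuarded(completeTaskMap, unsaved));
        System.out.println("guarded put with real id succeeded: " + putGuarded(completeTaskMap, saved));
        System.out.println("map size: " + completeTaskMap.size());
    }
}
```

A guard like this only avoids the exception. Whether the workflow then reaches the expected KILL state depends on how the master handles a task that was skipped during READY_STOP, which this sketch does not cover.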
