This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch 3.0.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 4fc9bce444b482248a68b0cf2a310c59d848ef95 Author: Wenjun Ruan <[email protected]> AuthorDate: Wed Jul 6 14:53:28 2022 +0800 [Fix-10785] Fix state event handle error will not retry (#10786) * Fix state event handle error will not retry * Use state event handler to deal with the event (cherry picked from commit 67d14fb7b3d941af613a96a0b9c3a2928e5c201c) --- .../master/runner/WorkflowExecuteRunnable.java | 35 ++++++++++------------ 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 7a15a43d2b..16604662ac 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -281,14 +281,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { try { stateEvent = this.stateEvents.peek(); LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), - stateEvent.getTaskInstanceId()); + stateEvent.getTaskInstanceId()); // if state handle success then will remove this state, otherwise will retry this state next time. // The state should always handle success except database error. checkProcessInstance(stateEvent); - StateEventHandler stateEventHandler - = StateEventHandlerManager.getStateEventHandler(stateEvent.getType()) - .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event")); + StateEventHandler stateEventHandler = + StateEventHandlerManager.getStateEventHandler(stateEvent.getType()) + .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event")); if (stateEventHandler.handleStateEvent(this, stateEvent)) { this.stateEvents.remove(stateEvent); } @@ -298,14 +298,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } catch (StateEventHandleException stateEventHandleException) { logger.error("State event handle error, will retry this event: {}", - stateEvent, - stateEventHandleException); + stateEvent, + stateEventHandleException); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } catch (Exception e) { // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue. logger.error("State event handle error, get a unknown exception, will retry this event: {}", - stateEvent, - e); + stateEvent, + e); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); @@ -350,7 +350,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.DISPATCH); this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); + TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); return true; } if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { @@ -454,8 +454,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); if (newTaskInstance == null) { logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", - taskInstance.getTaskCode(), - taskInstance.getId()); + taskInstance.getTaskCode(), + taskInstance.getId()); return; } waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); @@ -787,8 +787,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { errorTaskMap.clear(); if (!isNewProcessInstance()) { - List<TaskInstance> validTaskInstanceList - = processService.findValidTaskListByProcessId(processInstance.getId()); + List<TaskInstance> validTaskInstanceList = + processService.findValidTaskListByProcessId(processInstance.getId()); for (TaskInstance task : validTaskInstanceList) { if (validTaskMap.containsKey(task.getTaskCode())) { int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); @@ -799,7 +799,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { continue; } logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", - task.getTaskCode()); + task.getTaskCode()); } validTaskMap.put(task.getTaskCode(), task.getId()); @@ -1193,10 +1193,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { } private void submitPostNode(String parentNodeCode) throws StateEventHandleException { - Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, - skipTaskNodeMap, - dag, - getCompleteTaskInstanceMap()); + Set<String> submitTaskNodeList = + DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); List<TaskInstance> taskInstances = new ArrayList<>(); for (String taskNode : submitTaskNodeList) { TaskNode taskNodeObject = dag.getNode(taskNode); @@ -1859,7 +1857,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { return waitToRetryTaskInstanceMap; } - private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) { // get start params from command param Map<String, String> startParamMap = new HashMap<>();
