This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch 3.0.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 4ceb42087310575ffedd7b09fbf35a08614bb3b7 Author: Wenjun Ruan <[email protected]> AuthorDate: Thu Jun 16 21:46:18 2022 +0800 Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479) * Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (cherry picked from commit ad2646ff1f7baa5d76d29023ced2c28a89b52f6b) --- .../dolphinscheduler/common/utils/FileUtils.java | 29 +++- .../dao/entity/ProcessInstance.java | 12 +- .../master/consumer/TaskPriorityQueueConsumer.java | 8 +- .../server/master/processor/queue/TaskEvent.java | 105 +----------- .../processor/queue/TaskExecuteThreadPool.java | 22 +-- .../master/runner/WorkflowExecuteThread.java | 176 +++++++++++++-------- .../master/runner/WorkflowExecuteThreadPool.java | 2 +- .../master/runner/task/CommonTaskProcessor.java | 4 +- .../master/runner/task/TaskProcessorFactory.java | 27 +++- .../runner/task/TaskProcessorFactoryTest.java | 4 +- .../remote/utils/ChannelUtils.java | 12 +- .../apache/dolphinscheduler/remote/utils/Host.java | 2 + .../service/process/ProcessServiceImpl.java | 5 +- .../queue/PeerTaskInstancePriorityQueue.java | 35 ++-- .../queue/PeerTaskInstancePriorityQueueTest.java | 9 +- .../src/main/resources/application.yaml | 4 +- .../plugin/task/shell/ShellTask.java | 7 +- .../server/worker/config/WorkerConfig.java | 4 +- .../worker/processor/TaskCallbackService.java | 2 + .../server/worker/runner/TaskExecuteThread.java | 24 ++- 20 files changed, 247 insertions(+), 246 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java index bdcf62f76a..23e4b74b75 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java @@ -17,14 +17,25 @@ package org.apache.dolphinscheduler.common.utils; +import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH; +import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE; +import static org.apache.dolphinscheduler.common.Constants.UTF_8; +import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS; + import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; -import static org.apache.dolphinscheduler.common.Constants.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * file utils @@ -112,7 +123,15 @@ public class FileUtils { File execLocalPathFile = new File(execLocalPath); if (execLocalPathFile.exists()) { - org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile); + try { + org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile); + } catch (Exception ex) { + if (ex instanceof NoSuchFileException || ex.getCause() instanceof NoSuchFileException) { + // this file is already be deleted. + } else { + throw ex; + } + } } //create work dir diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 8f639efc07..fb3ae13858 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -266,13 +266,11 @@ public class ProcessInstance { */ public ProcessInstance(ProcessDefinition processDefinition) { this.processDefinition = processDefinition; - this.name = processDefinition.getName() - + "-" - + - processDefinition.getVersion() - + "-" - + - DateUtils.getCurrentTimeStamp(); + // todo: the name is not unique + this.name = String.join("-", + processDefinition.getName(), + String.valueOf(processDefinition.getVersion()), + DateUtils.getCurrentTimeStamp()); } public String getVarPool() { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index f692685009..f9c19c48fd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -180,16 +180,18 @@ public class TaskPriorityQueueConsumer extends Thread { return true; } } - result = dispatcher.dispatch(executionContext); if (result) { + logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId()); addDispatchEvent(context, executionContext); + } else { + logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId()); } } catch (RuntimeException e) { - logger.error("dispatch error: ", e); + logger.error("Master dispatch task to worker error: ", e); } catch (ExecuteException e) { - logger.error("dispatch error: {}", e.getMessage()); + logger.error("Master dispatch task to worker error: {}", e); } return result; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index 865eee53a5..86cce5170e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -26,10 +26,12 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import java.util.Date; import io.netty.channel.Channel; +import lombok.Data; /** * task event */ +@Data public class TaskEvent { /** @@ -135,107 +137,4 @@ public class TaskEvent { return event; } - public String getVarPool() { - return varPool; - } - - public void setVarPool(String varPool) { - this.varPool = varPool; - } - - public int getTaskInstanceId() { - return taskInstanceId; - } - - public void setTaskInstanceId(int taskInstanceId) { - this.taskInstanceId = taskInstanceId; - } - - public String getWorkerAddress() { - return workerAddress; - } - - public void setWorkerAddress(String workerAddress) { - this.workerAddress = workerAddress; - } - - public ExecutionStatus getState() { - return state; - } - - public void setState(ExecutionStatus state) { - this.state = state; - } - - public Date getStartTime() { - return startTime; - } - - public void setStartTime(Date startTime) { - this.startTime = startTime; - } - - public Date getEndTime() { - return endTime; - } - - public void setEndTime(Date endTime) { - this.endTime = endTime; - } - - public String getExecutePath() { - return executePath; - } - - public void setExecutePath(String executePath) { - this.executePath = executePath; - } - - public String getLogPath() { - return logPath; - } - - public void setLogPath(String logPath) { - this.logPath = logPath; - } - - public int getProcessId() { - return processId; - } - - public void setProcessId(int processId) { - this.processId = processId; - } - - public String getAppIds() { - return appIds; - } - - public void setAppIds(String appIds) { - this.appIds = appIds; - } - - public Event getEvent() { - return event; - } - - public void setEvent(Event event) { - this.event = event; - } - - public Channel getChannel() { - return channel; - } - - public void setChannel(Channel channel) { - this.channel = channel; - } - - public int getProcessInstanceId() { - return processInstanceId; - } - - public void setProcessInstanceId(int processInstanceId) { - this.processInstanceId = processInstanceId; - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java index 75c0272519..c9c2868d36 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java @@ -79,7 +79,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { public void submitTaskEvent(TaskEvent taskEvent) { if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) { - logger.warn("workflowExecuteThread is null, event: {}", taskEvent); + logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent); return; } if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) { @@ -114,20 +114,24 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { - logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex); - if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) { - taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId()); - logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId()); + Integer processInstanceId = taskExecuteThread.getProcessInstanceId(); + logger.error("persist event failed processInstanceId: {}", processInstanceId, ex); + if (!processInstanceExecCacheManager.contains(processInstanceId)) { + taskExecuteThreadMap.remove(processInstanceId); + logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}", + processInstanceId); } multiThreadFilterMap.remove(taskExecuteThread.getKey()); } @Override public void onSuccess(Object result) { - logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId()); - if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) { - taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId()); - logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId()); + Integer processInstanceId = taskExecuteThread.getProcessInstanceId(); + logger.info("persist events succeeded, processInstanceId: {}", processInstanceId); + if (!processInstanceExecCacheManager.contains(processInstanceId)) { + taskExecuteThreadMap.remove(processInstanceId); + logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}", + processInstanceId); } multiThreadFilterMap.remove(taskExecuteThread.getKey()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 993e1df14b..fede5abad2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -75,8 +75,8 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.math.NumberUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import java.util.ArrayList; import java.util.Arrays; @@ -87,6 +87,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -238,12 +239,12 @@ public class WorkflowExecuteThread { * @param masterConfig masterConfig * @param stateWheelExecuteThread stateWheelExecuteThread */ - public WorkflowExecuteThread(ProcessInstance processInstance - , ProcessService processService - , NettyExecutorManager nettyExecutorManager - , ProcessAlertManager processAlertManager - , MasterConfig masterConfig - , StateWheelExecuteThread stateWheelExecuteThread) { + public WorkflowExecuteThread(ProcessInstance processInstance, + ProcessService processService, + NettyExecutorManager nettyExecutorManager, + ProcessAlertManager processAlertManager, + MasterConfig masterConfig, + StateWheelExecuteThread stateWheelExecuteThread) { this.processService = processService; this.processInstance = processInstance; this.masterConfig = masterConfig; @@ -279,15 +280,14 @@ public class WorkflowExecuteThread { } public String getKey() { - if (StringUtils.isNotEmpty(key) - || this.processDefinition == null) { + if (StringUtils.isNotEmpty(key) || this.processDefinition == null) { return key; } key = String.format("%d_%d_%d", - this.processDefinition.getCode(), - this.processDefinition.getVersion(), - this.processInstance.getId()); + this.processDefinition.getCode(), + this.processDefinition.getVersion(), + this.processInstance.getId()); return key; } @@ -436,10 +436,10 @@ public class WorkflowExecuteThread { private void taskFinished(TaskInstance taskInstance) { logger.info("work flow {} task id:{} code:{} state:{} ", - processInstance.getId(), - taskInstance.getId(), - taskInstance.getTaskCode(), - taskInstance.getState()); + processInstance.getId(), + taskInstance.getId(), + taskInstance.getTaskCode(), + taskInstance.getState()); activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); @@ -460,7 +460,7 @@ public class WorkflowExecuteThread { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); // There are child nodes and the failure policy is: CONTINUE if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag) - && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { + && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { submitPostNode(Long.toString(taskInstance.getTaskCode())); } else { errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); @@ -492,8 +492,9 @@ public class WorkflowExecuteThread { this.stateEvents.add(nextEvent); } else { ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); - this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), - org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); + this.processService.sendStartTask2Master(processInstance, + nextTaskInstance.getId(), + org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); } } } @@ -515,7 +516,8 @@ public class WorkflowExecuteThread { } waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); if (!taskInstance.retryTaskIntervalOverTime()) { - logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", + logger.info( + "failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", processInstance.getId(), newTaskInstance.getTaskCode(), newTaskInstance.getState(), @@ -552,7 +554,7 @@ public class WorkflowExecuteThread { logger.info("process instance update: {}", processInstanceId); processInstance = processService.findProcessInstanceById(processInstanceId); processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); } @@ -580,9 +582,7 @@ public class WorkflowExecuteThread { */ public boolean checkProcessInstance(StateEvent stateEvent) { if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) { - logger.error("mismatch process instance id: {}, state event:{}", - this.processInstance.getId(), - stateEvent); + logger.error("mismatch process instance id: {}, state event:{}", this.processInstance.getId(), stateEvent); return false; } return true; @@ -742,9 +742,9 @@ public class WorkflowExecuteThread { return true; } logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", - processInstance.getId(), - processInstance.getScheduleTime(), - complementListDate.toString()); + processInstance.getId(), + processInstance.getScheduleTime(), + complementListDate.toString()); scheduleDate = complementListDate.get(index + 1); } //the next process complement @@ -783,8 +783,7 @@ public class WorkflowExecuteThread { } private boolean needComplementProcess() { - if (processInstance.isComplementData() - && Flag.NO == processInstance.getIsSubProcess()) { + if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) { return true; } return false; @@ -863,7 +862,7 @@ public class WorkflowExecuteThread { return; } processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam()); @@ -883,7 +882,9 @@ public class WorkflowExecuteThread { List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList); List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); ProcessDag processDag = generateFlowDag(taskNodeList, - startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType()); + startNodeNameList, + recoveryNodeCodeList, + processInstance.getTaskDependType()); if (processDag == null) { logger.error("processDag is null"); return; @@ -955,14 +956,16 @@ public class WorkflowExecuteThread { if (complementListDate.size() == 0 && needComplementProcess()) { complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); logger.info(" process definition code:{} complement data: {}", - processInstance.getProcessDefinitionCode(), complementListDate.toString()); + processInstance.getProcessDefinitionCode(), + complementListDate.toString()); if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(complementListDate.get(0)); - processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE))); + processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, + processInstance.getScheduleTime(), + cmdParam.get(Constants.SCHEDULE_TIMEZONE))); processService.updateProcessInstance(processInstance); } } @@ -976,7 +979,7 @@ public class WorkflowExecuteThread { * @param taskInstance task instance * @return TaskInstance */ - private TaskInstance submitTaskExec(TaskInstance taskInstance) { + private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) { try { // package task instance before submit processService.packageTaskInstance(taskInstance, processInstance); @@ -984,17 +987,17 @@ public class WorkflowExecuteThread { ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); taskProcessor.init(taskInstance, processInstance); - if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION - && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { + if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION && taskProcessor.getType() + .equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); } boolean submit = taskProcessor.action(TaskAction.SUBMIT); if (!submit) { logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", - processInstance.getId(), processInstance.getName(), - taskInstance.getId(), taskInstance.getName()); - return null; + processInstance.getId(), processInstance.getName(), + taskInstance.getId(), taskInstance.getName()); + return Optional.empty(); } // in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid @@ -1033,10 +1036,10 @@ public class WorkflowExecuteThread { taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE); this.stateEvents.add(taskStateChangeEvent); } - return taskInstance; + return Optional.of(taskInstance); } catch (Exception e) { logger.error("submit standby task error", e); - return null; + return Optional.empty(); } } @@ -1333,6 +1336,7 @@ public class WorkflowExecuteThread { for (TaskInstance task : taskInstances) { if (readyToSubmitTaskQueue.contains(task)) { + logger.warn("Task is already at submit queue, taskInstanceId: {}", task.getId()); continue; } @@ -1659,11 +1663,12 @@ public class WorkflowExecuteThread { private void updateProcessInstanceState() { ExecutionStatus state = getProcessInstanceState(processInstance); if (processInstance.getState() != state) { - logger.info( - "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", - processInstance.getId(), processInstance.getName(), - processInstance.getState(), state, - processInstance.getCommandType()); + logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", + processInstance.getId(), + processInstance.getName(), + processInstance.getState(), + state, + processInstance.getCommandType()); processInstance.setState(state); if (state.typeIsFinished()) { @@ -1687,11 +1692,12 @@ public class WorkflowExecuteThread { private void updateProcessInstanceState(StateEvent stateEvent) { ExecutionStatus state = stateEvent.getExecutionStatus(); if (processInstance.getState() != state) { - logger.info( - "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", - processInstance.getId(), processInstance.getName(), - processInstance.getState(), state, - processInstance.getCommandType()); + logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", + processInstance.getId(), + processInstance.getName(), + processInstance.getState(), + state, + processInstance.getCommandType()); processInstance.setState(state); if (state.typeIsFinished()) { @@ -1723,7 +1729,9 @@ public class WorkflowExecuteThread { return; } logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", - taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode()); + taskInstance.getName(), + taskInstance.getId(), + taskInstance.getTaskCode()); readyToSubmitTaskQueue.put(taskInstance); } catch (Exception e) { logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e); @@ -1736,15 +1744,14 @@ public class WorkflowExecuteThread { * @param taskInstance task instance */ private void removeTaskFromStandbyList(TaskInstance taskInstance) { - logger.info("remove task from stand by list, id: {} name:{}", - taskInstance.getId(), - taskInstance.getName()); + logger.info("remove task from stand by list, id: {} name:{}", taskInstance.getId(), taskInstance.getName()); try { readyToSubmitTaskQueue.remove(taskInstance); } catch (Exception e) { logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}", - taskInstance.getId(), - taskInstance.getName(), e); + taskInstance.getId(), + taskInstance.getName(), + e); } } @@ -1766,8 +1773,9 @@ public class WorkflowExecuteThread { * close the on going tasks */ private void killAllTasks() { - logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(), - activeTaskProcessorMaps.size()); + logger.info("kill called on process instance id: {}, num: {}", + processInstance.getId(), + activeTaskProcessorMaps.size()); if (readyToSubmitTaskQueue.size() > 0) { readyToSubmitTaskQueue.clear(); @@ -1831,14 +1839,16 @@ public class WorkflowExecuteThread { } DependResult dependResult = getDependResultForTask(task); if (DependResult.SUCCESS == dependResult) { - TaskInstance taskInstance = submitTaskExec(task); - if (taskInstance == null) { + Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task); + if (!taskInstanceOptional.isPresent()) { this.taskFailedSubmit = true; // Remove and add to complete map and error map removeTaskFromStandbyList(task); completeTaskMap.put(task.getTaskCode(), task.getId()); errorTaskMap.put(task.getTaskCode(), task.getId()); - logger.error("process {}, task {}, code:{} submit task failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode()); + logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", + task.getProcessInstanceId(), + task.getId()); } else { removeTaskFromStandbyList(task); } @@ -1846,11 +1856,15 @@ public class WorkflowExecuteThread { // if the dependency fails, the current node is not submitted and the state changes to failure. dependFailedTaskMap.put(task.getTaskCode(), task.getId()); removeTaskFromStandbyList(task); - logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult); + logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", + task.getId(), + dependResult); } else if (DependResult.NON_EXEC == dependResult) { // for some reasons(depend task pause/stop) this task would not be submit removeTaskFromStandbyList(task); - logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult); + logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", + task.getId(), + dependResult); } } } catch (Exception e) { @@ -2009,4 +2023,30 @@ public class WorkflowExecuteThread { } } } + } + + private void measureTaskState(StateEvent taskStateEvent) { + if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) { + // the event is broken + logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent); + return; + } + if (taskStateEvent.getExecutionStatus().typeIsFinished()) { + TaskMetrics.incTaskFinish(); + } + switch (taskStateEvent.getExecutionStatus()) { + case STOP: + TaskMetrics.incTaskStop(); + break; + case SUCCESS: + TaskMetrics.incTaskSuccess(); + break; + case FAILURE: + TaskMetrics.incTaskFailure(); + break; + default: + break; + } + } +} \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index cd337b4c95..556f36a1b7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -100,7 +100,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { /** * execute workflow */ - public void executeEvent(WorkflowExecuteThread workflowExecuteThread) { + public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) { if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) { return; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index ffeb89a0d2..81e3070089 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -110,7 +110,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName()); return true; } - logger.info("task ready to submit: {}", taskInstance); + logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId()); TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(), processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), @@ -125,7 +125,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { taskPriority.setTaskExecutionContext(taskExecutionContext); taskUpdateQueue.put(taskPriority); - logger.info(String.format("master submit success, task : %s", taskInstance.getName())); + logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId()); return true; } catch (Exception e) { logger.error("submit task error", e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java index 0129338649..d5bd131a10 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java @@ -21,36 +21,47 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; import org.apache.commons.lang3.StringUtils; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.Map; -import java.util.Objects; import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * the factory to create task processor */ public class TaskProcessorFactory { - public static final Map<String, ITaskProcessor> PROCESS_MAP = new ConcurrentHashMap<>(); + private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class); + + public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>(); private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE; static { for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) { - PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor); + try { + PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor()); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("The task processor should has a no args constructor"); + } } } - public static ITaskProcessor getTaskProcessor(String type) throws InstantiationException, IllegalAccessException { + public static ITaskProcessor getTaskProcessor(String type) throws InvocationTargetException, InstantiationException, IllegalAccessException { if (StringUtils.isEmpty(type)) { type = DEFAULT_PROCESSOR; } - ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type); - if (Objects.isNull(iTaskProcessor)) { - iTaskProcessor = PROCESS_MAP.get(DEFAULT_PROCESSOR); + Constructor<ITaskProcessor> iTaskProcessorConstructor = PROCESS_MAP.get(type); + if (iTaskProcessorConstructor == null) { + logger.warn("ITaskProcessor could not found for taskType: {}", type); + iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR); } - return iTaskProcessor.getClass().newInstance(); + return iTaskProcessorConstructor.newInstance(); } /** diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java index d0371809cc..b974a40b31 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner.task; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import java.lang.reflect.InvocationTargetException; + import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -27,7 +29,7 @@ import org.junit.Test; public class TaskProcessorFactoryTest { @Test - public void testFactory() throws InstantiationException, IllegalAccessException { + public void testFactory() throws InvocationTargetException, InstantiationException, IllegalAccessException { TaskInstance taskInstance = new TaskInstance(); taskInstance.setTaskType("shell"); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java index 239a3993c0..b4177ec25d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java @@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import java.net.InetSocketAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.netty.channel.Channel; /** @@ -28,6 +31,8 @@ import io.netty.channel.Channel; */ public class ChannelUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(ChannelUtils.class); + private ChannelUtils() { throw new IllegalStateException(ChannelUtils.class.getName()); } @@ -49,7 +54,7 @@ public class ChannelUtils { * @return remote address */ public static String getRemoteAddress(Channel channel) { - return NetUtils.getHost(((InetSocketAddress) channel.remoteAddress()).getAddress()); + return toAddress(channel).getAddress(); } /** @@ -60,6 +65,11 @@ public class ChannelUtils { */ public static Host toAddress(Channel channel) { InetSocketAddress socketAddress = ((InetSocketAddress) channel.remoteAddress()); + if (socketAddress == null) { + // the remote channel already closed + LOGGER.warn("The channel is already closed"); + return Host.EMPTY; + } return new Host(NetUtils.getHost(socketAddress.getAddress()), socketAddress.getPort()); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java index 2163e9c7d8..dc8e1f0d36 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java @@ -27,6 +27,8 @@ import java.util.Objects; */ public class Host implements Serializable { + public static final Host EMPTY = new Host(); + /** * address */ diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 48bb9c624d..eba7ced00a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -748,7 +748,8 @@ public class ProcessServiceImpl implements ProcessService { processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); processInstance.setRecovery(Flag.NO); processInstance.setStartTime(new Date()); - processInstance.setRestartTime(processInstance.getStartTime()); + // the new process instance restart time is null. + processInstance.setRestartTime(null); processInstance.setRunTimes(1); processInstance.setMaxTryTimes(0); processInstance.setCommandParam(command.getCommandParam()); @@ -1266,7 +1267,7 @@ public class ProcessServiceImpl implements ProcessService { @Override @Transactional(rollbackFor = Exception.class) public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) { - logger.info("start submit task : {}, instance id:{}, state: {}", + logger.info("start submit task : {}, processInstance id:{}, state: {}", taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); //submit to db TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 231fd2a20f..2e939ee332 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; +import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.PriorityQueue; +import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; /** * Task instances priority queue implementation @@ -40,12 +42,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst /** * queue */ - private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); - - /** - * Lock used for all public operations - */ - private final ReentrantLock lock = new ReentrantLock(true); + private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + private final Set<Integer> taskInstanceIdSet = Collections.synchronizedSet(new HashSet<>()); /** * put task instance to priority queue @@ -56,6 +54,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst @Override public void put(TaskInstance taskInstance) throws TaskPriorityQueueException { queue.add(taskInstance); + taskInstanceIdSet.add(taskInstance.getId()); } /** @@ -66,7 +65,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst */ @Override public TaskInstance take() throws TaskPriorityQueueException { - return queue.poll(); + TaskInstance taskInstance = queue.poll(); + if (taskInstance != null) { + taskInstanceIdSet.remove(taskInstance.getId()); + } + return taskInstance; } /** @@ -111,6 +114,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst */ public void clear() { queue.clear(); + taskInstanceIdSet.clear(); } /** @@ -120,20 +124,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst * @return true is contains */ public boolean contains(TaskInstance taskInstance) { - return this.contains(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); + return this.contains(taskInstance.getId()); } - public boolean contains(long taskCode, int taskVersion) { - Iterator<TaskInstance> iterator = this.queue.iterator(); - while (iterator.hasNext()) { - TaskInstance taskInstance = iterator.next(); - if (taskCode == taskInstance.getTaskCode() - && taskVersion == taskInstance.getTaskDefinitionVersion()) { - return true; - } - } - return false; - + public boolean contains(int taskInstanceId) { + return taskInstanceIdSet.contains(taskInstanceId); } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 8da3a6c194..67e40d1189 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -46,14 +46,11 @@ public class PeerTaskInstancePriorityQueueTest { Assert.assertTrue(queue.size() < peekBeforeLength); } - @Test + + @Test(expected = TaskPriorityQueueException.class) public void poll() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); - try { - queue.poll(1000, TimeUnit.MILLISECONDS); - } catch (TaskPriorityQueueException e) { - e.printStackTrace(); - } + queue.poll(1000, TimeUnit.MILLISECONDS); } @Test diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index c50883da62..7be661de4e 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -109,7 +109,7 @@ master: # master prepare execute thread number to limit handle commands in parallel pre-exec-threads: 10 # master execute thread number to limit process instances in parallel - exec-threads: 100 + exec-threads: 10 # master dispatch task number per batch dispatch-task-number: 3 # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight @@ -134,7 +134,7 @@ worker: # worker listener port listen-port: 1234 # worker execute thread number to limit task instances in parallel - exec-threads: 100 + exec-threads: 10 # worker heartbeat interval, the unit is second heartbeat-interval: 10 # worker host weight to dispatch tasks, default value 100 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java index 737b876b74..9d084d0ec8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.commons.collections4.MapUtils; import java.io.File; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -147,7 +148,11 @@ public class ShellTask extends AbstractTaskExecutor { if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } - Files.createFile(path, attr); + try { + Files.createFile(path, attr); + } catch (FileAlreadyExistsException ex) { + // this is expected + } } Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 5192e27d61..6edff09f97 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -21,9 +21,9 @@ import java.util.Set; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.stereotype.Component; +import org.springframework.context.annotation.Configuration; -@Component +@Configuration @EnableConfigurationProperties @ConfigurationProperties("worker") public class WorkerConfig { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 3641de8453..2a92222b0a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -162,6 +162,8 @@ public class TaskCallbackService { } } }); + } else { + logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index cc15eb62f4..e6ff2b5653 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; -import org.apache.commons.lang3.tuple.Pair; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException; import org.apache.dolphinscheduler.common.storage.StorageOperate; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -40,10 +43,17 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import java.io.File; import java.io.IOException; -import java.util.*; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -233,7 +243,11 @@ public class TaskExecuteThread implements Runnable, Delayed { org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath)); logger.info("exec local path: {} cleared.", execLocalPath); } catch (IOException e) { - logger.error("delete exec dir failed : {}", e.getMessage(), e); + if (e instanceof NoSuchFileException) { + // this is expected + } else { + logger.error("Delete exec dir failed.", e); + } } } } @@ -264,7 +278,7 @@ public class TaskExecuteThread implements Runnable, Delayed { task.cancelApplication(true); ProcessUtils.killYarnJob(taskExecutionContext); } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.error("Kill task failed", e); } } }
