This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch 3.0.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit c488a9f82811babe12307b6c37b9433b50a6a0c5 Author: Wenjun Ruan <[email protected]> AuthorDate: Tue Jul 19 11:35:56 2022 +0800 Fix compile error --- .../worker/processor/TaskExecuteProcessor.java | 134 ++++++++++----------- .../server/worker/runner/WorkerManagerThread.java | 1 - 2 files changed, 61 insertions(+), 74 deletions(-) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index b355c6702b..484acf418e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -42,8 +42,6 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; -import org.apache.commons.lang.SystemUtils; - import java.util.Date; import org.slf4j.Logger; @@ -133,84 +131,74 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { - boolean osUserExistFlag; - //if Using distributed is true and Currently supported systems are linux,Should not let it automatically - //create tenants,so TenantAutoCreate has no effect - if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) { - //use the id command to judge in linux - osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode()); - } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { - // if not exists this user, then create + if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); - osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode()); - } else { - osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode()); - } - if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { - if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { - OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); - } - - // check if the OS user exists - if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) { - logger.error("tenantCode: {} does not exist, taskInstanceId: {}", - taskExecutionContext.getTenantCode(), - taskExecutionContext.getTaskInstanceId()); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); - taskExecutionContext.setEndTime(new Date()); - taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); - return; - } - - // local execute path - String execLocalPath = getExecLocalPath(taskExecutionContext); - logger.info("task instance local execute path : {}", execLocalPath); - taskExecutionContext.setExecutePath(execLocalPath); - - try { - FileUtils.createWorkDirIfAbsent(execLocalPath); - } catch (Throwable ex) { - logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", - execLocalPath, - taskExecutionContext.getTaskInstanceId()); - logger.error("create executeLocalPath fail", ex); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); - taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); - return; - } } - taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), - new NettyRemoteChannel(channel, command.getOpaque())); - - // delay task process - long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), - taskExecutionContext.getDelayTime() * 60L); - if (remainTime > 0) { - logger.info("delay the execution of task instance {}, delay time: {} s", - taskExecutionContext.getTaskInstanceId(), - remainTime); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); - taskExecutionContext.setStartTime(null); - taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext); + // check if the OS user exists + if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) { + logger.error("tenantCode: {} does not exist, taskInstanceId: {}", + taskExecutionContext.getTenantCode(), + taskExecutionContext.getTaskInstanceId()); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); + taskExecutionContext.setEndTime(new Date()); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + return; } - // submit task to manager - boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, - taskCallbackService, - alertClientService, - taskPluginManager)); - if (!offer) { - logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", - workerManager.getDelayQueueSize(), + // local execute path + String execLocalPath = getExecLocalPath(taskExecutionContext); + logger.info("task instance local execute path : {}", execLocalPath); + taskExecutionContext.setExecutePath(execLocalPath); + + try { + FileUtils.createWorkDirIfAbsent(execLocalPath); + } catch (Throwable ex) { + logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", + execLocalPath, taskExecutionContext.getTaskInstanceId()); + logger.error("create executeLocalPath fail", ex); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + return; } } + taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), + new NettyRemoteChannel(channel, command.getOpaque())); + + // delay task process + long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getDelayTime() * 60L); + if (remainTime > 0) { + logger.info("delay the execution of task instance {}, delay time: {} s", + taskExecutionContext.getTaskInstanceId(), + remainTime); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); + taskExecutionContext.setStartTime(null); + taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext); + } + + // submit task to manager + boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, + taskCallbackService, + alertClientService, + taskPluginManager, + storageOperate)); + if (!offer) { + logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", + workerManager.getDelayQueueSize(), + taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + } + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + } + /** * get execute local path * @@ -219,9 +207,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { */ private String getExecLocalPath(TaskExecutionContext taskExecutionContext) { return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index ae735f4338..9c826e0d5a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -143,7 +143,6 @@ public class WorkerManagerThread implements Runnable { while (Stopper.isRunning()) { try { taskExecuteThread = workerExecuteQueue.take(); - taskExecuteThread.setStorageOperate(storageOperate); workerExecService.submit(taskExecuteThread); } catch (Exception e) { logger.error("An unexpected interrupt is happened, "
