This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit c488a9f82811babe12307b6c37b9433b50a6a0c5
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Jul 19 11:35:56 2022 +0800

    Fix compile error
---
 .../worker/processor/TaskExecuteProcessor.java     | 134 ++++++++++-----------
 .../server/worker/runner/WorkerManagerThread.java  |   1 -
 2 files changed, 61 insertions(+), 74 deletions(-)

diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index b355c6702b..484acf418e 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -42,8 +42,6 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
-import org.apache.commons.lang.SystemUtils;
-
 import java.util.Date;
 
 import org.slf4j.Logger;
@@ -133,84 +131,74 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
             
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
 
             if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) 
{
-                boolean osUserExistFlag;
-                //if Using distributed is true and Currently supported systems 
are linux,Should not let it automatically
-                //create tenants,so TenantAutoCreate has no effect
-                if (workerConfig.isTenantDistributedUser() && 
SystemUtils.IS_OS_LINUX) {
-                    //use the id command to judge in linux
-                    osUserExistFlag = 
OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
-                } else if (CommonUtils.isSudoEnable() && 
workerConfig.isTenantAutoCreate()) {
-                    // if not exists this user, then create
+                if (CommonUtils.isSudoEnable() && 
workerConfig.isTenantAutoCreate()) {
                     
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
-                    osUserExistFlag = 
OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
-                } else {
-                    osUserExistFlag = 
OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
-                }
-                if (Constants.DRY_RUN_FLAG_NO == 
taskExecutionContext.getDryRun()) {
-                    if (CommonUtils.isSudoEnable() && 
workerConfig.isTenantAutoCreate()) {
-                        
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
-                    }
-
-                    // check if the OS user exists
-                    if 
(!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
-                        logger.error("tenantCode: {} does not exist, 
taskInstanceId: {}",
-                                     taskExecutionContext.getTenantCode(),
-                                     taskExecutionContext.getTaskInstanceId());
-                        
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-                        
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
-                        taskExecutionContext.setEndTime(new Date());
-                        
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
-                        return;
-                    }
-
-                    // local execute path
-                    String execLocalPath = 
getExecLocalPath(taskExecutionContext);
-                    logger.info("task instance local execute path : {}", 
execLocalPath);
-                    taskExecutionContext.setExecutePath(execLocalPath);
-
-                    try {
-                        FileUtils.createWorkDirIfAbsent(execLocalPath);
-                    } catch (Throwable ex) {
-                        logger.error("create execLocalPath fail, path: {}, 
taskInstanceId: {}",
-                                     execLocalPath,
-                                     taskExecutionContext.getTaskInstanceId());
-                        logger.error("create executeLocalPath fail", ex);
-                        
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-                        
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
-                        
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
-                        return;
-                    }
                 }
 
-                
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
-                                                     new 
NettyRemoteChannel(channel, command.getOpaque()));
-
-                // delay task process
-                long remainTime = 
DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
-                                                          
taskExecutionContext.getDelayTime() * 60L);
-                if (remainTime > 0) {
-                    logger.info("delay the execution of task instance {}, 
delay time: {} s",
-                                taskExecutionContext.getTaskInstanceId(),
-                                remainTime);
-                    
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
-                    taskExecutionContext.setStartTime(null);
-                    
taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
+                // check if the OS user exists
+                if 
(!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
+                    logger.error("tenantCode: {} does not exist, 
taskInstanceId: {}",
+                                 taskExecutionContext.getTenantCode(),
+                                 taskExecutionContext.getTaskInstanceId());
+                    
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+                    
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+                    taskExecutionContext.setEndTime(new Date());
+                    
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+                    return;
                 }
 
-                // submit task to manager
-                boolean offer = workerManager.offer(new 
TaskExecuteThread(taskExecutionContext,
-                                                                          
taskCallbackService,
-                                                                          
alertClientService,
-                                                                          
taskPluginManager));
-                if (!offer) {
-                    logger.error("submit task to manager error, queue is full, 
queue size is {}, taskInstanceId: {}",
-                                 workerManager.getDelayQueueSize(),
+                // local execute path
+                String execLocalPath = getExecLocalPath(taskExecutionContext);
+                logger.info("task instance local execute path : {}", 
execLocalPath);
+                taskExecutionContext.setExecutePath(execLocalPath);
+
+                try {
+                    FileUtils.createWorkDirIfAbsent(execLocalPath);
+                } catch (Throwable ex) {
+                    logger.error("create execLocalPath fail, path: {}, 
taskInstanceId: {}",
+                                 execLocalPath,
                                  taskExecutionContext.getTaskInstanceId());
+                    logger.error("create executeLocalPath fail", ex);
+                    
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                     
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                     
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+                    return;
                 }
             }
 
+            
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
+                                                 new 
NettyRemoteChannel(channel, command.getOpaque()));
+
+            // delay task process
+            long remainTime = 
DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
+                                                      
taskExecutionContext.getDelayTime() * 60L);
+            if (remainTime > 0) {
+                logger.info("delay the execution of task instance {}, delay 
time: {} s",
+                            taskExecutionContext.getTaskInstanceId(),
+                            remainTime);
+                
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
+                taskExecutionContext.setStartTime(null);
+                
taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
+            }
+
+            // submit task to manager
+            boolean offer = workerManager.offer(new 
TaskExecuteThread(taskExecutionContext,
+                                                                      
taskCallbackService,
+                                                                      
alertClientService,
+                                                                      
taskPluginManager,
+                                                                      
storageOperate));
+            if (!offer) {
+                logger.error("submit task to manager error, queue is full, 
queue size is {}, taskInstanceId: {}",
+                             workerManager.getDelayQueueSize(),
+                             taskExecutionContext.getTaskInstanceId());
+                
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+                
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+            }
+        } finally {
+            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+        }
+    }
+
     /**
      * get execute local path
      *
@@ -219,9 +207,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
      */
     private String getExecLocalPath(TaskExecutionContext taskExecutionContext) 
{
         return 
FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
-                taskExecutionContext.getProcessDefineCode(),
-                taskExecutionContext.getProcessDefineVersion(),
-                taskExecutionContext.getProcessInstanceId(),
-                taskExecutionContext.getTaskInstanceId());
+                                           
taskExecutionContext.getProcessDefineCode(),
+                                           
taskExecutionContext.getProcessDefineVersion(),
+                                           
taskExecutionContext.getProcessInstanceId(),
+                                           
taskExecutionContext.getTaskInstanceId());
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index ae735f4338..9c826e0d5a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -143,7 +143,6 @@ public class WorkerManagerThread implements Runnable {
         while (Stopper.isRunning()) {
             try {
                 taskExecuteThread = workerExecuteQueue.take();
-                taskExecuteThread.setStorageOperate(storageOperate);
                 workerExecService.submit(taskExecuteThread);
             } catch (Exception e) {
                 logger.error("An unexpected interrupt is happened, "

Reply via email to