This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ea6503b  [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue (#6406)
ea6503b is described below

commit ea6503b99742ab72d99527edb3abdad398bee83e
Author: caishunfeng <[email protected]>
AuthorDate: Wed Sep 29 14:04:35 2021 +0800

    [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue (#6406)
    
    * [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue
    
    * [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue
    
    * [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue
    
    * checkstyle
    
    Co-authored-by: caishunfeng <[email protected]>
---
 .../builder/TaskExecutionContextBuilder.java       |   4 +-
 .../master/cache/TaskInstanceCacheManager.java     |   2 +-
 .../cache/impl/TaskInstanceCacheManagerImpl.java   |   2 +-
 .../master/consumer/TaskPriorityQueueConsumer.java | 259 +--------------------
 .../master/registry/MasterRegistryClient.java      |   3 +-
 .../master/runner/task/BaseTaskProcessor.java      | 259 +++++++++++++++++++++
 .../master/runner/task/CommonTaskProcessor.java    |  10 +-
 .../master/runner/task/ConditionTaskProcessor.java |   2 -
 .../master/runner/task/DependentTaskProcessor.java |   1 -
 .../master/runner/task/SubTaskProcessor.java       |   4 -
 .../master/runner/task/SwitchTaskProcessor.java    |   2 -
 .../dolphinscheduler/server/utils/LogUtils.java    |   2 +-
 .../server/utils/ProcessUtils.java                 |   2 +-
 .../worker/processor/TaskExecuteProcessor.java     |   2 +-
 .../server/worker/processor/TaskKillProcessor.java |   2 +-
 .../server/worker/runner/TaskExecuteThread.java    |   2 +-
 .../server/worker/runner/WorkerManagerThread.java  |   2 +-
 .../impl/TaskInstanceCacheManagerImplTest.java     |   2 +-
 .../consumer/TaskPriorityQueueConsumerTest.java    | 114 ---------
 .../executor/NettyExecutorManagerTest.java         |   2 +-
 .../runner/task/CommonTaskProcessorTest.java       | 166 +++++++++++++
 .../server/utils/ExecutionContextTestUtils.java    |   3 +-
 .../server/utils/LogUtilsTest.java                 |   2 +-
 .../worker/processor/TaskExecuteProcessorTest.java |   2 +-
 .../service/process/ProcessService.java            |  21 +-
 .../service/queue/TaskPriority.java                |  15 ++
 .../entity/DependenceTaskExecutionContext.java     |  12 +-
 .../queue}/entity/TaskExecutionContext.java        |   6 +-
 28 files changed, 490 insertions(+), 415 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 667bdef..afc9b9a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -25,9 +25,9 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import 
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import 
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
index 031d8b2..1388c5b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.cache;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 /**
  *  task instance state manager
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index 43632ae..dd2d6eb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -23,9 +23,9 @@ import 
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import java.util.Map;
 import java.util.Map.Entry;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index b06ad37..3ccb827 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -18,32 +18,8 @@
 package org.apache.dolphinscheduler.server.master.consumer;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.ResourceType;
-import org.apache.dolphinscheduler.common.enums.SqoopJobType;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.enums.UdfType;
-import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
-import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
-import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
-import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
-import 
org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
-import 
org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.EnumUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.entity.UdfFunc;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import 
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -52,22 +28,12 @@ import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
-import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
-import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
-import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
-
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import javax.annotation.PostConstruct;
 
@@ -161,11 +127,10 @@ public class TaskPriorityQueueConsumer extends Thread {
     protected boolean dispatch(TaskPriority taskPriority) {
         boolean result = false;
         try {
-            int taskInstanceId = taskPriority.getTaskId();
-            TaskExecutionContext context = 
getTaskExecutionContext(taskInstanceId);
+            TaskExecutionContext context = 
taskPriority.getTaskExecutionContext();
             ExecutionContext executionContext = new 
ExecutionContext(context.toCommand(), ExecutorType.WORKER, 
context.getWorkerGroup());
 
-            if (taskInstanceIsFinalState(taskInstanceId)) {
+            if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
                 // when task finish, ignore this task, there is no need to 
dispatch anymore
                 return true;
             } else {
@@ -188,222 +153,4 @@ public class TaskPriorityQueueConsumer extends Thread {
         TaskInstance taskInstance = 
processService.findTaskInstanceById(taskInstanceId);
         return taskInstance.getState().typeIsFinished();
     }
-
-    /**
-     * get TaskExecutionContext
-     *
-     * @param taskInstanceId taskInstanceId
-     * @return TaskExecutionContext
-     */
-    protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId) 
{
-        TaskInstance taskInstance = 
processService.getTaskInstanceDetailByTaskId(taskInstanceId);
-
-        int userId = taskInstance.getProcessDefine() == null ? 0 : 
taskInstance.getProcessDefine().getUserId();
-        Tenant tenant = 
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
 userId);
-
-        // verify tenant is null
-        if (verifyTenantIsNull(tenant, taskInstance)) {
-            processService.changeTaskState(taskInstance, 
ExecutionStatus.FAILURE,
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    null,
-                    null,
-                    taskInstance.getId());
-            return null;
-        }
-        // set queue for process instance, user-specified queue takes 
precedence over tenant queue
-        String userQueue = 
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
-        
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? 
tenant.getQueue() : userQueue);
-        
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
-        taskInstance.setResources(getResourceFullNames(taskInstance));
-
-        SQLTaskExecutionContext sqlTaskExecutionContext = new 
SQLTaskExecutionContext();
-        DataxTaskExecutionContext dataxTaskExecutionContext = new 
DataxTaskExecutionContext();
-        ProcedureTaskExecutionContext procedureTaskExecutionContext = new 
ProcedureTaskExecutionContext();
-        SqoopTaskExecutionContext sqoopTaskExecutionContext = new 
SqoopTaskExecutionContext();
-
-        // SQL task
-        if 
(TaskType.SQL.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
-            setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
-        }
-
-        // DATAX task
-        if 
(TaskType.DATAX.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
-            setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
-        }
-
-        // procedure task
-        if 
(TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
-            setProcedureTaskRelation(procedureTaskExecutionContext, 
taskInstance);
-        }
-
-        if 
(TaskType.SQOOP.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
-            setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
-        }
-
-        return TaskExecutionContextBuilder.get()
-                .buildTaskInstanceRelatedInfo(taskInstance)
-                .buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
-                
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
-                
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
-                .buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
-                .buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
-                .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
-                .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
-                .create();
-    }
-
-    /**
-     * set procedure task relation
-     *
-     * @param procedureTaskExecutionContext procedureTaskExecutionContext
-     * @param taskInstance taskInstance
-     */
-    private void setProcedureTaskRelation(ProcedureTaskExecutionContext 
procedureTaskExecutionContext, TaskInstance taskInstance) {
-        ProcedureParameters procedureParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
-        int datasourceId = procedureParameters.getDatasource();
-        DataSource datasource = 
processService.findDataSourceById(datasourceId);
-        
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
-    }
-
-    /**
-     * set datax task relation
-     *
-     * @param dataxTaskExecutionContext dataxTaskExecutionContext
-     * @param taskInstance taskInstance
-     */
-    protected void setDataxTaskRelation(DataxTaskExecutionContext 
dataxTaskExecutionContext, TaskInstance taskInstance) {
-        DataxParameters dataxParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
-
-        DataSource dbSource = 
processService.findDataSourceById(dataxParameters.getDataSource());
-        DataSource dbTarget = 
processService.findDataSourceById(dataxParameters.getDataTarget());
-
-        if (dbSource != null) {
-            
dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
-            
dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode());
-            
dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams());
-        }
-
-        if (dbTarget != null) {
-            
dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
-            
dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode());
-            
dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams());
-        }
-    }
-
-    /**
-     * set sqoop task relation
-     *
-     * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
-     * @param taskInstance taskInstance
-     */
-    private void setSqoopTaskRelation(SqoopTaskExecutionContext 
sqoopTaskExecutionContext, TaskInstance taskInstance) {
-        SqoopParameters sqoopParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
-
-        // sqoop job type is template set task relation
-        if 
(sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
-            SourceMysqlParameter sourceMysqlParameter = 
JSONUtils.parseObject(sqoopParameters.getSourceParams(), 
SourceMysqlParameter.class);
-            TargetMysqlParameter targetMysqlParameter = 
JSONUtils.parseObject(sqoopParameters.getTargetParams(), 
TargetMysqlParameter.class);
-
-            DataSource dataSource = 
processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
-            DataSource dataTarget = 
processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
-
-            if (dataSource != null) {
-                sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
-                
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-                
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
-            }
-
-            if (dataTarget != null) {
-                sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
-                
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-                
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
-            }
-        }
-    }
-
-    /**
-     * set SQL task relation
-     *
-     * @param sqlTaskExecutionContext sqlTaskExecutionContext
-     * @param taskInstance taskInstance
-     */
-    private void setSQLTaskRelation(SQLTaskExecutionContext 
sqlTaskExecutionContext, TaskInstance taskInstance) {
-        SqlParameters sqlParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
-        int datasourceId = sqlParameters.getDatasource();
-        DataSource datasource = 
processService.findDataSourceById(datasourceId);
-        
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
-
-        // whether udf type
-        boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, 
sqlParameters.getType())
-                && !StringUtils.isEmpty(sqlParameters.getUdfs());
-
-        if (udfTypeFlag) {
-            String[] udfFunIds = sqlParameters.getUdfs().split(",");
-            int[] udfFunIdsArray = new int[udfFunIds.length];
-            for (int i = 0; i < udfFunIds.length; i++) {
-                udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]);
-            }
-
-            List<UdfFunc> udfFuncList = 
processService.queryUdfFunListByIds(udfFunIdsArray);
-            UdfFuncRequest udfFuncRequest;
-            Map<UdfFuncRequest, String> udfFuncRequestMap = new HashMap<>();
-            for (UdfFunc udfFunc : udfFuncList) {
-                udfFuncRequest = 
JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class);
-                String tenantCode = 
processService.queryTenantCodeByResName(udfFunc.getResourceName(), 
ResourceType.UDF);
-                udfFuncRequestMap.put(udfFuncRequest, tenantCode);
-            }
-            sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap);
-        }
-    }
-
-    /**
-     * whehter tenant is null
-     *
-     * @param tenant tenant
-     * @param taskInstance taskInstance
-     * @return result
-     */
-    protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance 
taskInstance) {
-        if (tenant == null) {
-            logger.error("tenant not exists,process instance id : {},task 
instance id : {}",
-                    taskInstance.getProcessInstance().getId(),
-                    taskInstance.getId());
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * get resource map key is full name and value is tenantCode
-     */
-    protected Map<String, String> getResourceFullNames(TaskInstance 
taskInstance) {
-        Map<String, String> resourcesMap = new HashMap<>();
-        AbstractParameters baseParam = 
TaskParametersUtils.getParameters(taskInstance.getTaskType(), 
taskInstance.getTaskParams());
-
-        if (baseParam != null) {
-            List<ResourceInfo> projectResourceFiles = 
baseParam.getResourceFilesList();
-            if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
-
-                // filter the resources that the resource id equals 0
-                Set<ResourceInfo> oldVersionResources = 
projectResourceFiles.stream().filter(t -> t.getId() == 
0).collect(Collectors.toSet());
-                if (CollectionUtils.isNotEmpty(oldVersionResources)) {
-                    oldVersionResources.forEach(t -> 
resourcesMap.put(t.getRes(), 
processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
-                }
-
-                // get the resource id in order to get the resource names in 
batch
-                Stream<Integer> resourceIdStream = 
projectResourceFiles.stream().map(ResourceInfo::getId);
-                Set<Integer> resourceIdsSet = 
resourceIdStream.collect(Collectors.toSet());
-
-                if (CollectionUtils.isNotEmpty(resourceIdsSet)) {
-                    Integer[] resourceIds = resourceIdsSet.toArray(new 
Integer[resourceIdsSet.size()]);
-
-                    List<Resource> resources = 
processService.listResourceByIds(resourceIds);
-                    resources.forEach(t -> resourcesMap.put(t.getFullName(), 
processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
-                }
-            }
-        }
-
-        return resourcesMap;
-    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 73aaef2..22de8e7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -35,12 +35,12 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
 import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
@@ -357,7 +357,6 @@ public class MasterRegistryClient {
         registryClient.releaseLock(registryClient.getMasterLockPath());
     }
 
-
     /**
      * registry
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 7ffbd9b..e3a6c07 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -17,8 +17,47 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.UdfType;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
+import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
+import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
+import 
org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
+import 
org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.EnumUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import 
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +76,8 @@ public abstract class BaseTaskProcessor implements 
ITaskProcessor {
 
     protected ProcessInstance processInstance;
 
+    protected ProcessService processService = 
SpringApplicationContext.getBean(ProcessService.class);
+
     /**
      * pause task, common tasks donot need this.
      *
@@ -109,4 +150,222 @@ public abstract class BaseTaskProcessor implements 
ITaskProcessor {
     public String getType() {
         return null;
     }
+
+    /**
+     * Build the TaskExecutionContext for a task instance, running the DB queries it needs up front.
+     *
+     * @param taskInstance task instance to build the context for; passed to processService.setTaskInstanceDetail first
+     * @return the built context, or null when no tenant is found (the task state is then changed to FAILURE)
+     */
+    protected TaskExecutionContext getTaskExecutionContext(TaskInstance 
taskInstance) {
+        processService.setTaskInstanceDetail(taskInstance);
+
+        int userId = taskInstance.getProcessDefine() == null ? 0 : 
taskInstance.getProcessDefine().getUserId();
+        Tenant tenant = 
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
 userId);
+
+        // no tenant found: change the task state to FAILURE and return null
+        if (verifyTenantIsNull(tenant, taskInstance)) {
+            processService.changeTaskState(taskInstance, 
ExecutionStatus.FAILURE,
+                    taskInstance.getStartTime(),
+                    taskInstance.getHost(),
+                    null,
+                    null,
+                    taskInstance.getId());
+            return null;
+        }
+        // set queue for process instance, user-specified queue takes 
precedence over tenant queue
+        String userQueue = 
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+        
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? 
tenant.getQueue() : userQueue);
+        
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+        taskInstance.setResources(getResourceFullNames(taskInstance));
+
+        SQLTaskExecutionContext sqlTaskExecutionContext = new 
SQLTaskExecutionContext();
+        DataxTaskExecutionContext dataxTaskExecutionContext = new 
DataxTaskExecutionContext();
+        ProcedureTaskExecutionContext procedureTaskExecutionContext = new 
ProcedureTaskExecutionContext();
+        SqoopTaskExecutionContext sqoopTaskExecutionContext = new 
SqoopTaskExecutionContext();
+
+        // SQL task
+        if 
(TaskType.SQL.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
+            setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
+        }
+
+        // DATAX task
+        if 
(TaskType.DATAX.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
+            setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
+        }
+
+        // procedure task
+        if 
(TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
+            setProcedureTaskRelation(procedureTaskExecutionContext, 
taskInstance);
+        }
+
+        if 
(TaskType.SQOOP.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
+            setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
+        }
+
+        return TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                .buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
+                
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
+                
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+                .buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
+                .buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
+                .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
+                .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
+                .create();
+    }
+
+    /**
+     * Copy the connection params of the procedure task's datasource into the context.
+     * NOTE(review): the looked-up datasource is dereferenced without a null check - confirm it always exists.
+     * @param procedureTaskExecutionContext context that receives the connection params
+     * @param taskInstance task instance whose task params hold the datasource id
+     */
+    private void setProcedureTaskRelation(ProcedureTaskExecutionContext 
procedureTaskExecutionContext, TaskInstance taskInstance) {
+        ProcedureParameters procedureParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
+        int datasourceId = procedureParameters.getDatasource();
+        DataSource datasource = 
processService.findDataSourceById(datasourceId);
+        
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+    }
+
+    /**
+     * Fill the datax context with the id, type code and connection params of the source and target datasources.
+     * A datasource that is not found (null) is skipped, so its fields are left unset.
+     * @param dataxTaskExecutionContext context to populate
+     * @param taskInstance task instance whose task params hold the source and target datasource ids
+     */
+    protected void setDataxTaskRelation(DataxTaskExecutionContext 
dataxTaskExecutionContext, TaskInstance taskInstance) {
+        DataxParameters dataxParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
+
+        DataSource dbSource = 
processService.findDataSourceById(dataxParameters.getDataSource());
+        DataSource dbTarget = 
processService.findDataSourceById(dataxParameters.getDataTarget());
+
+        if (dbSource != null) {
+            
dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
+            
dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode());
+            
dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams());
+        }
+
+        if (dbTarget != null) {
+            
dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
+            
dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode());
+            
dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams());
+        }
+    }
+
+    /**
+     * Fill the sqoop context with the source and target datasource info; only done for TEMPLATE job types.
+     * A datasource that is not found (null) is skipped, so its fields are left unset.
+     * @param sqoopTaskExecutionContext context to populate
+     * @param taskInstance task instance whose task params hold the sqoop source and target params
+     */
+    private void setSqoopTaskRelation(SqoopTaskExecutionContext 
sqoopTaskExecutionContext, TaskInstance taskInstance) {
+        SqoopParameters sqoopParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
+
+        // set the task relation only when the sqoop job type is template
+        if 
(sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
+            SourceMysqlParameter sourceMysqlParameter = 
JSONUtils.parseObject(sqoopParameters.getSourceParams(), 
SourceMysqlParameter.class);
+            TargetMysqlParameter targetMysqlParameter = 
JSONUtils.parseObject(sqoopParameters.getTargetParams(), 
TargetMysqlParameter.class);
+
+            DataSource dataSource = 
processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
+            DataSource dataTarget = 
processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
+
+            if (dataSource != null) {
+                sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
+                
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+                
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+            }
+
+            if (dataTarget != null) {
+                sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
+                
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+                
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            }
+        }
+    }
+
+    /**
+     * Set the datasource connection params and, for UDF sql types, the UDF-to-tenant-code map on the context.
+     * NOTE(review): the looked-up datasource is dereferenced without a null check - confirm it always exists.
+     * @param sqlTaskExecutionContext context to populate
+     * @param taskInstance task instance whose task params hold the datasource id, sql type and udf ids
+     */
+    private void setSQLTaskRelation(SQLTaskExecutionContext 
sqlTaskExecutionContext, TaskInstance taskInstance) {
+        SqlParameters sqlParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
+        int datasourceId = sqlParameters.getDatasource();
+        DataSource datasource = 
processService.findDataSourceById(datasourceId);
+        
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+
+        // UDFs are resolved only when the sql type is a valid UdfType and the udf id list is not empty
+        boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, 
sqlParameters.getType())
+                && !StringUtils.isEmpty(sqlParameters.getUdfs());
+
+        if (udfTypeFlag) {
+            String[] udfFunIds = sqlParameters.getUdfs().split(",");
+            int[] udfFunIdsArray = new int[udfFunIds.length];
+            for (int i = 0; i < udfFunIds.length; i++) {
+                udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]);
+            }
+
+            List<UdfFunc> udfFuncList = 
processService.queryUdfFunListByIds(udfFunIdsArray);
+            UdfFuncRequest udfFuncRequest;
+            Map<UdfFuncRequest, String> udfFuncRequestMap = new HashMap<>();
+            for (UdfFunc udfFunc : udfFuncList) {
+                udfFuncRequest = 
JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class);
+                String tenantCode = 
processService.queryTenantCodeByResName(udfFunc.getResourceName(), 
ResourceType.UDF);
+                udfFuncRequestMap.put(udfFuncRequest, tenantCode);
+            }
+            sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap);
+        }
+    }
+
+    /**
+     * Check whether the tenant is null, logging an error when it is.
+     *
+     * @param tenant tenant to check, may be null
+     * @param taskInstance task instance whose id and process instance id go into the error log
+     * @return true if the tenant is null, false otherwise
+     */
+    protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance 
taskInstance) {
+        if (tenant == null) {
+            logger.error("tenant not exists,process instance id : {},task 
instance id : {}",
+                    taskInstance.getProcessInstance().getId(),
+                    taskInstance.getId());
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Get the resource map of a task: the key is the resource (full) name and the value is its tenantCode.
+     */
+    protected Map<String, String> getResourceFullNames(TaskInstance 
taskInstance) {
+        Map<String, String> resourcesMap = new HashMap<>();
+        AbstractParameters baseParam = 
TaskParametersUtils.getParameters(taskInstance.getTaskType(), 
taskInstance.getTaskParams());
+
+        if (baseParam != null) {
+            List<ResourceInfo> projectResourceFiles = 
baseParam.getResourceFilesList();
+            if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
+
+                // resources whose id equals 0 are keyed by their res name instead of a looked-up full name
+                Set<ResourceInfo> oldVersionResources = 
projectResourceFiles.stream().filter(t -> t.getId() == 
0).collect(Collectors.toSet());
+                if (CollectionUtils.isNotEmpty(oldVersionResources)) {
+                    oldVersionResources.forEach(t -> 
resourcesMap.put(t.getRes(), 
processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
+                }
+
+                // get the resource id in order to get the resource names in 
batch
+                Stream<Integer> resourceIdStream = 
projectResourceFiles.stream().map(ResourceInfo::getId);
+                Set<Integer> resourceIdsSet = 
resourceIdStream.collect(Collectors.toSet());
+
+                if (CollectionUtils.isNotEmpty(resourceIdsSet)) {
+                    Integer[] resourceIds = resourceIdsSet.toArray(new 
Integer[resourceIdsSet.size()]);
+
+                    List<Resource> resources = 
processService.listResourceByIds(resourceIds);
+                    resources.forEach(t -> resourcesMap.put(t.getFullName(), 
processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
+                }
+            }
+        }
+
+        return resourcesMap;
+    }
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 8590863..4296b85 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -30,10 +30,10 @@ import 
org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import 
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -62,8 +62,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
      */
     protected Logger logger = LoggerFactory.getLogger(getClass());
 
-    protected ProcessService processService = 
SpringApplicationContext.getBean(ProcessService.class);
-
     @Override
     public boolean submit(TaskInstance task, ProcessInstance processInstance, 
int maxRetryTimes, int commitInterval) {
         this.processInstance = processInstance;
@@ -124,12 +122,16 @@ public class CommonTaskProcessor extends 
BaseTaskProcessor {
             TaskPriority taskPriority = new 
TaskPriority(processInstance.getProcessInstancePriority().getCode(),
                     processInstance.getId(), 
taskInstance.getProcessInstancePriority().getCode(),
                     taskInstance.getId(), 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
+
+            TaskExecutionContext taskExecutionContext = 
getTaskExecutionContext(taskInstance);
+            taskPriority.setTaskExecutionContext(taskExecutionContext);
+
             taskUpdateQueue.put(taskPriority);
             logger.info(String.format("master submit success, task : %s", 
taskInstance.getName()));
             return true;
         } catch (Exception e) {
             logger.error("submit task  Exception: ", e);
-            logger.error("task error : %s", 
JSONUtils.toJsonString(taskInstance));
+            logger.error("task error : {}", 
JSONUtils.toJsonString(taskInstance));
             return false;
         }
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index fae7fc5..80d8eba 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -66,7 +65,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor 
{
      */
     private Map<String, ExecutionStatus> completeTaskList = new 
ConcurrentHashMap<>();
 
-    protected ProcessService processService = 
SpringApplicationContext.getBean(ProcessService.class);
     MasterConfig masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
 
     private TaskDefinition taskDefinition;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index b6b9008..b26e641 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -73,7 +73,6 @@ public class DependentTaskProcessor extends BaseTaskProcessor 
{
     ProcessInstance processInstance;
     TaskDefinition taskDefinition;
 
-    protected ProcessService processService = 
SpringApplicationContext.getBean(ProcessService.class);
     MasterConfig masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
 
     boolean allDependentItemFinished;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index f0ac7d3..7a4be58 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -23,8 +23,6 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.Date;
 import java.util.concurrent.locks.Lock;
@@ -45,8 +43,6 @@ public class SubTaskProcessor extends BaseTaskProcessor {
      */
     private final Lock runLock = new ReentrantLock();
 
-    protected ProcessService processService = 
SpringApplicationContext.getBean(ProcessService.class);
-
     @Override
     public boolean submit(TaskInstance task, ProcessInstance processInstance, 
int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
         this.processInstance = processInstance;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 87e79df..68189c6 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -32,7 +32,6 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -53,7 +52,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     private ProcessInstance processInstance;
     TaskDefinition taskDefinition;
 
-    protected ProcessService processService = 
SpringApplicationContext.getBean(ProcessService.class);
     MasterConfig masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
 
     /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
index e6893d0..bffb380 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
@@ -17,8 +17,8 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 8eb40b7..3ca9120 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import org.apache.commons.lang.StringUtils;
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3466326..c3720fe 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -33,7 +33,6 @@ import 
org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -42,6 +41,7 @@ import 
org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 101eb0f..c0ecd67 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -30,12 +30,12 @@ import 
org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 383c3cd..04e4749 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
@@ -44,6 +43,7 @@ import 
org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
 import org.apache.dolphinscheduler.spi.task.AbstractTask;
 import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index edffc60..8319e01 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -23,11 +23,11 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
index 8dc3f80..f609845 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import java.util.Calendar;
 import java.util.Date;
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 976e058..dd190f7 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -21,28 +21,21 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
 
-import java.util.ArrayList;
 import java.util.Date;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.After;
@@ -328,113 +321,6 @@ public class TaskPriorityQueueConsumerTest {
     }
 
     @Test
-    public void testGetTaskExecutionContext() throws Exception {
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(1);
-        taskInstance.setTaskType(TaskType.SHELL.getDesc());
-        taskInstance.setProcessInstanceId(1);
-        taskInstance.setState(ExecutionStatus.KILL);
-        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
-        taskInstance.setWorkerGroup("NoWorkGroup");
-        taskInstance.setExecutorId(2);
-
-        ProcessInstance processInstance = new ProcessInstance();
-        processInstance.setId(1);
-        processInstance.setTenantId(1);
-        processInstance.setCommandType(CommandType.START_PROCESS);
-        taskInstance.setProcessInstance(processInstance);
-        taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
-
-        ProcessDefinition processDefinition = new ProcessDefinition();
-        processDefinition.setUserId(2);
-        processDefinition.setProjectCode(1L);
-        taskInstance.setProcessDefine(processDefinition);
-
-        TaskDefinition taskDefinition = new TaskDefinition();
-        taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
-        taskInstance.setTaskDefine(taskDefinition);
-
-        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
-        
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
-
-        TaskExecutionContext taskExecutionContext = 
taskPriorityQueueConsumer.getTaskExecutionContext(1);
-
-        Assert.assertNotNull(taskExecutionContext);
-    }
-
-    @Test
-    public void testGetResourceFullNames() {
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(1);
-        taskInstance.setTaskType(TaskType.SHELL.getDesc());
-        taskInstance.setProcessInstanceId(1);
-        taskInstance.setState(ExecutionStatus.KILL);
-        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
-        taskInstance.setWorkerGroup("NoWorkGroup");
-        taskInstance.setExecutorId(2);
-        // task node
-
-        Map<String, String> map = 
taskPriorityQueueConsumer.getResourceFullNames(taskInstance);
-
-        List<Resource> resourcesList = new ArrayList<Resource>();
-        Resource resource = new Resource();
-        resource.setFileName("fileName");
-        resourcesList.add(resource);
-
-        
Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new 
Integer[]{123});
-        
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(),
 ResourceType.FILE);
-        Assert.assertNotNull(map);
-
-    }
-
-    @Test
-    public void testVerifyTenantIsNull() {
-        Tenant tenant = null;
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(1);
-        taskInstance.setTaskType(TaskType.SHELL.getDesc());
-        taskInstance.setProcessInstanceId(1);
-
-        ProcessInstance processInstance = new ProcessInstance();
-        processInstance.setId(1);
-        taskInstance.setProcessInstance(processInstance);
-
-        boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant, 
taskInstance);
-        Assert.assertTrue(res);
-
-        tenant = new Tenant();
-        tenant.setId(1);
-        tenant.setTenantCode("journey");
-        tenant.setDescription("journey");
-        tenant.setQueueId(1);
-        tenant.setCreateTime(new Date());
-        tenant.setUpdateTime(new Date());
-        res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant, 
taskInstance);
-        Assert.assertFalse(res);
-
-    }
-
-    @Test
-    public void testSetDataxTaskRelation() throws Exception {
-
-        DataxTaskExecutionContext dataxTaskExecutionContext = new 
DataxTaskExecutionContext();
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}");
-        DataSource dataSource = new DataSource();
-        dataSource.setId(1);
-        dataSource.setConnectionParams("");
-        dataSource.setType(DbType.MYSQL);
-        
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
-
-        
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext, 
taskInstance);
-
-        Assert.assertEquals(1, dataxTaskExecutionContext.getDataSourceId());
-        Assert.assertEquals(1, dataxTaskExecutionContext.getDataTargetId());
-    }
-
-    @Test
     public void testRun() throws Exception {
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setId(1);
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index d6a4e59..7570788 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -26,11 +26,11 @@ import 
org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import org.junit.Assert;
 import org.junit.Ignore;
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
new file mode 100644
index 0000000..c0aeb72
--- /dev/null
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.DbType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@Ignore
+public class CommonTaskProcessorTest {
+
+    @Autowired
+    private CommonTaskProcessor commonTaskProcessor;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Test
+    public void testGetTaskExecutionContext() throws Exception {
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType(TaskType.SHELL.getDesc());
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectCode(1L);
+        taskInstance.setProcessDefine(processDefinition);
+
+        TaskDefinition taskDefinition = new TaskDefinition();
+        taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
+        taskInstance.setTaskDefine(taskDefinition);
+
+        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        TaskExecutionContext taskExecutionContext = 
commonTaskProcessor.getTaskExecutionContext(taskInstance);
+        Assert.assertNotNull(taskExecutionContext);
+    }
+
+    @Test
+    public void testGetResourceFullNames() {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType(TaskType.SHELL.getDesc());
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+        // task node
+
+        Map<String, String> map = 
commonTaskProcessor.getResourceFullNames(taskInstance);
+
+        List<Resource> resourcesList = new ArrayList<Resource>();
+        Resource resource = new Resource();
+        resource.setFileName("fileName");
+        resourcesList.add(resource);
+
+        
Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new 
Integer[]{123});
+        
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(),
 ResourceType.FILE);
+        Assert.assertNotNull(map);
+
+    }
+
+    @Test
+    public void testVerifyTenantIsNull() {
+        Tenant tenant = null;
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType(TaskType.SHELL.getDesc());
+        taskInstance.setProcessInstanceId(1);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        taskInstance.setProcessInstance(processInstance);
+
+        boolean res = commonTaskProcessor.verifyTenantIsNull(tenant, 
taskInstance);
+        Assert.assertTrue(res);
+
+        tenant = new Tenant();
+        tenant.setId(1);
+        tenant.setTenantCode("journey");
+        tenant.setDescription("journey");
+        tenant.setQueueId(1);
+        tenant.setCreateTime(new Date());
+        tenant.setUpdateTime(new Date());
+        res = commonTaskProcessor.verifyTenantIsNull(tenant, taskInstance);
+        Assert.assertFalse(res);
+
+    }
+
+    @Test
+    public void testSetDataxTaskRelation() throws Exception {
+
+        DataxTaskExecutionContext dataxTaskExecutionContext = new 
DataxTaskExecutionContext();
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}");
+        DataSource dataSource = new DataSource();
+        dataSource.setId(1);
+        dataSource.setConnectionParams("");
+        dataSource.setType(DbType.MYSQL);
+        
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
+
+        commonTaskProcessor.setDataxTaskRelation(dataxTaskExecutionContext, 
taskInstance);
+
+        Assert.assertEquals(1, dataxTaskExecutionContext.getDataSourceId());
+        Assert.assertEquals(1, dataxTaskExecutionContext.getDataTargetId());
+    }
+}
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
index 0894c1b..58edd9a 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
@@ -24,9 +24,10 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+
 import org.mockito.Mockito;
 
 /**
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
index bc64d0b..cf886f8 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index efb6805..daee652 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -29,12 +29,12 @@ import 
org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import java.util.Date;
 import java.util.concurrent.ExecutorService;
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c4fce3e..a5d1329 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -26,11 +26,9 @@ import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
 import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
 
 import static java.util.stream.Collectors.toSet;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -129,6 +127,7 @@ import 
org.springframework.transaction.annotation.Transactional;
 
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.facebook.presto.jdbc.internal.guava.collect.Lists;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
@@ -1535,19 +1534,29 @@ public class ProcessService {
         if (taskInstance == null) {
             return null;
         }
+        setTaskInstanceDetail(taskInstance);
+        return taskInstance;
+    }
+
+    /**
+     * Populates the given task instance in place with its process instance,
+     * process definition and task definition (nothing is returned).
+     *
+     * @param taskInstance task instance to populate
+     */
+    public void setTaskInstanceDetail(TaskInstance taskInstance) {
         // get process instance
         ProcessInstance processInstance = 
findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
         // get process define
         ProcessDefinition processDefine = 
findProcessDefinition(processInstance.getProcessDefinitionCode(),
-            processInstance.getProcessDefinitionVersion());
+                processInstance.getProcessDefinitionVersion());
         taskInstance.setProcessInstance(processInstance);
         taskInstance.setProcessDefine(processDefine);
         TaskDefinition taskDefinition = 
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
-            taskInstance.getTaskCode(),
-            taskInstance.getTaskDefinitionVersion());
+                taskInstance.getTaskCode(),
+                taskInstance.getTaskDefinitionVersion());
         updateTaskDefinitionResources(taskDefinition);
         taskInstance.setTaskDefine(taskDefinition);
-        return taskInstance;
     }
 
     /**
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index a872f6d..d78fb98 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.service.queue;
 
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+
 import java.util.Map;
 import java.util.Objects;
 
@@ -46,6 +48,11 @@ public class TaskPriority implements 
Comparable<TaskPriority> {
     private int taskId;
 
     /**
+     * taskExecutionContext
+     */
+    private TaskExecutionContext taskExecutionContext;
+
+    /**
      * groupName
      */
     private String groupName;
@@ -116,6 +123,14 @@ public class TaskPriority implements 
Comparable<TaskPriority> {
         this.context = context;
     }
 
+    public TaskExecutionContext getTaskExecutionContext() {
+        return taskExecutionContext;
+    }
+
+    public void setTaskExecutionContext(TaskExecutionContext 
taskExecutionContext) {
+        this.taskExecutionContext = taskExecutionContext;
+    }
+
     @Override
     public int compareTo(TaskPriority other) {
         if (this.getProcessInstancePriority() > 
other.getProcessInstancePriority()) {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DependenceTaskExecutionContext.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/DependenceTaskExecutionContext.java
similarity index 83%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DependenceTaskExecutionContext.java
rename to 
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/DependenceTaskExecutionContext.java
index 953f294..bddaa4f 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DependenceTaskExecutionContext.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/DependenceTaskExecutionContext.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.service.queue.entity;
 
 import java.io.Serializable;
 
 /**
- *  master/worker task transport
+ * master/worker task transport
  */
-public class DependenceTaskExecutionContext implements Serializable{
+public class DependenceTaskExecutionContext implements Serializable {
 
     private String dependence;
 
@@ -36,8 +36,8 @@ public class DependenceTaskExecutionContext implements 
Serializable{
 
     @Override
     public String toString() {
-        return "DependenceTaskExecutionContext{" +
-                "dependence='" + dependence + '\'' +
-                '}';
+        return "DependenceTaskExecutionContext{"
+                + "dependence='" + dependence + '\''
+                + '}';
     }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
similarity index 99%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
rename to 
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
index a3f5fe5..9f73d82 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.service.queue.entity;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
 import 
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 
 import java.io.Serializable;
 import java.util.Date;

Reply via email to