This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ea6503b [DS-6403][fix] Put DB queries ahead before submit to
TaskPriorityQueue (#6406)
ea6503b is described below
commit ea6503b99742ab72d99527edb3abdad398bee83e
Author: caishunfeng <[email protected]>
AuthorDate: Wed Sep 29 14:04:35 2021 +0800
[DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue
(#6406)
* [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue
* [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue
* [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue
* checkstyle
Co-authored-by: caishunfeng <[email protected]>
---
.../builder/TaskExecutionContextBuilder.java | 4 +-
.../master/cache/TaskInstanceCacheManager.java | 2 +-
.../cache/impl/TaskInstanceCacheManagerImpl.java | 2 +-
.../master/consumer/TaskPriorityQueueConsumer.java | 259 +--------------------
.../master/registry/MasterRegistryClient.java | 3 +-
.../master/runner/task/BaseTaskProcessor.java | 259 +++++++++++++++++++++
.../master/runner/task/CommonTaskProcessor.java | 10 +-
.../master/runner/task/ConditionTaskProcessor.java | 2 -
.../master/runner/task/DependentTaskProcessor.java | 1 -
.../master/runner/task/SubTaskProcessor.java | 4 -
.../master/runner/task/SwitchTaskProcessor.java | 2 -
.../dolphinscheduler/server/utils/LogUtils.java | 2 +-
.../server/utils/ProcessUtils.java | 2 +-
.../worker/processor/TaskExecuteProcessor.java | 2 +-
.../server/worker/processor/TaskKillProcessor.java | 2 +-
.../server/worker/runner/TaskExecuteThread.java | 2 +-
.../server/worker/runner/WorkerManagerThread.java | 2 +-
.../impl/TaskInstanceCacheManagerImplTest.java | 2 +-
.../consumer/TaskPriorityQueueConsumerTest.java | 114 ---------
.../executor/NettyExecutorManagerTest.java | 2 +-
.../runner/task/CommonTaskProcessorTest.java | 166 +++++++++++++
.../server/utils/ExecutionContextTestUtils.java | 3 +-
.../server/utils/LogUtilsTest.java | 2 +-
.../worker/processor/TaskExecuteProcessorTest.java | 2 +-
.../service/process/ProcessService.java | 21 +-
.../service/queue/TaskPriority.java | 15 ++
.../entity/DependenceTaskExecutionContext.java | 12 +-
.../queue}/entity/TaskExecutionContext.java | 6 +-
28 files changed, 490 insertions(+), 415 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 667bdef..afc9b9a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -25,9 +25,9 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
index 031d8b2..1388c5b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
/**
* task instance state manager
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index 43632ae..dd2d6eb 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -23,9 +23,9 @@ import
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.Map;
import java.util.Map.Entry;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index b06ad37..3ccb827 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -18,32 +18,8 @@
package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.ResourceType;
-import org.apache.dolphinscheduler.common.enums.SqoopJobType;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.enums.UdfType;
-import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
-import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
-import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
-import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
-import
org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
-import
org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.EnumUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.entity.UdfFunc;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -52,22 +28,12 @@ import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
-import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
-import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
-import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
-
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import javax.annotation.PostConstruct;
@@ -161,11 +127,10 @@ public class TaskPriorityQueueConsumer extends Thread {
protected boolean dispatch(TaskPriority taskPriority) {
boolean result = false;
try {
- int taskInstanceId = taskPriority.getTaskId();
- TaskExecutionContext context =
getTaskExecutionContext(taskInstanceId);
+ TaskExecutionContext context =
taskPriority.getTaskExecutionContext();
ExecutionContext executionContext = new
ExecutionContext(context.toCommand(), ExecutorType.WORKER,
context.getWorkerGroup());
- if (taskInstanceIsFinalState(taskInstanceId)) {
+ if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
// when task finish, ignore this task, there is no need to
dispatch anymore
return true;
} else {
@@ -188,222 +153,4 @@ public class TaskPriorityQueueConsumer extends Thread {
TaskInstance taskInstance =
processService.findTaskInstanceById(taskInstanceId);
return taskInstance.getState().typeIsFinished();
}
-
- /**
- * get TaskExecutionContext
- *
- * @param taskInstanceId taskInstanceId
- * @return TaskExecutionContext
- */
- protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId)
{
- TaskInstance taskInstance =
processService.getTaskInstanceDetailByTaskId(taskInstanceId);
-
- int userId = taskInstance.getProcessDefine() == null ? 0 :
taskInstance.getProcessDefine().getUserId();
- Tenant tenant =
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
userId);
-
- // verify tenant is null
- if (verifyTenantIsNull(tenant, taskInstance)) {
- processService.changeTaskState(taskInstance,
ExecutionStatus.FAILURE,
- taskInstance.getStartTime(),
- taskInstance.getHost(),
- null,
- null,
- taskInstance.getId());
- return null;
- }
- // set queue for process instance, user-specified queue takes
precedence over tenant queue
- String userQueue =
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
-
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ?
tenant.getQueue() : userQueue);
-
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
- taskInstance.setResources(getResourceFullNames(taskInstance));
-
- SQLTaskExecutionContext sqlTaskExecutionContext = new
SQLTaskExecutionContext();
- DataxTaskExecutionContext dataxTaskExecutionContext = new
DataxTaskExecutionContext();
- ProcedureTaskExecutionContext procedureTaskExecutionContext = new
ProcedureTaskExecutionContext();
- SqoopTaskExecutionContext sqoopTaskExecutionContext = new
SqoopTaskExecutionContext();
-
- // SQL task
- if
(TaskType.SQL.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
- setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
- }
-
- // DATAX task
- if
(TaskType.DATAX.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
- setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
- }
-
- // procedure task
- if
(TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
- setProcedureTaskRelation(procedureTaskExecutionContext,
taskInstance);
- }
-
- if
(TaskType.SQOOP.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
- setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
- }
-
- return TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
-
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
-
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
- .buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
- .buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
- .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
- .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
- .create();
- }
-
- /**
- * set procedure task relation
- *
- * @param procedureTaskExecutionContext procedureTaskExecutionContext
- * @param taskInstance taskInstance
- */
- private void setProcedureTaskRelation(ProcedureTaskExecutionContext
procedureTaskExecutionContext, TaskInstance taskInstance) {
- ProcedureParameters procedureParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
- int datasourceId = procedureParameters.getDatasource();
- DataSource datasource =
processService.findDataSourceById(datasourceId);
-
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
- }
-
- /**
- * set datax task relation
- *
- * @param dataxTaskExecutionContext dataxTaskExecutionContext
- * @param taskInstance taskInstance
- */
- protected void setDataxTaskRelation(DataxTaskExecutionContext
dataxTaskExecutionContext, TaskInstance taskInstance) {
- DataxParameters dataxParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
-
- DataSource dbSource =
processService.findDataSourceById(dataxParameters.getDataSource());
- DataSource dbTarget =
processService.findDataSourceById(dataxParameters.getDataTarget());
-
- if (dbSource != null) {
-
dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
-
dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode());
-
dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams());
- }
-
- if (dbTarget != null) {
-
dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
-
dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode());
-
dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams());
- }
- }
-
- /**
- * set sqoop task relation
- *
- * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
- * @param taskInstance taskInstance
- */
- private void setSqoopTaskRelation(SqoopTaskExecutionContext
sqoopTaskExecutionContext, TaskInstance taskInstance) {
- SqoopParameters sqoopParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
-
- // sqoop job type is template set task relation
- if
(sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
- SourceMysqlParameter sourceMysqlParameter =
JSONUtils.parseObject(sqoopParameters.getSourceParams(),
SourceMysqlParameter.class);
- TargetMysqlParameter targetMysqlParameter =
JSONUtils.parseObject(sqoopParameters.getTargetParams(),
TargetMysqlParameter.class);
-
- DataSource dataSource =
processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
- DataSource dataTarget =
processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
-
- if (dataSource != null) {
- sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
-
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
- }
-
- if (dataTarget != null) {
- sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
-
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
- }
- }
- }
-
- /**
- * set SQL task relation
- *
- * @param sqlTaskExecutionContext sqlTaskExecutionContext
- * @param taskInstance taskInstance
- */
- private void setSQLTaskRelation(SQLTaskExecutionContext
sqlTaskExecutionContext, TaskInstance taskInstance) {
- SqlParameters sqlParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
- int datasourceId = sqlParameters.getDatasource();
- DataSource datasource =
processService.findDataSourceById(datasourceId);
-
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
-
- // whether udf type
- boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class,
sqlParameters.getType())
- && !StringUtils.isEmpty(sqlParameters.getUdfs());
-
- if (udfTypeFlag) {
- String[] udfFunIds = sqlParameters.getUdfs().split(",");
- int[] udfFunIdsArray = new int[udfFunIds.length];
- for (int i = 0; i < udfFunIds.length; i++) {
- udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]);
- }
-
- List<UdfFunc> udfFuncList =
processService.queryUdfFunListByIds(udfFunIdsArray);
- UdfFuncRequest udfFuncRequest;
- Map<UdfFuncRequest, String> udfFuncRequestMap = new HashMap<>();
- for (UdfFunc udfFunc : udfFuncList) {
- udfFuncRequest =
JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class);
- String tenantCode =
processService.queryTenantCodeByResName(udfFunc.getResourceName(),
ResourceType.UDF);
- udfFuncRequestMap.put(udfFuncRequest, tenantCode);
- }
- sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap);
- }
- }
-
- /**
- * whehter tenant is null
- *
- * @param tenant tenant
- * @param taskInstance taskInstance
- * @return result
- */
- protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance
taskInstance) {
- if (tenant == null) {
- logger.error("tenant not exists,process instance id : {},task
instance id : {}",
- taskInstance.getProcessInstance().getId(),
- taskInstance.getId());
- return true;
- }
- return false;
- }
-
- /**
- * get resource map key is full name and value is tenantCode
- */
- protected Map<String, String> getResourceFullNames(TaskInstance
taskInstance) {
- Map<String, String> resourcesMap = new HashMap<>();
- AbstractParameters baseParam =
TaskParametersUtils.getParameters(taskInstance.getTaskType(),
taskInstance.getTaskParams());
-
- if (baseParam != null) {
- List<ResourceInfo> projectResourceFiles =
baseParam.getResourceFilesList();
- if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
-
- // filter the resources that the resource id equals 0
- Set<ResourceInfo> oldVersionResources =
projectResourceFiles.stream().filter(t -> t.getId() ==
0).collect(Collectors.toSet());
- if (CollectionUtils.isNotEmpty(oldVersionResources)) {
- oldVersionResources.forEach(t ->
resourcesMap.put(t.getRes(),
processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
- }
-
- // get the resource id in order to get the resource names in
batch
- Stream<Integer> resourceIdStream =
projectResourceFiles.stream().map(ResourceInfo::getId);
- Set<Integer> resourceIdsSet =
resourceIdStream.collect(Collectors.toSet());
-
- if (CollectionUtils.isNotEmpty(resourceIdsSet)) {
- Integer[] resourceIds = resourceIdsSet.toArray(new
Integer[resourceIdsSet.size()]);
-
- List<Resource> resources =
processService.listResourceByIds(resourceIds);
- resources.forEach(t -> resourcesMap.put(t.getFullName(),
processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
- }
- }
- }
-
- return resourcesMap;
- }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 73aaef2..22de8e7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -35,12 +35,12 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
@@ -357,7 +357,6 @@ public class MasterRegistryClient {
registryClient.releaseLock(registryClient.getMasterLockPath());
}
-
/**
* registry
*/
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 7ffbd9b..e3a6c07 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -17,8 +17,47 @@
package org.apache.dolphinscheduler.server.master.runner.task;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.UdfType;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
+import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
+import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
+import
org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
+import
org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.EnumUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +76,8 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
protected ProcessInstance processInstance;
+ protected ProcessService processService =
SpringApplicationContext.getBean(ProcessService.class);
+
/**
* pause task, common tasks do not need this.
*
@@ -109,4 +150,222 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
public String getType() {
return null;
}
+
+ /**
+ * get TaskExecutionContext
+ *
+ * @param taskInstance taskInstance
+ * @return TaskExecutionContext
+ */
+ protected TaskExecutionContext getTaskExecutionContext(TaskInstance
taskInstance) {
+ processService.setTaskInstanceDetail(taskInstance);
+
+ int userId = taskInstance.getProcessDefine() == null ? 0 :
taskInstance.getProcessDefine().getUserId();
+ Tenant tenant =
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
userId);
+
+ // verify tenant is null
+ if (verifyTenantIsNull(tenant, taskInstance)) {
+ processService.changeTaskState(taskInstance,
ExecutionStatus.FAILURE,
+ taskInstance.getStartTime(),
+ taskInstance.getHost(),
+ null,
+ null,
+ taskInstance.getId());
+ return null;
+ }
+ // set queue for process instance, user-specified queue takes
precedence over tenant queue
+ String userQueue =
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ?
tenant.getQueue() : userQueue);
+
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+ taskInstance.setResources(getResourceFullNames(taskInstance));
+
+ SQLTaskExecutionContext sqlTaskExecutionContext = new
SQLTaskExecutionContext();
+ DataxTaskExecutionContext dataxTaskExecutionContext = new
DataxTaskExecutionContext();
+ ProcedureTaskExecutionContext procedureTaskExecutionContext = new
ProcedureTaskExecutionContext();
+ SqoopTaskExecutionContext sqoopTaskExecutionContext = new
SqoopTaskExecutionContext();
+
+ // SQL task
+ if
(TaskType.SQL.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
+ setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
+ }
+
+ // DATAX task
+ if
(TaskType.DATAX.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
+ setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
+ }
+
+ // procedure task
+ if
(TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
+ setProcedureTaskRelation(procedureTaskExecutionContext,
taskInstance);
+ }
+
+ if
(TaskType.SQOOP.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
+ setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
+ }
+
+ return TaskExecutionContextBuilder.get()
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
+
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
+
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+ .buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
+ .buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
+ .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
+ .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
+ .create();
+ }
+
+ /**
+ * set procedure task relation
+ *
+ * @param procedureTaskExecutionContext procedureTaskExecutionContext
+ * @param taskInstance taskInstance
+ */
+ private void setProcedureTaskRelation(ProcedureTaskExecutionContext
procedureTaskExecutionContext, TaskInstance taskInstance) {
+ ProcedureParameters procedureParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
+ int datasourceId = procedureParameters.getDatasource();
+ DataSource datasource =
processService.findDataSourceById(datasourceId);
+
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+ }
+
+ /**
+ * set datax task relation
+ *
+ * @param dataxTaskExecutionContext dataxTaskExecutionContext
+ * @param taskInstance taskInstance
+ */
+ protected void setDataxTaskRelation(DataxTaskExecutionContext
dataxTaskExecutionContext, TaskInstance taskInstance) {
+ DataxParameters dataxParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
+
+ DataSource dbSource =
processService.findDataSourceById(dataxParameters.getDataSource());
+ DataSource dbTarget =
processService.findDataSourceById(dataxParameters.getDataTarget());
+
+ if (dbSource != null) {
+
dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
+
dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode());
+
dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams());
+ }
+
+ if (dbTarget != null) {
+
dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
+
dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode());
+
dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams());
+ }
+ }
+
+ /**
+ * set sqoop task relation
+ *
+ * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
+ * @param taskInstance taskInstance
+ */
+ private void setSqoopTaskRelation(SqoopTaskExecutionContext
sqoopTaskExecutionContext, TaskInstance taskInstance) {
+ SqoopParameters sqoopParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
+
+ // sqoop job type is template set task relation
+ if
(sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
+ SourceMysqlParameter sourceMysqlParameter =
JSONUtils.parseObject(sqoopParameters.getSourceParams(),
SourceMysqlParameter.class);
+ TargetMysqlParameter targetMysqlParameter =
JSONUtils.parseObject(sqoopParameters.getTargetParams(),
TargetMysqlParameter.class);
+
+ DataSource dataSource =
processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
+ DataSource dataTarget =
processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
+
+ if (dataSource != null) {
+ sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
+
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+ }
+
+ if (dataTarget != null) {
+ sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
+
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+ }
+ }
+ }
+
+ /**
+ * set SQL task relation
+ *
+ * @param sqlTaskExecutionContext sqlTaskExecutionContext
+ * @param taskInstance taskInstance
+ */
+ private void setSQLTaskRelation(SQLTaskExecutionContext
sqlTaskExecutionContext, TaskInstance taskInstance) {
+ SqlParameters sqlParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
+ int datasourceId = sqlParameters.getDatasource();
+ DataSource datasource =
processService.findDataSourceById(datasourceId);
+
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+
+ // whether udf type
+ boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class,
sqlParameters.getType())
+ && !StringUtils.isEmpty(sqlParameters.getUdfs());
+
+ if (udfTypeFlag) {
+ String[] udfFunIds = sqlParameters.getUdfs().split(",");
+ int[] udfFunIdsArray = new int[udfFunIds.length];
+ for (int i = 0; i < udfFunIds.length; i++) {
+ udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]);
+ }
+
+ List<UdfFunc> udfFuncList =
processService.queryUdfFunListByIds(udfFunIdsArray);
+ UdfFuncRequest udfFuncRequest;
+ Map<UdfFuncRequest, String> udfFuncRequestMap = new HashMap<>();
+ for (UdfFunc udfFunc : udfFuncList) {
+ udfFuncRequest =
JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class);
+ String tenantCode =
processService.queryTenantCodeByResName(udfFunc.getResourceName(),
ResourceType.UDF);
+ udfFuncRequestMap.put(udfFuncRequest, tenantCode);
+ }
+ sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap);
+ }
+ }
+
+ /**
+ * whether tenant is null
+ *
+ * @param tenant tenant
+ * @param taskInstance taskInstance
+ * @return result
+ */
+ protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance
taskInstance) {
+ if (tenant == null) {
+ logger.error("tenant not exists,process instance id : {},task
instance id : {}",
+ taskInstance.getProcessInstance().getId(),
+ taskInstance.getId());
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * get resource map; key is full name and value is tenantCode
+ */
+ protected Map<String, String> getResourceFullNames(TaskInstance
taskInstance) {
+ Map<String, String> resourcesMap = new HashMap<>();
+ AbstractParameters baseParam =
TaskParametersUtils.getParameters(taskInstance.getTaskType(),
taskInstance.getTaskParams());
+
+ if (baseParam != null) {
+ List<ResourceInfo> projectResourceFiles =
baseParam.getResourceFilesList();
+ if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
+
+ // filter the resources that the resource id equals 0
+ Set<ResourceInfo> oldVersionResources =
projectResourceFiles.stream().filter(t -> t.getId() ==
0).collect(Collectors.toSet());
+ if (CollectionUtils.isNotEmpty(oldVersionResources)) {
+ oldVersionResources.forEach(t ->
resourcesMap.put(t.getRes(),
processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
+ }
+
+ // get the resource id in order to get the resource names in
batch
+ Stream<Integer> resourceIdStream =
projectResourceFiles.stream().map(ResourceInfo::getId);
+ Set<Integer> resourceIdsSet =
resourceIdStream.collect(Collectors.toSet());
+
+ if (CollectionUtils.isNotEmpty(resourceIdsSet)) {
+ Integer[] resourceIds = resourceIdsSet.toArray(new
Integer[resourceIdsSet.size()]);
+
+ List<Resource> resources =
processService.listResourceByIds(resourceIds);
+ resources.forEach(t -> resourcesMap.put(t.getFullName(),
processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
+ }
+ }
+ }
+
+ return resourcesMap;
+ }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 8590863..4296b85 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -30,10 +30,10 @@ import
org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.commons.lang.StringUtils;
@@ -62,8 +62,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
*/
protected Logger logger = LoggerFactory.getLogger(getClass());
- protected ProcessService processService =
SpringApplicationContext.getBean(ProcessService.class);
-
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance,
int maxRetryTimes, int commitInterval) {
this.processInstance = processInstance;
@@ -124,12 +122,16 @@ public class CommonTaskProcessor extends
BaseTaskProcessor {
TaskPriority taskPriority = new
TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(),
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
+
+ TaskExecutionContext taskExecutionContext =
getTaskExecutionContext(taskInstance);
+ taskPriority.setTaskExecutionContext(taskExecutionContext);
+
taskUpdateQueue.put(taskPriority);
logger.info(String.format("master submit success, task : %s",
taskInstance.getName()));
return true;
} catch (Exception e) {
logger.error("submit task Exception: ", e);
- logger.error("task error : %s",
JSONUtils.toJsonString(taskInstance));
+ logger.error("task error : {}",
JSONUtils.toJsonString(taskInstance));
return false;
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index fae7fc5..80d8eba 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
@@ -66,7 +65,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor
{
*/
private Map<String, ExecutionStatus> completeTaskList = new
ConcurrentHashMap<>();
- protected ProcessService processService =
SpringApplicationContext.getBean(ProcessService.class);
MasterConfig masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
private TaskDefinition taskDefinition;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index b6b9008..b26e641 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -73,7 +73,6 @@ public class DependentTaskProcessor extends BaseTaskProcessor
{
ProcessInstance processInstance;
TaskDefinition taskDefinition;
- protected ProcessService processService =
SpringApplicationContext.getBean(ProcessService.class);
MasterConfig masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
boolean allDependentItemFinished;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index f0ac7d3..7a4be58 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -23,8 +23,6 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import java.util.concurrent.locks.Lock;
@@ -45,8 +43,6 @@ public class SubTaskProcessor extends BaseTaskProcessor {
*/
private final Lock runLock = new ReentrantLock();
- protected ProcessService processService =
SpringApplicationContext.getBean(ProcessService.class);
-
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance,
int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 87e79df..68189c6 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -32,7 +32,6 @@ import
org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils;
@@ -53,7 +52,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
TaskDefinition taskDefinition;
- protected ProcessService processService =
SpringApplicationContext.getBean(ProcessService.class);
MasterConfig masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
index e6893d0..bffb380 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.utils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.nio.file.Path;
import java.nio.file.Paths;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 8eb40b7..3ca9120 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.commons.lang.StringUtils;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3466326..c3720fe 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -33,7 +33,6 @@ import
org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -42,6 +41,7 @@ import
org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 101eb0f..c0ecd67 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -30,12 +30,12 @@ import
org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 383c3cd..04e4749 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
@@ -44,6 +43,7 @@ import
org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index edffc60..8319e01 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -23,11 +23,11 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
index 8dc3f80..f609845 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.Calendar;
import java.util.Date;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 976e058..dd190f7 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -21,28 +21,21 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
-import java.util.ArrayList;
import java.util.Date;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
@@ -328,113 +321,6 @@ public class TaskPriorityQueueConsumerTest {
}
@Test
- public void testGetTaskExecutionContext() throws Exception {
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(1);
- taskInstance.setTaskType(TaskType.SHELL.getDesc());
- taskInstance.setProcessInstanceId(1);
- taskInstance.setState(ExecutionStatus.KILL);
- taskInstance.setProcessInstancePriority(Priority.MEDIUM);
- taskInstance.setWorkerGroup("NoWorkGroup");
- taskInstance.setExecutorId(2);
-
- ProcessInstance processInstance = new ProcessInstance();
- processInstance.setId(1);
- processInstance.setTenantId(1);
- processInstance.setCommandType(CommandType.START_PROCESS);
- taskInstance.setProcessInstance(processInstance);
- taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
-
- ProcessDefinition processDefinition = new ProcessDefinition();
- processDefinition.setUserId(2);
- processDefinition.setProjectCode(1L);
- taskInstance.setProcessDefine(processDefinition);
-
- TaskDefinition taskDefinition = new TaskDefinition();
- taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
- taskInstance.setTaskDefine(taskDefinition);
-
-
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
-
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
-
- TaskExecutionContext taskExecutionContext =
taskPriorityQueueConsumer.getTaskExecutionContext(1);
-
- Assert.assertNotNull(taskExecutionContext);
- }
-
- @Test
- public void testGetResourceFullNames() {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(1);
- taskInstance.setTaskType(TaskType.SHELL.getDesc());
- taskInstance.setProcessInstanceId(1);
- taskInstance.setState(ExecutionStatus.KILL);
- taskInstance.setProcessInstancePriority(Priority.MEDIUM);
- taskInstance.setWorkerGroup("NoWorkGroup");
- taskInstance.setExecutorId(2);
- // task node
-
- Map<String, String> map =
taskPriorityQueueConsumer.getResourceFullNames(taskInstance);
-
- List<Resource> resourcesList = new ArrayList<Resource>();
- Resource resource = new Resource();
- resource.setFileName("fileName");
- resourcesList.add(resource);
-
-
Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new
Integer[]{123});
-
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(),
ResourceType.FILE);
- Assert.assertNotNull(map);
-
- }
-
- @Test
- public void testVerifyTenantIsNull() {
- Tenant tenant = null;
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(1);
- taskInstance.setTaskType(TaskType.SHELL.getDesc());
- taskInstance.setProcessInstanceId(1);
-
- ProcessInstance processInstance = new ProcessInstance();
- processInstance.setId(1);
- taskInstance.setProcessInstance(processInstance);
-
- boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,
taskInstance);
- Assert.assertTrue(res);
-
- tenant = new Tenant();
- tenant.setId(1);
- tenant.setTenantCode("journey");
- tenant.setDescription("journey");
- tenant.setQueueId(1);
- tenant.setCreateTime(new Date());
- tenant.setUpdateTime(new Date());
- res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,
taskInstance);
- Assert.assertFalse(res);
-
- }
-
- @Test
- public void testSetDataxTaskRelation() throws Exception {
-
- DataxTaskExecutionContext dataxTaskExecutionContext = new
DataxTaskExecutionContext();
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}");
- DataSource dataSource = new DataSource();
- dataSource.setId(1);
- dataSource.setConnectionParams("");
- dataSource.setType(DbType.MYSQL);
-
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
-
-
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,
taskInstance);
-
- Assert.assertEquals(1, dataxTaskExecutionContext.getDataSourceId());
- Assert.assertEquals(1, dataxTaskExecutionContext.getDataTargetId());
- }
-
- @Test
public void testRun() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index d6a4e59..7570788 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -26,11 +26,11 @@ import
org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.junit.Assert;
import org.junit.Ignore;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
new file mode 100644
index 0000000..c0aeb72
--- /dev/null
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.DbType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+/**
+ * Tests for the task-execution-context helpers that {@link CommonTaskProcessor}
+ * exposes: {@code getTaskExecutionContext}, {@code getResourceFullNames},
+ * {@code verifyTenantIsNull} and {@code setDataxTaskRelation}.
+ *
+ * <p>The whole class is currently disabled through {@link Ignore}.
+ * NOTE(review): {@code processService} is injected with {@code @Autowired} yet is
+ * stubbed through Mockito below, so the test context presumably has to provide a
+ * mock or spy bean. Confirm before removing {@code @Ignore}.
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@Ignore
+public class CommonTaskProcessorTest {
+
+    @Autowired
+    private CommonTaskProcessor commonTaskProcessor;
+
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * A fully populated SHELL task instance must yield a non-null execution context.
+     */
+    @Test
+    public void testGetTaskExecutionContext() throws Exception {
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType(TaskType.SHELL.getDesc());
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectCode(1L);
+        taskInstance.setProcessDefine(processDefinition);
+
+        TaskDefinition taskDefinition = new TaskDefinition();
+        taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
+        taskInstance.setTaskDefine(taskDefinition);
+
+        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance);
+        Assert.assertNotNull(taskExecutionContext);
+    }
+
+    /**
+     * The resource map must never be {@code null}, even for a task without resources.
+     */
+    @Test
+    public void testGetResourceFullNames() {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType(TaskType.SHELL.getDesc());
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+        // task node
+
+        List<Resource> resourcesList = new ArrayList<>();
+        Resource resource = new Resource();
+        resource.setFileName("fileName");
+        resourcesList.add(resource);
+
+        // Stubs must be registered before the method under test runs; registering
+        // them afterwards (as this test used to do) leaves them without any effect.
+        Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new Integer[]{123});
+        Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(), ResourceType.FILE);
+
+        Map<String, String> map = commonTaskProcessor.getResourceFullNames(taskInstance);
+
+        Assert.assertNotNull(map);
+
+    }
+
+    /**
+     * {@code verifyTenantIsNull} reports {@code true} for a missing tenant and
+     * {@code false} once a tenant is supplied.
+     */
+    @Test
+    public void testVerifyTenantIsNull() {
+        Tenant tenant = null;
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType(TaskType.SHELL.getDesc());
+        taskInstance.setProcessInstanceId(1);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        taskInstance.setProcessInstance(processInstance);
+
+        boolean res = commonTaskProcessor.verifyTenantIsNull(tenant, taskInstance);
+        Assert.assertTrue(res);
+
+        tenant = new Tenant();
+        tenant.setId(1);
+        tenant.setTenantCode("journey");
+        tenant.setDescription("journey");
+        tenant.setQueueId(1);
+        tenant.setCreateTime(new Date());
+        tenant.setUpdateTime(new Date());
+        res = commonTaskProcessor.verifyTenantIsNull(tenant, taskInstance);
+        Assert.assertFalse(res);
+
+    }
+
+    /**
+     * The data source and data target ids from the task params must be copied
+     * into the DataX execution context.
+     */
+    @Test
+    public void testSetDataxTaskRelation() throws Exception {
+
+        DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}");
+        DataSource dataSource = new DataSource();
+        dataSource.setId(1);
+        dataSource.setConnectionParams("");
+        dataSource.setType(DbType.MYSQL);
+        Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
+
+        commonTaskProcessor.setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
+
+        Assert.assertEquals(1, dataxTaskExecutionContext.getDataSourceId());
+        Assert.assertEquals(1, dataxTaskExecutionContext.getDataTargetId());
+    }
+}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
index 0894c1b..58edd9a 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
@@ -24,9 +24,10 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+
import org.mockito.Mockito;
/**
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
index bc64d0b..cf886f8 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.utils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.nio.file.Path;
import java.nio.file.Paths;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index efb6805..daee652 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -29,12 +29,12 @@ import
org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.Date;
import java.util.concurrent.ExecutorService;
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c4fce3e..a5d1329 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -26,11 +26,9 @@ import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
-import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -129,6 +127,7 @@ import
org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.facebook.presto.jdbc.internal.guava.collect.Lists;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
@@ -1535,19 +1534,29 @@ public class ProcessService {
if (taskInstance == null) {
return null;
}
+ setTaskInstanceDetail(taskInstance);
+ return taskInstance;
+ }
+
+ /**
+ * package task instance，associate processInstance and processDefine
+ *
+ * @param taskInstance taskInstance
+ * @return task instance
+ */
+ public void setTaskInstanceDetail(TaskInstance taskInstance) {
// get process instance
ProcessInstance processInstance =
findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine =
findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionVersion());
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
TaskDefinition taskDefinition =
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
- taskInstance.getTaskCode(),
- taskInstance.getTaskDefinitionVersion());
+ taskInstance.getTaskCode(),
+ taskInstance.getTaskDefinitionVersion());
updateTaskDefinitionResources(taskDefinition);
taskInstance.setTaskDefine(taskDefinition);
- return taskInstance;
}
/**
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index a872f6d..d78fb98 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.service.queue;
+import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+
import java.util.Map;
import java.util.Objects;
@@ -46,6 +48,11 @@ public class TaskPriority implements
Comparable<TaskPriority> {
private int taskId;
/**
+ * taskExecutionContext
+ */
+ private TaskExecutionContext taskExecutionContext;
+
+ /**
* groupName
*/
private String groupName;
@@ -116,6 +123,14 @@ public class TaskPriority implements
Comparable<TaskPriority> {
this.context = context;
}
+ public TaskExecutionContext getTaskExecutionContext() {
+ return taskExecutionContext;
+ }
+
+ public void setTaskExecutionContext(TaskExecutionContext
taskExecutionContext) {
+ this.taskExecutionContext = taskExecutionContext;
+ }
+
@Override
public int compareTo(TaskPriority other) {
if (this.getProcessInstancePriority() >
other.getProcessInstancePriority()) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DependenceTaskExecutionContext.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/DependenceTaskExecutionContext.java
similarity index 83%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DependenceTaskExecutionContext.java
rename to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/DependenceTaskExecutionContext.java
index 953f294..bddaa4f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DependenceTaskExecutionContext.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/DependenceTaskExecutionContext.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.service.queue.entity;
import java.io.Serializable;
/**
- * master/worker task transport
+ * master/worker task transport
*/
-public class DependenceTaskExecutionContext implements Serializable{
+public class DependenceTaskExecutionContext implements Serializable {
private String dependence;
@@ -36,8 +36,8 @@ public class DependenceTaskExecutionContext implements
Serializable{
@Override
public String toString() {
- return "DependenceTaskExecutionContext{" +
- "dependence='" + dependence + '\'' +
- '}';
+ return "DependenceTaskExecutionContext{"
+ + "dependence='" + dependence + '\''
+ + '}';
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
similarity index 99%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
rename to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
index a3f5fe5..9f73d82 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.service.queue.entity;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import
org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import java.io.Serializable;
import java.util.Date;