This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 1.3.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.4-prepare by this push:
new 542ce2e [Feature-4138][Master]Cherry pick from dev to dispatch add
sleep when dispatch task to work error (#4211)
542ce2e is described below
commit 542ce2ef85ed56f4e1d9b9342fa78c24bf5b9194
Author: lgcareer <[email protected]>
AuthorDate: Sat Dec 12 09:48:39 2020 +0800
[Feature-4138][Master]Cherry pick from dev to dispatch add sleep when
dispatch task to work error (#4211)
* [Feature-4138][Master]Cherry pick from dev to dispatch add sleep when
dispatch task to work error
* [Feature-4138][Master]Cherry pick from dev to dispatch add sleep when
dispatch task to work error
Co-authored-by: BoYiZhang <[email protected]>
---
.../master/consumer/TaskPriorityQueueConsumer.java | 29 +-
.../consumer/TaskPriorityQueueConsumerTest.java | 454 +++++++++++++++++++--
2 files changed, 427 insertions(+), 56 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index eef59e6..59d2d47 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -19,11 +19,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.ResourceType;
-import org.apache.dolphinscheduler.common.enums.SqoopJobType;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.enums.UdfType;
+import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@@ -34,7 +30,6 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@@ -53,6 +48,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -118,8 +114,15 @@ public class TaskPriorityQueueConsumer extends Thread{
failedDispatchTasks.add(taskPriorityInfo);
}
}
- for(String dispatchFailedTask : failedDispatchTasks){
- taskPriorityQueue.put(dispatchFailedTask);
+ if (!failedDispatchTasks.isEmpty()) {
+ for (String dispatchFailedTask : failedDispatchTasks) {
+ taskPriorityQueue.put(dispatchFailedTask);
+ }
+ // If there are tasks in a cycle that cannot find the worker group,
+ // sleep for 1 second
+ if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
+ TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+ }
}
}catch (Exception e){
logger.error("dispatcher task error",e);
@@ -134,7 +137,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param taskInstanceId taskInstanceId
* @return result
*/
- private boolean dispatch(int taskInstanceId){
+ protected boolean dispatch(int taskInstanceId) {
boolean result = false;
try {
TaskExecutionContext context =
getTaskExecutionContext(taskInstanceId);
@@ -255,8 +258,8 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskNode taskNode
*/
- private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
- DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
+ protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
+ DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
@@ -371,8 +374,8 @@ public class TaskPriorityQueueConsumer extends Thread{
/**
* get resource map key is full name and value is tenantCode
*/
- private Map<String,String> getResourceFullNames(TaskNode taskNode) {
- Map<String,String> resourceMap = new HashMap<>();
+ protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
+ Map<String, String> resourceMap = new HashMap<>();
AbstractParameters baseParam =
TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index a95acf8..1ba823c 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server.master.consumer;
-import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.DbType;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@@ -34,6 +36,8 @@ import
org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,7 +46,11 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
@RunWith(SpringJUnit4ClassRunner.class)
@@ -65,23 +73,21 @@ public class TaskPriorityQueueConsumerTest {
private ExecutorDispatcher dispatcher;
@Before
- public void init(){
+ public void init() {
Tenant tenant = new Tenant();
tenant.setId(1);
tenant.setTenantCode("journey");
- tenant.setTenantName("journey");
tenant.setDescription("journey");
tenant.setQueueId(1);
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
-
Mockito.when(processService.getTenantForProcess(1,2)).thenReturn(tenant);
+ Mockito.doReturn(tenant).when(processService).getTenantForProcess(1,
2);
-
Mockito.when(processService.queryUserQueueByProcessInstanceId(1)).thenReturn("default");
+
Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1);
}
-
@Test
public void testSHELLTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
@@ -90,12 +96,31 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeou
[...]
+
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,"
+ + "\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
processInstance.setTenantId(1);
processInstance.setCommandType(CommandType.START_PROCESS);
taskInstance.setProcessInstance(processInstance);
@@ -105,12 +130,15 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
-
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
+
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
taskPriorityQueue.put("2_1_2_1_default");
- Thread.sleep(10000);
- }
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotNull(taskInstance);
+ }
@Test
public void testSQLTask() throws Exception {
@@ -120,7 +148,13 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\",\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\",\\\"sql\\\":\\\"select
id,name,ds,zodia(ds) from
t_journey_user\\\",\\\"preStatements\\\":[],\\\"sqlType\\\":0,\\\"receivers\\\":\\\"82519315
[...]
+
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\","
+ +
"\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\","
+ + "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from
t_journey_user\\\",\\\"preStatements\\\":[],"
+ +
"\\\"sqlType\\\":0,\\\"receivers\\\":\\\"[email protected]\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\","
+ +
"\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ +
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -134,8 +168,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
-
-
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
+
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default");
DataSource dataSource = new DataSource();
@@ -143,16 +176,20 @@ public class TaskPriorityQueueConsumerTest {
dataSource.setName("sqlDatasource");
dataSource.setType(DbType.MYSQL);
dataSource.setUserId(2);
-
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+ + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+ +
"\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+ + "\"user\":\"root\","
+ + "\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
-
Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource);
+
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
- Thread.sleep(10000);
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotNull(taskInstance);
}
-
@Test
public void testDataxTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
@@ -161,7 +198,26 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-97625\",\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\",\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\",\\\"postStatements\\\":[],\\\"jobSpeedRecord\\\":1000,\\\"customConfig\\\":0,\\\"dtType\\\":\\\"MYSQL\\\",\\\"dsType\\\":\\\"MYSQL\\\",\\\"jobSpeedByte\\\":0,\\\"dataSource\\
[...]
+
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ +
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ + "\"forbidden\":false,\"id\":\"tasks-97625\","
+ + "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\","
+ + "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\","
+ + " \\\"postStatements\\\":[],"
+ + " \\\"jobSpeedRecord\\\":1000,"
+ + " \\\"customConfig\\\":0,"
+ + " \\\"dtType\\\":\\\"MYSQL\\\","
+ + " \\\"dsType\\\":\\\"MYSQL\\\","
+ + " \\\"jobSpeedByte\\\":0,"
+ + " \\\"dataSource\\\":80,"
+ + " \\\"dataTarget\\\":80,"
+ + " \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\","
+ + " \\\"preStatements\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ +
"\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ +
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"DATAX\","
+ + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -175,27 +231,26 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
-
-
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
+
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default");
-
-
DataSource dataSource = new DataSource();
dataSource.setId(80);
dataSource.setName("datax");
dataSource.setType(DbType.MYSQL);
dataSource.setUserId(2);
-
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+ + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+ +
"\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+ + "\"user\":\"root\","
+ + "\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
-
-
Mockito.when(processService.findDataSourceById(80)).thenReturn(dataSource);
-
- Thread.sleep(10000);
+
Mockito.doReturn(dataSource).when(processService).findDataSourceById(80);
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotNull(taskInstance);
}
-
@Test
public void testSqoopTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
@@ -204,7 +259,32 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-63634\",\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\",\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\",\\\"targetType\\\":\\\"HDFS\\\",\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\",\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\
[...]
+
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ + "\"forbidden\":false,\"id\":\"tasks-63634\","
+ + "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\","
+ + "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\","
+ + " \\\"targetType\\\":\\\"HDFS\\\","
+ + "
\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\","
+ + "
\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\","
+ + " \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\","
+ + " \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\","
+ + "
\\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\","
+ + " \\\"modelType\\\":\\\"import\\\","
+ + "
\\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\","
+ + "
\\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\","
+ + " \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\","
+ + " \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\","
+ + " \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\","
+ + " \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\","
+ + "
\\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\","
+ + " \\\"localParams\\\":[],\\\"concurrency\\\":1}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ +
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SQOOP\","
+ + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -218,43 +298,331 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
-
-
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
+
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default");
-
-
DataSource dataSource = new DataSource();
dataSource.setId(1);
dataSource.setName("datax");
dataSource.setType(DbType.MYSQL);
dataSource.setUserId(2);
-
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+ + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+ +
"\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+ + "\"user\":\"root\","
+ + "\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
- Thread.sleep(10000);
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotNull(taskInstance);
}
-
@Test
- public void testTaskInstanceIsFinalState(){
+ public void testTaskInstanceIsFinalState() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeou
[...]
+
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ +
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ + "\"forbidden\":false,\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ +
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
+
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+ Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
+ Assert.assertNotNull(state);
+ }
+
+ @Test
+ public void testNotFoundWorkerGroup() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setTenantId(1);
+ processInstance.setCommandType(CommandType.START_PROCESS);
+ taskInstance.setProcessInstance(processInstance);
+ taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setUserId(2);
+ processDefinition.setProjectId(1);
+ taskInstance.setProcessDefine(processDefinition);
+
+
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+ taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+
+ TimeUnit.SECONDS.sleep(10);
+
+ Assert.assertNotNull(taskInstance);
+
+ }
+
+ @Test
+ public void testDispatch() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setTenantId(1);
+ processInstance.setCommandType(CommandType.START_PROCESS);
+ taskInstance.setProcessInstance(processInstance);
+ taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setUserId(2);
+ processDefinition.setProjectId(1);
+ taskInstance.setProcessDefine(processDefinition);
+
+
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+ boolean res = taskPriorityQueueConsumer.dispatch(1);
+
+ Assert.assertFalse(res);
+ }
+
+ @Test
+ public void testGetTaskExecutionContext() throws Exception {
+
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setTenantId(1);
+ processInstance.setCommandType(CommandType.START_PROCESS);
+ taskInstance.setProcessInstance(processInstance);
+ taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setUserId(2);
+ processDefinition.setProjectId(1);
+ taskInstance.setProcessDefine(processDefinition);
+
+
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+ TaskExecutionContext taskExecutionContext =
taskPriorityQueueConsumer.getTaskExecutionContext(1);
- Mockito.when(
processService.findTaskInstanceById(1)).thenReturn(taskInstance);
- taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
+
+ Assert.assertNotNull(taskExecutionContext);
+ }
+
+ @Test
+ public void testGetResourceFullNames() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+ // task node
+ TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(),
TaskNode.class);
+
+ Map<String, String> map =
taskPriorityQueueConsumer.getResourceFullNames(taskNode);
+
+ List<Resource> resourcesList = new ArrayList<Resource>();
+ Resource resource = new Resource();
+ resource.setFileName("fileName");
+ resourcesList.add(resource);
+
+
Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new
Integer[]{123});
+
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(),
ResourceType.FILE);
+ Assert.assertNotNull(map);
+
}
+ @Test
+ public void testSetDataxTaskRelation() throws Exception {
+
+ DataxTaskExecutionContext dataxTaskExecutionContext = new
DataxTaskExecutionContext();
+ TaskNode taskNode = new TaskNode();
+ taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}");
+ DataSource dataSource = new DataSource();
+ dataSource.setId(1);
+ dataSource.setConnectionParams("");
+ dataSource.setType(DbType.MYSQL);
+
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
+
+
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode);
+
+ Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId());
+ Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId());
+ }
+
+ @Test
+ public void testRun() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setTenantId(1);
+ processInstance.setCommandType(CommandType.START_PROCESS);
+ taskInstance.setProcessInstance(processInstance);
+ taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setUserId(2);
+ processDefinition.setProjectId(1);
+ taskInstance.setProcessDefine(processDefinition);
+
+
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+ taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+
+ taskPriorityQueueConsumer.run();
+
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotEquals(-1,taskPriorityQueue.size());
+
+ }
+
+ @After
+ public void close() {
+ Stopper.stop();
+ }
}