This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 1.3.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.4-prepare by this push:
     new 542ce2e  [Feature-4138][Master]Cherry pick from dev to dispatch add 
sleep when dispatch task to work error (#4211)
542ce2e is described below

commit 542ce2ef85ed56f4e1d9b9342fa78c24bf5b9194
Author: lgcareer <[email protected]>
AuthorDate: Sat Dec 12 09:48:39 2020 +0800

    [Feature-4138][Master]Cherry pick from dev to dispatch add sleep when 
dispatch task to work error (#4211)
    
    * [Feature-4138][Master]Cherry pick from dev to dispatch add sleep when 
dispatch task to work error
    
    * [Feature-4138][Master]Cherry pick from dev to dispatch add sleep when 
dispatch task to work error
    
    Co-authored-by: BoYiZhang <[email protected]>
---
 .../master/consumer/TaskPriorityQueueConsumer.java |  29 +-
 .../consumer/TaskPriorityQueueConsumerTest.java    | 454 +++++++++++++++++++--
 2 files changed, 427 insertions(+), 56 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index eef59e6..59d2d47 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -19,11 +19,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
 
 import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.ResourceType;
-import org.apache.dolphinscheduler.common.enums.SqoopJobType;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.enums.UdfType;
+import org.apache.dolphinscheduler.common.enums.*;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
@@ -34,7 +30,6 @@ import 
org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import 
org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
 import 
org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@@ -53,6 +48,7 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -118,8 +114,15 @@ public class TaskPriorityQueueConsumer extends Thread{
                         failedDispatchTasks.add(taskPriorityInfo);
                     }
                 }
-                for(String dispatchFailedTask : failedDispatchTasks){
-                    taskPriorityQueue.put(dispatchFailedTask);
+                if (!failedDispatchTasks.isEmpty()) {
+                    for (String dispatchFailedTask : failedDispatchTasks) {
+                        taskPriorityQueue.put(dispatchFailedTask);
+                    }
+                    // If there are tasks in a cycle that cannot find the 
worker group,
+                    // sleep for 1 second
+                    if (taskPriorityQueue.size() <= 
failedDispatchTasks.size()) {
+                        
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+                    }
                 }
             }catch (Exception e){
                 logger.error("dispatcher task error",e);
@@ -134,7 +137,7 @@ public class TaskPriorityQueueConsumer extends Thread{
      * @param taskInstanceId taskInstanceId
      * @return result
      */
-    private boolean dispatch(int taskInstanceId){
+    protected boolean dispatch(int taskInstanceId) {
         boolean result = false;
         try {
             TaskExecutionContext context = 
getTaskExecutionContext(taskInstanceId);
@@ -255,8 +258,8 @@ public class TaskPriorityQueueConsumer extends Thread{
      * @param dataxTaskExecutionContext dataxTaskExecutionContext
      * @param taskNode taskNode
      */
-    private void setDataxTaskRelation(DataxTaskExecutionContext 
dataxTaskExecutionContext, TaskNode taskNode) {
-        DataxParameters dataxParameters = 
JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
+    protected void setDataxTaskRelation(DataxTaskExecutionContext 
dataxTaskExecutionContext, TaskNode taskNode) {
+        DataxParameters dataxParameters = 
JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
 
         DataSource dataSource = 
processService.findDataSourceById(dataxParameters.getDataSource());
         DataSource dataTarget = 
processService.findDataSourceById(dataxParameters.getDataTarget());
@@ -371,8 +374,8 @@ public class TaskPriorityQueueConsumer extends Thread{
     /**
      * get resource map key is full name and value is tenantCode
      */
-    private Map<String,String> getResourceFullNames(TaskNode taskNode) {
-        Map<String,String> resourceMap = new HashMap<>();
+    protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
+        Map<String, String> resourceMap = new HashMap<>();
         AbstractParameters baseParam = 
TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
 
         if (baseParam != null) {
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index a95acf8..1ba823c 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.dolphinscheduler.server.master.consumer;
 
-import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.DbType;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import 
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@@ -34,6 +36,8 @@ import 
org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,7 +46,11 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 
 @RunWith(SpringJUnit4ClassRunner.class)
@@ -65,23 +73,21 @@ public class TaskPriorityQueueConsumerTest {
     private ExecutorDispatcher dispatcher;
 
     @Before
-    public void init(){
+    public void init() {
 
         Tenant tenant = new Tenant();
         tenant.setId(1);
         tenant.setTenantCode("journey");
-        tenant.setTenantName("journey");
         tenant.setDescription("journey");
         tenant.setQueueId(1);
         tenant.setCreateTime(new Date());
         tenant.setUpdateTime(new Date());
 
-        
Mockito.when(processService.getTenantForProcess(1,2)).thenReturn(tenant);
+        Mockito.doReturn(tenant).when(processService).getTenantForProcess(1, 
2);
 
-        
Mockito.when(processService.queryUserQueueByProcessInstanceId(1)).thenReturn("default");
+        
Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1);
     }
 
-
     @Test
     public void testSHELLTask() throws Exception {
         TaskInstance taskInstance = new TaskInstance();
@@ -90,12 +96,31 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo
 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeou
 [...]
+        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,"
+                + "\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
 
         ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
         processInstance.setTenantId(1);
         processInstance.setCommandType(CommandType.START_PROCESS);
         taskInstance.setProcessInstance(processInstance);
@@ -105,12 +130,15 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setProjectId(1);
         taskInstance.setProcessDefine(processDefinition);
 
-        
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
+        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
         taskPriorityQueue.put("2_1_2_1_default");
 
-        Thread.sleep(10000);
-    }
+        TimeUnit.SECONDS.sleep(10);
 
+        Assert.assertNotNull(taskInstance);
+    }
 
     @Test
     public void testSQLTask() throws Exception {
@@ -120,7 +148,13 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\",\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\",\\\"sql\\\":\\\"select
 id,name,ds,zodia(ds) from 
t_journey_user\\\",\\\"preStatements\\\":[],\\\"sqlType\\\":0,\\\"receivers\\\":\\\"82519315
 [...]
+        
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\","
+                + 
"\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\","
+                + "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from 
t_journey_user\\\",\\\"preStatements\\\":[],"
+                + 
"\\\"sqlType\\\":0,\\\"receivers\\\":\\\"[email protected]\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\","
+                + 
"\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + 
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -134,8 +168,7 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setUserId(2);
         processDefinition.setProjectId(1);
         taskInstance.setProcessDefine(processDefinition);
-
-        
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
+        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         taskPriorityQueue.put("2_1_2_1_default");
 
         DataSource dataSource = new DataSource();
@@ -143,16 +176,20 @@ public class TaskPriorityQueueConsumerTest {
         dataSource.setName("sqlDatasource");
         dataSource.setType(DbType.MYSQL);
         dataSource.setUserId(2);
-        
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+        
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+                + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+                + 
"\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+                + "\"user\":\"root\","
+                + "\"password\":\"root@123\"}");
         dataSource.setCreateTime(new Date());
         dataSource.setUpdateTime(new Date());
 
-        
Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource);
+        
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
 
-        Thread.sleep(10000);
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertNotNull(taskInstance);
     }
 
-
     @Test
     public void testDataxTask() throws Exception {
         TaskInstance taskInstance = new TaskInstance();
@@ -161,7 +198,26 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-97625\",\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\",\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\",\\\"postStatements\\\":[],\\\"jobSpeedRecord\\\":1000,\\\"customConfig\\\":0,\\\"dtType\\\":\\\"MYSQL\\\",\\\"dsType\\\":\\\"MYSQL\\\",\\\"jobSpeedByte\\\":0,\\\"dataSource\\
 [...]
+        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + 
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+                + "\"forbidden\":false,\"id\":\"tasks-97625\","
+                + "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\","
+                + "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\","
+                + "    \\\"postStatements\\\":[],"
+                + "    \\\"jobSpeedRecord\\\":1000,"
+                + "    \\\"customConfig\\\":0,"
+                + "    \\\"dtType\\\":\\\"MYSQL\\\","
+                + "    \\\"dsType\\\":\\\"MYSQL\\\","
+                + "    \\\"jobSpeedByte\\\":0,"
+                + "    \\\"dataSource\\\":80,"
+                + "    \\\"dataTarget\\\":80,"
+                + "    \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\","
+                + "    \\\"preStatements\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + 
"\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+                + 
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"DATAX\","
+                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -175,27 +231,26 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setUserId(2);
         processDefinition.setProjectId(1);
         taskInstance.setProcessDefine(processDefinition);
-
-        
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
+        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         taskPriorityQueue.put("2_1_2_1_default");
 
-
-
         DataSource dataSource = new DataSource();
         dataSource.setId(80);
         dataSource.setName("datax");
         dataSource.setType(DbType.MYSQL);
         dataSource.setUserId(2);
-        
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+        
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+                + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+                + 
"\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+                + "\"user\":\"root\","
+                + "\"password\":\"root@123\"}");
         dataSource.setCreateTime(new Date());
         dataSource.setUpdateTime(new Date());
-
-        
Mockito.when(processService.findDataSourceById(80)).thenReturn(dataSource);
-
-        Thread.sleep(10000);
+        
Mockito.doReturn(dataSource).when(processService).findDataSourceById(80);
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertNotNull(taskInstance);
     }
 
-
     @Test
     public void testSqoopTask() throws Exception {
         TaskInstance taskInstance = new TaskInstance();
@@ -204,7 +259,32 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-63634\",\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\",\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\",\\\"targetType\\\":\\\"HDFS\\\",\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\",\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\
 [...]
+        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+                + "\"forbidden\":false,\"id\":\"tasks-63634\","
+                + "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\","
+                + "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\","
+                + "    \\\"targetType\\\":\\\"HDFS\\\","
+                + "    
\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\","
+                + "        
\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\","
+                + "        \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\","
+                + "        \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\","
+                + "        
\\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\","
+                + "    \\\"modelType\\\":\\\"import\\\","
+                + "    
\\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\","
+                + "        
\\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\","
+                + "        \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\","
+                + "        \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\","
+                + "        \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\","
+                + "        \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\","
+                + "        
\\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\","
+                + "    \\\"localParams\\\":[],\\\"concurrency\\\":1}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + 
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SQOOP\","
+                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -218,43 +298,331 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setUserId(2);
         processDefinition.setProjectId(1);
         taskInstance.setProcessDefine(processDefinition);
-
-        
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
+        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         taskPriorityQueue.put("2_1_2_1_default");
 
-
-
         DataSource dataSource = new DataSource();
         dataSource.setId(1);
         dataSource.setName("datax");
         dataSource.setType(DbType.MYSQL);
         dataSource.setUserId(2);
-        
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+        
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+                + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+                + 
"\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+                + "\"user\":\"root\","
+                + "\"password\":\"root@123\"}");
         dataSource.setCreateTime(new Date());
         dataSource.setUpdateTime(new Date());
         
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
-        Thread.sleep(10000);
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertNotNull(taskInstance);
     }
 
-
     @Test
-    public void testTaskInstanceIsFinalState(){
+    public void testTaskInstanceIsFinalState() {
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setId(1);
         taskInstance.setTaskType("SHELL");
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo
 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeou
 [...]
+        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + 
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+                + "\"forbidden\":false,\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + 
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
 
+        
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
+        Assert.assertNotNull(state);
+    }
+
+    @Test
+    public void testNotFoundWorkerGroup() throws Exception {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectId(1);
+        taskInstance.setProcessDefine(processDefinition);
+
+        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+
+        TimeUnit.SECONDS.sleep(10);
+
+        Assert.assertNotNull(taskInstance);
+
+    }
+
+    @Test
+    public void testDispatch() throws Exception {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectId(1);
+        taskInstance.setProcessDefine(processDefinition);
+
+        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        boolean res  = taskPriorityQueueConsumer.dispatch(1);
+
+        Assert.assertFalse(res);
+    }
+
+    @Test
+    public void testGetTaskExecutionContext() throws Exception {
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectId(1);
+        taskInstance.setProcessDefine(processDefinition);
+
+        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        TaskExecutionContext taskExecutionContext  = 
taskPriorityQueueConsumer.getTaskExecutionContext(1);
 
-        Mockito.when( 
processService.findTaskInstanceById(1)).thenReturn(taskInstance);
 
-        taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
+
+        Assert.assertNotNull(taskExecutionContext);
+    }
+
+    @Test
+    public void testGetResourceFullNames() throws Exception {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+        // task node
+        TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), 
TaskNode.class);
+
+        Map<String, String> map = 
taskPriorityQueueConsumer.getResourceFullNames(taskNode);
+
+        List<Resource> resourcesList = new ArrayList<Resource>();
+        Resource resource = new Resource();
+        resource.setFileName("fileName");
+        resourcesList.add(resource);
+
+        
Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new 
Integer[]{123});
+        
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(),
 ResourceType.FILE);
+        Assert.assertNotNull(map);
+
     }
 
+    @Test
+    public void testSetDataxTaskRelation() throws Exception {
+
+        DataxTaskExecutionContext dataxTaskExecutionContext = new 
DataxTaskExecutionContext();
+        TaskNode taskNode = new TaskNode();
+        taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}");
+        DataSource dataSource = new DataSource();
+        dataSource.setId(1);
+        dataSource.setConnectionParams("");
+        dataSource.setType(DbType.MYSQL);
+        
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
+
+        
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode);
+
+        Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId());
+        Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId());
+    }
+
+    @Test
+    public void testRun() throws Exception {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectId(1);
+        taskInstance.setProcessDefine(processDefinition);
+
+        
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+
+        taskPriorityQueueConsumer.run();
+
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertNotEquals(-1,taskPriorityQueue.size());
+
+    }
+
+    @After
+    public void close() {
+        Stopper.stop();
+    }
 
 }

Reply via email to