This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
new 184d64e [BUG FIX] fix bug: dependent task failed when conditions task
exists (#2768)
184d64e is described below
commit 184d64e85291b1e8fb82494bf5a8e2ea755a3dc1
Author: bao liang <[email protected]>
AuthorDate: Thu May 21 10:34:28 2020 +0800
[BUG FIX] fix bug: dependent task failed when conditions task exists (#2768)
* fix bug 2464: change dependent task for process.
* remove unused code
* add ut
* add ut
* update comments
Co-authored-by: baoliang <[email protected]>
---
.../common/enums/ExecutionStatus.java | 11 +++-
.../master/runner/DependentTaskExecThread.java | 2 +-
.../server/master/runner/MasterExecThread.java | 2 +-
.../server/utils/DependentExecute.java | 32 ++++-------
.../server/master/DependentTaskTest.java | 64 +++++++++++++++++++++-
.../service/process/ProcessService.java | 2 +-
6 files changed, 85 insertions(+), 28 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
index 1c336c8..5956de2 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
@@ -86,14 +86,14 @@ public enum ExecutionStatus {
public boolean typeIsFinished(){
return typeIsSuccess() || typeIsFailure() || typeIsCancel() ||
typeIsPause()
- || typeIsWaittingThread();
+ || typeIsStop();
}
/**
* status is waiting thread
* @return status
*/
- public boolean typeIsWaittingThread(){
+ public boolean typeIsWaitingThread(){
return this == WAITTING_THREAD;
}
@@ -104,6 +104,13 @@ public enum ExecutionStatus {
public boolean typeIsPause(){
return this == PAUSE;
}
+ /**
+ * status is pause
+ * @return status
+ */
+ public boolean typeIsStop(){
+ return this == STOP;
+ }
/**
* status is running
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
index 015c200..8be6839 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
@@ -146,7 +146,7 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
if ( allDependentTaskFinish() ||
taskInstance.getState().typeIsFinished()){
break;
}
- // updateProcessInstance task instance
+ // update process task
taskInstance =
processService.findTaskInstanceById(taskInstance.getId());
processInstance =
processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 6379b32..39a5e1e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -338,7 +338,7 @@ public class MasterExecThread implements Runnable {
private void endProcess() {
processInstance.setEndTime(new Date());
processService.updateProcessInstance(processInstance);
- if(processInstance.getState().typeIsWaittingThread()){
+ if(processInstance.getState().typeIsWaitingThread()){
processService.createRecoveryWaitingThreadCommand(null,
processInstance);
}
List<TaskInstance> taskInstances =
processService.findValidTaskListByProcessId(processInstance.getId());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
index 2359546..71c7d95 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
@@ -123,28 +123,16 @@ public class DependentExecute {
/**
* depend type = depend_all
- * skip the condition tasks.
- * judge all the task
* @return
*/
private DependResult dependResultByProcessInstance(ProcessInstance
processInstance){
- DependResult result = DependResult.FAILED;
- List<TaskNode> taskNodes =
-
processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId());
- if(CollectionUtils.isEmpty(taskNodes)) {
- return result;
+ if(!processInstance.getState().typeIsFinished()){
+ return DependResult.WAITING;
}
- for(TaskNode taskNode:taskNodes){
- if(taskNode.isConditionsTask()
- || DagHelper.haveConditionsAfterNode(taskNode.getName(),
taskNodes)){
- continue;
- }
- DependResult tmpResult =
getDependTaskResult(taskNode.getName(),processInstance);
- if(DependResult.SUCCESS != tmpResult){
- return tmpResult;
- }
+ if(processInstance.getState().typeIsSuccess()){
+ return DependResult.SUCCESS;
}
- return DependResult.SUCCESS;
+ return DependResult.FAILED;
}
/**
@@ -168,7 +156,11 @@ public class DependentExecute {
if(taskInstance == null){
// cannot find task in the process instance
// maybe because process instance is running or failed.
- result =
getDependResultByProcessStateWhenTaskNull(processInstance.getState());
+ if(processInstance.getState().typeIsFinished()){
+ result = DependResult.FAILED;
+ }else{
+ return DependResult.WAITING;
+ }
}else{
result = getDependResultByState(taskInstance.getState());
}
@@ -217,9 +209,7 @@ public class DependentExecute {
*/
private DependResult getDependResultByState(ExecutionStatus state) {
- if(state.typeIsRunning()
- || state == ExecutionStatus.SUBMITTED_SUCCESS
- || state == ExecutionStatus.WAITTING_THREAD){
+ if(!state.typeIsFinished()){
return DependResult.WAITING;
}else if(state.typeIsSuccess()){
return DependResult.SUCCESS;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index a65b050..66bc3af 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -94,11 +94,12 @@ public class DependentTaskTest {
}
@Test
- public void test() throws Exception{
+ public void testDependAll() throws Exception{
TaskInstance taskInstance = getTaskInstance();
String dependString =
"{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
taskInstance.setDependency(dependString);
+
Mockito.when(processService.submitTask(taskInstance))
.thenReturn(taskInstance);
DependentTaskExecThread dependentTask =
@@ -107,6 +108,54 @@ public class DependentTaskTest {
dependentTask.call();
Assert.assertEquals(ExecutionStatus.SUCCESS,
dependentTask.getTaskInstance().getState());
+
+ DateInterval dateInterval =DependentDateUtils.getTodayInterval(new
Date()).get(0);
+
+
+ Mockito.when(processService
+ .findLastRunningProcess(4, dateInterval.getStartTime(),
+ dateInterval.getEndTime()))
+ .thenReturn(findLastStopProcessInterval());
+ DependentTaskExecThread dependentFailure = new
DependentTaskExecThread(taskInstance);
+ dependentFailure.call();
+ Assert.assertEquals(ExecutionStatus.FAILURE,
dependentFailure.getTaskInstance().getState());
+ }
+
+ @Test
+ public void testDependTask() throws Exception{
+
+ TaskInstance taskInstance = getTaskInstance();
+ String dependString =
"{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"D\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
+ taskInstance.setDependency(dependString);
+ Mockito.when(processService.submitTask(taskInstance))
+ .thenReturn(taskInstance);
+ DependentTaskExecThread dependentTask =
+ new DependentTaskExecThread(taskInstance);
+
+ dependentTask.call();
+
+ Assert.assertEquals(ExecutionStatus.SUCCESS,
dependentTask.getTaskInstance().getState());
+
+ DateInterval dateInterval =DependentDateUtils.getTodayInterval(new
Date()).get(0);
+ Mockito.when(processService
+ .findLastRunningProcess(4, dateInterval.getStartTime(),
+ dateInterval.getEndTime()))
+ .thenReturn(findLastStopProcessInterval());
+
+ Mockito.when(processService
+ .findValidTaskListByProcessId(11))
+ .thenReturn(getErrorTaskInstances());
+ DependentTaskExecThread dependentFailure = new
DependentTaskExecThread(taskInstance);
+ dependentFailure.call();
+ Assert.assertEquals(ExecutionStatus.FAILURE,
dependentFailure.getTaskInstance().getState());
+ }
+
+ private ProcessInstance findLastStopProcessInterval(){
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(11);
+ processInstance.setProcessDefinitionId(4);
+ processInstance.setState(ExecutionStatus.STOP);
+ return processInstance;
}
private ProcessInstance findLastProcessInterval(){
@@ -142,7 +191,7 @@ public class DependentTaskTest {
return list;
}
- private List<TaskInstance> getTaskInstances(){
+ private List<TaskInstance> getErrorTaskInstances(){
List<TaskInstance> list = new ArrayList<>();
TaskInstance taskInstance = new TaskInstance();
taskInstance.setName("C");
@@ -152,12 +201,23 @@ public class DependentTaskTest {
return list;
}
+ private List<TaskInstance> getTaskInstances(){
+ List<TaskInstance> list = new ArrayList<>();
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setName("D");
+ taskInstance.setState(ExecutionStatus.SUCCESS);
+ taskInstance.setDependency("1231");
+ list.add(taskInstance);
+ return list;
+ }
+
private TaskInstance getTaskInstance(){
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("DEPENDENT");
taskInstance.setId(252612);
taskInstance.setName("C");
taskInstance.setProcessInstanceId(10111);
+ taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
return taskInstance;
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index f0ae76e..6c4356f 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -254,7 +254,7 @@ public class ProcessService {
//process data check
if (null == processData) {
logger.error("process data is null");
- return null;
+ return new ArrayList<>();
}
return processData.getTasks();