This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
     new 041e1fd3e6 Set master's task running status in `runTask` to avoid the task group acquire failed, but the task status is in running (#11451) (#12011)
041e1fd3e6 is described below

commit 041e1fd3e61292427b5746c92ce850405ef2f2c4
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Sep 17 22:55:40 2022 +0800

    Set master's task running status in `runTask` to avoid the task group acquire failed, but the task status is in running (#11451) (#12011)
    
    (cherry picked from commit 05589606a2fe6b7287b146e82f927b1eceaed701)
---
 .../master/runner/task/BlockingTaskProcessor.java  |  6 ++-
 .../master/runner/task/ConditionTaskProcessor.java |  6 ++-
 .../master/runner/task/SwitchTaskProcessor.java    | 43 ++++++++++------------
 3 files changed, 28 insertions(+), 27 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index 1fa6b28625..494b9e8434 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -110,19 +110,21 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
             return false;
         }
         this.setTaskExecutionLogger();
-        initTaskParameters();
-        logger.info("blocking task start");
+        logger.info("blocking task submit success");
         return true;
     }
 
     @Override
     protected boolean runTask() {
+        logger.info("blocking task starting");
+        initTaskParameters();
         if (conditionResult.equals(DependResult.WAITING)) {
             setConditionResult();
             endTask();
         } else {
             endTask();
         }
+        logger.info("blocking task finished");
         return true;
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 4749e20f0f..f98174b19d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -67,19 +67,21 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
             return false;
         }
         this.setTaskExecutionLogger();
-        initTaskParameters();
-        logger.info("condition task start");
+        logger.info("condition task submit success");
         return true;
     }
 
     @Override
     public boolean runTask() {
+        initTaskParameters();
+        logger.info("condition task start");
         if (conditionResult.equals(DependResult.WAITING)) {
             setConditionResult();
             endTask();
         } else {
             endTask();
         }
+        logger.info("condition task finished");
         return true;
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 444d41622c..ffd124c18b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -17,8 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
-
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -30,9 +31,6 @@ import 
org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -41,7 +39,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import com.google.auto.service.AutoService;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
 
 /**
  * switch task processor
@@ -64,29 +62,28 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
             return false;
         }
         this.setTaskExecutionLogger();
-        
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
 processInstance.getProcessDefinitionCode(),
-                processInstance.getProcessDefinitionVersion(),
-                taskInstance.getProcessInstanceId(),
-                taskInstance.getId()));
-        taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
-        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
-        taskInstance.setStartTime(new Date());
-        processService.updateTaskInstance(taskInstance);
+        logger.info("switch task submit success");
         return true;
     }
 
     @Override
     public boolean runTask() {
-        try {
-            if (!this.taskInstance().getState().typeIsFinished() && 
setSwitchResult()) {
-                endTaskState();
-            }
-        } catch (Exception e) {
-            logger.error("update work flow {} switch task {} state error:",
-                    this.processInstance.getId(),
-                    this.taskInstance.getId(),
-                    e);
+        logger.info("switch task starting");
+        taskInstance.setLogPath(
+                LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), 
processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion(),
+                        taskInstance.getProcessInstanceId(),
+                        taskInstance.getId()));
+        taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
+        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        taskInstance.setStartTime(new Date());
+        processService.updateTaskInstance(taskInstance);
+
+        if (!this.taskInstance().getState().typeIsFinished()) {
+            setSwitchResult();
         }
+        endTaskState();
+        logger.info("switch task finished");
         return true;
     }
 

Reply via email to