This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 14272dafab Fix failover Master might not release taskGroup (#15287)
14272dafab is described below

commit 14272dafab40588c18eafb4e9aad70b8f79aaead
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 6 13:31:46 2023 +0800

    Fix failover Master might not release taskGroup (#15287)
---
 .../server/master/runner/WorkflowExecuteRunnable.java   | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 35ad1e8c82..9006812431 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -465,12 +465,13 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
      * release task group
      *
      */
-    public void releaseTaskGroup(TaskInstance taskInstance) throws 
InterruptedException {
+    public void releaseTaskGroup(TaskInstance taskInstance) {
         ProcessInstance workflowInstance = 
workflowExecuteContext.getWorkflowInstance();
         // todo: use Integer
         if (taskInstance.getTaskGroupId() <= 0) {
             log.info("The current TaskInstance: {} doesn't use taskGroup, no 
need to release taskGroup",
                     taskInstance.getName());
+            return;
         }
         TaskInstance nextTaskInstance = 
processService.releaseTaskGroup(taskInstance);
         if (nextTaskInstance == null) {
@@ -1347,9 +1348,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
                 TaskExecutionStatus state = existTaskInstance.getState();
                 if (state == TaskExecutionStatus.RUNNING_EXECUTION
                         || state == TaskExecutionStatus.DISPATCH
-                        || state == TaskExecutionStatus.SUBMITTED_SUCCESS) {
+                        || state == TaskExecutionStatus.SUBMITTED_SUCCESS
+                        || state == TaskExecutionStatus.DELAY_EXECUTION) {
                     // try to take over task instance
                     if (state != TaskExecutionStatus.SUBMITTED_SUCCESS
+                            && state != TaskExecutionStatus.DELAY_EXECUTION
                             && tryToTakeOverTaskInstance(existTaskInstance)) {
                         log.info("Success take over task {}", 
existTaskInstance.getName());
                         continue;
@@ -1357,6 +1360,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
                         // set the task instance state to fault tolerance
                         existTaskInstance.setFlag(Flag.NO);
                         
existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
+                        releaseTaskGroup(existTaskInstance);
+
                         validTaskMap.remove(existTaskInstance.getTaskCode());
                         taskInstanceDao.updateById(existTaskInstance);
                         existTaskInstance = 
cloneTolerantTaskInstance(existTaskInstance);
@@ -1444,12 +1449,12 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
             ITaskInstanceOperator iTaskInstanceOperator =
                     SingletonJdkDynamicRpcClientProxyFactory
                             .getProxyClient(taskInstance.getHost(), 
ITaskInstanceOperator.class);
-            UpdateWorkflowHostResponse updateWorkflowHostResponse = 
iTaskInstanceOperator.updateWorkflowInstanceHost(
+            UpdateWorkflowHostResponse response = 
iTaskInstanceOperator.updateWorkflowInstanceHost(
                     new UpdateWorkflowHostRequest(taskInstance.getId(), 
masterConfig.getMasterAddress()));
-            if (!updateWorkflowHostResponse.isSuccess()) {
+            if (!response.isSuccess()) {
                 log.error(
-                        "Takeover TaskInstance failed, receive a failed 
response from worker: {}, will try to create a new TaskInstance",
-                        taskInstance.getHost());
+                        "Takeover TaskInstance failed, receive a failed 
response: {} from worker: {}, will try to create a new TaskInstance",
+                        response, taskInstance.getHost());
                 return false;
             }
 

Reply via email to