This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9f67cc40c7 [Improvement-16872][Master] Select a coordinator from 
masters to wake up task group (#16873)
9f67cc40c7 is described below

commit 9f67cc40c786c0aeb283518ddd37b94f1fc8fb04
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 18 20:02:59 2024 +0800

    [Improvement-16872][Master] Select a coordinator from masters to wake up 
task group (#16873)
    
    * Add MasterCoordinator
    * Add MasterCoordinatorListener
    * Release the task group queue slot when task instance failover
---
 .../common/enums/TaskGroupQueueStatus.java         |   1 +
 .../common/model/MasterHeartBeat.java              |   2 +
 .../dao/entity/TaskDefinitionLog.java              |   2 +
 .../server/master/MasterServer.java                |   7 ++
 .../master/engine/ITaskGroupCoordinator.java       |  98 +++++++++++++++
 .../server/master/engine/MasterCoordinator.java    |  83 ++++++++++++
 .../server/master/engine/TaskGroupCoordinator.java | 140 ++++++++-------------
 .../server/master/engine/WorkflowEngine.java       |   9 +-
 .../handler/RecoverFailureTaskCommandHandler.java  |   8 --
 .../handler/WorkflowFailoverCommandHandler.java    |  12 +-
 .../task/runnable/FailoverTaskInstanceFactory.java |   8 ++
 .../task/statemachine/AbstractTaskStateAction.java |  13 +-
 .../statemachine/TaskSubmittedStateAction.java     |   4 -
 .../master/registry/MasterHeartBeatTask.java       |   8 +-
 .../master/registry/MasterRegistryClient.java      |   7 +-
 .../server/master/utils/TaskGroupUtils.java        |   2 +-
 .../TaskGroupCoordinatorTest.java                  |  18 +--
 .../integration/WorkflowTestCaseContext.java       |   3 +
 .../WorkflowTestCaseContextFactory.java            |  14 +++
 .../integration/cases/WorkflowStartTestCase.java   |  32 +++++
 ...rkflow_with_one_fake_task_using_task_group.yaml |  73 +++++++++++
 .../src/test/resources/logback.xml                 |   2 +-
 .../registry/api/enums/RegistryNodeType.java       |   1 +
 .../registry/api/ha/AbstractHAServer.java          |  38 ++++--
 .../api/ha/ServerStatusChangeListener.java         |   2 +-
 25 files changed, 432 insertions(+), 155 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java
index 08ee74d4ce..2d26cb1092 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java
@@ -28,6 +28,7 @@ public enum TaskGroupQueueStatus {
 
     WAIT_QUEUE(-1, "wait queue"),
     ACQUIRE_SUCCESS(1, "acquire success"),
+    @Deprecated
     RELEASE(2, "release");
 
     @EnumValue
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
index b8ae4512dd..1f134df7bf 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
@@ -24,4 +24,6 @@ import lombok.experimental.SuperBuilder;
 @NoArgsConstructor
 public class MasterHeartBeat extends BaseHeartBeat implements HeartBeat {
 
+    private boolean isCoordinator;
+
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
index ce47df869a..48e24576f5 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
@@ -70,6 +70,8 @@ public class TaskDefinitionLog extends TaskDefinition {
         this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
         this.setFlag(taskDefinition.getFlag());
         this.setModifyBy(taskDefinition.getModifyBy());
+        this.setTaskGroupId(taskDefinition.getTaskGroupId());
+        this.setTaskGroupPriority(taskDefinition.getTaskGroupPriority());
         this.setCpuQuota(taskDefinition.getCpuQuota());
         this.setMemoryMax(taskDefinition.getMemoryMax());
         this.setTaskExecuteType(taskDefinition.getTaskExecuteType());
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 7695f7adc3..3afd4d0990 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
 import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
 import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
 import org.apache.dolphinscheduler.server.master.cluster.ClusterStateMonitors;
+import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
 import org.apache.dolphinscheduler.server.master.engine.WorkflowEngine;
 import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
 import 
org.apache.dolphinscheduler.server.master.engine.system.SystemEventBusFireWorker;
@@ -95,6 +96,9 @@ public class MasterServer implements IStoppable {
     @Autowired
     private SystemEventBusFireWorker systemEventBusFireWorker;
 
+    @Autowired
+    private MasterCoordinator masterCoordinator;
+
     public static void main(String[] args) {
         
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
 
@@ -122,6 +126,8 @@ public class MasterServer implements IStoppable {
         this.masterRegistryClient.start();
         this.masterRegistryClient.setRegistryStoppable(this);
 
+        this.masterCoordinator.start();
+
         this.clusterManager.start();
         this.clusterStateMonitors.start();
 
@@ -173,6 +179,7 @@ public class MasterServer implements IStoppable {
                 WorkflowEngine workflowEngine1 = workflowEngine;
                 SchedulerApi closedSchedulerApi = schedulerApi;
                 MasterRpcServer closedRpcServer = masterRPCServer;
+                MasterCoordinator closeMasterCoordinator = masterCoordinator;
                 MasterRegistryClient closedMasterRegistryClient = 
masterRegistryClient;
                 // close spring Context and will invoke method with 
@PreDestroy annotation to destroy beans.
                 // like 
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
new file mode 100644
index 0000000000..75bb63438f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine;
+
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+/**
+ * The TaskGroupCoordinator is used to manage the task group slot. The task group 
slot is used to limit the number of {@link TaskInstance} that can be run at the 
same time.
+ * <p>
+ * The {@link TaskGroupQueue} is used to represent the task group slot. A 
{@link TaskGroupQueue} whose inQueue is YES is 
being used by a {@link TaskInstance}.
+ * <p>
+ * When the {@link TaskInstance} needs to use a task group, we should use {@link 
ITaskGroupCoordinator#acquireTaskGroupSlot(TaskInstance)} to acquire the task 
group slot;
+ * this method doesn't block and should always succeed; after calling it you 
should immediately stop dispatching the task instance.
+ * When the task group slot is available, the ITaskGroupCoordinator will wake 
up the waiting {@link TaskInstance} to dispatch.
+ * <pre>
+ *     if(needAcquireTaskGroupSlot(taskInstance)) {
+ *         taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
+ *         return;
+ *     }
+ * </pre>
+ * <p>
+ * When the {@link TaskInstance} is finished, we should use {@link 
ITaskGroupCoordinator#releaseTaskGroupSlot(TaskInstance)} to release the task 
group slot.
+ * <pre>
+ *     if(needToReleaseTaskGroupSlot(taskInstance)) {
+ *         taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
+ *     }
+ * </pre>
+ */
+public interface ITaskGroupCoordinator extends AutoCloseable {
+
+    /**
+     * Start the TaskGroupCoordinator. Once started, you cannot call this 
method again until you have closed the coordinator.
+     */
+    void start();
+
+    /**
+     * If the {@link TaskInstance#getTaskGroupId()} > 0, and the TaskGroup 
flag is {@link Flag#YES} then the task instance need to use task group.
+     *
+     * @param taskInstance task instance
+     * @return true if the TaskInstance need to acquireTaskGroupSlot
+     */
+    boolean needAcquireTaskGroupSlot(final TaskInstance taskInstance);
+
+    /**
+     * Acquire the task group slot for the given {@link TaskInstance}.
+     * <p>
+     * When a taskInstance wants to acquire a TaskGroup slot, it should call this 
method. If the acquire succeeds, a TaskGroupQueue is created in the db which is in 
queue and whose status is {@link TaskGroupQueueStatus#WAIT_QUEUE}.
+     * The TaskInstance shouldn't be dispatched until a slot is available and 
the taskGroupCoordinator notifies it.
+     *
+     * @param taskInstance the task instance which want to acquire task group 
slot.
+     * @throws IllegalArgumentException if the taskInstance is null or the 
used taskGroup doesn't exist.
+     */
+    void acquireTaskGroupSlot(TaskInstance taskInstance);
+
+    /**
+     * If the TaskInstance is using TaskGroup then it need to release 
TaskGroupSlot.
+     *
+     * @param taskInstance taskInstance
+     * @return true if the TaskInstance need to release TaskGroupSlot
+     */
+    boolean needToReleaseTaskGroupSlot(TaskInstance taskInstance);
+
+    /**
+     * Release the task group slot for the given {@link TaskInstance}.
+     * <p>
+     * When a taskInstance wants to release a TaskGroup slot, it should call this 
method. The release method will delete the taskGroupQueue.
+     * This method is idempotent: if the task group slot is 
already released, this method will do nothing.
+     *
+     * @param taskInstance the task instance which want to release task group 
slot.
+     * @throws IllegalArgumentException If the taskInstance is null or the 
task doesn't use task group.
+     */
+    void releaseTaskGroupSlot(TaskInstance taskInstance);
+
+    /**
+     * Close the TaskGroupCoordinator, once closed, the coordinator will not 
work until you have started the coordinator again.
+     */
+    @Override
+    void close();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
new file mode 100644
index 0000000000..eaaa6187c9
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
+import 
org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * The MasterCoordinator is a singleton within the cluster, which is used to do 
some control work, e.g. managing the {@link ITaskGroupCoordinator}.
+ */
+@Slf4j
+@Component
+public class MasterCoordinator extends AbstractHAServer {
+
+    private final ITaskGroupCoordinator taskGroupCoordinator;
+
+    public MasterCoordinator(final Registry registry,
+                             final MasterConfig masterConfig,
+                             final ITaskGroupCoordinator taskGroupCoordinator) 
{
+        super(
+                registry,
+                RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
+                masterConfig.getMasterAddress());
+        this.taskGroupCoordinator = taskGroupCoordinator;
+        addServerStatusChangeListener(new 
MasterCoordinatorListener(taskGroupCoordinator));
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        log.info("MasterCoordinator started...");
+    }
+
+    @Override
+    public void close() {
+        taskGroupCoordinator.close();
+        log.info("MasterCoordinator shutdown...");
+    }
+
+    public static class MasterCoordinatorListener extends 
AbstractServerStatusChangeListener {
+
+        private final ITaskGroupCoordinator taskGroupCoordinator;
+
+        public MasterCoordinatorListener(ITaskGroupCoordinator 
taskGroupCoordinator) {
+            this.taskGroupCoordinator = checkNotNull(taskGroupCoordinator);
+        }
+
+        @Override
+        public void changeToActive() {
+            taskGroupCoordinator.start();
+        }
+
+        @Override
+        public void changeToStandBy() {
+            taskGroupCoordinator.close();
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index 19a20dff9a..4fe69e9ac4 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -37,8 +37,6 @@ import 
org.apache.dolphinscheduler.extract.master.transportor.TaskGroupSlotAcqui
 import 
org.apache.dolphinscheduler.extract.master.transportor.TaskGroupSlotAcquireSuccessNotifyResponse;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.server.master.utils.TaskGroupUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -47,6 +45,7 @@ import org.apache.commons.lang3.time.StopWatch;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -55,34 +54,11 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-/**
- * The TaskGroupCoordinator use to manage the task group slot. The task group 
slot is used to limit the number of {@link TaskInstance} that can be run at the 
same time.
- * <p>
- * The {@link TaskGroupQueue} is used to represent the task group slot. When a 
{@link TaskGroupQueue} which inQueue is YES means the {@link TaskGroupQueue} is 
using by a {@link TaskInstance}.
- * <p>
- * When the {@link TaskInstance} need to use task group, we should use @{@link 
TaskGroupCoordinator#acquireTaskGroupSlot(TaskInstance)} to acquire the task 
group slot,
- * this method doesn't block should always acquire successfully, and you 
should directly stop dispatch the task instance.
- * When the task group slot is available, the TaskGroupCoordinator will wake 
up the waiting {@link TaskInstance} to dispatch.
- * <pre>
- *     if(needAcquireTaskGroupSlot(taskInstance)) {
- *         taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
- *         return;
- *     }
- * </pre>
- * <p>
- * When the {@link TaskInstance} is finished, we should use @{@link 
TaskGroupCoordinator#releaseTaskGroupSlot(TaskInstance)} to release the task 
group slot.
- * <pre>
- *     if(needToReleaseTaskGroupSlot(taskInstance)) {
- *         taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
- *     }
- * </pre>
- */
+import com.google.common.annotations.VisibleForTesting;
+
 @Slf4j
 @Component
-public class TaskGroupCoordinator extends BaseDaemonThread implements 
AutoCloseable {
-
-    @Autowired
-    private RegistryClient registryClient;
+public class TaskGroupCoordinator implements ITaskGroupCoordinator, 
AutoCloseable {
 
     @Autowired
     private TaskGroupDao taskGroupDao;
@@ -96,40 +72,48 @@ public class TaskGroupCoordinator extends BaseDaemonThread 
implements AutoClosea
     @Autowired
     private WorkflowInstanceDao workflowInstanceDao;
 
-    private boolean flag = true;
+    private boolean flag = false;
 
-    private static final int DEFAULT_LIMIT = 1000;
+    private Thread internalThread;
 
-    public TaskGroupCoordinator() {
-        super("TaskGroupCoordinator");
-    }
+    private static final int DEFAULT_LIMIT = 1000;
 
-    @Override
     public synchronized void start() {
         log.info("TaskGroupCoordinator starting...");
+        if (flag) {
+            throw new IllegalStateException("TaskGroupCoordinator is already 
started");
+        }
+        if (internalThread != null) {
+            throw new IllegalStateException("InternalThread is already 
started");
+        }
         flag = true;
-        super.start();
+        internalThread = new BaseDaemonThread(this::doStart) {
+        };
+        internalThread.start();
         log.info("TaskGroupCoordinator started...");
     }
 
-    @Override
-    public void run() {
+    @VisibleForTesting
+    boolean isStarted() {
+        return flag;
+    }
+
+    private void doStart() {
+        // Sleep 1 minute here to make sure the previous task group slot has 
been released.
+        // This step is not strictly necessary, since the wakeup operation is 
idempotent, but it avoids confusing warnings.
+        ThreadUtils.sleep(TimeUnit.MINUTES.toMillis(1));
+
         while (flag) {
             try {
-                
registryClient.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
-                try {
-                    StopWatch taskGroupCoordinatorRoundCost = 
StopWatch.createStarted();
+                final StopWatch taskGroupCoordinatorRoundCost = 
StopWatch.createStarted();
 
-                    amendTaskGroupUseSize();
-                    amendTaskGroupQueueStatus();
-                    dealWithForceStartTaskGroupQueue();
-                    dealWithWaitingTaskGroupQueue();
+                amendTaskGroupUseSize();
+                amendTaskGroupQueueStatus();
+                dealWithForceStartTaskGroupQueue();
+                dealWithWaitingTaskGroupQueue();
 
-                    taskGroupCoordinatorRoundCost.stop();
-                    log.debug("TaskGroupCoordinator round cost: {}/ms", 
taskGroupCoordinatorRoundCost.getTime());
-                } finally {
-                    
registryClient.releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
-                }
+                taskGroupCoordinatorRoundCost.stop();
+                log.debug("TaskGroupCoordinator round cost: {}/ms", 
taskGroupCoordinatorRoundCost.getTime());
             } catch (Throwable e) {
                 log.error("TaskGroupCoordinator error", e);
             } finally {
@@ -212,7 +196,6 @@ public class TaskGroupCoordinator extends BaseDaemonThread 
implements AutoClosea
                 log.warn("The TaskInstance: {} state: {} finished, will 
release the TaskGroupQueue: {}",
                         taskInstance.getName(), taskInstance.getState(), 
taskGroupQueue);
                 deleteTaskGroupQueueSlot(taskGroupQueue);
-                continue;
             }
         }
     }
@@ -226,7 +209,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread 
implements AutoClosea
         int limit = DEFAULT_LIMIT;
         StopWatch taskGroupCoordinatorRoundTimeCost = 
StopWatch.createStarted();
         while (true) {
-            List<TaskGroupQueue> taskGroupQueues =
+            final List<TaskGroupQueue> taskGroupQueues =
                     
taskGroupQueueDao.queryWaitNotifyForceStartTaskGroupQueue(minTaskGroupQueueId, 
limit);
             if (CollectionUtils.isEmpty(taskGroupQueues)) {
                 break;
@@ -245,7 +228,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread 
implements AutoClosea
         // Find the force start task group queue(Which is inQueue and 
forceStart is YES)
         // Notify the related waiting task instance
         // Set the taskGroupQueue status to RELEASE and remove it from queue
-        for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
+        for (final TaskGroupQueue taskGroupQueue : taskGroupQueues) {
             try {
                 LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId());
                 // notify the waiting task instance
@@ -312,11 +295,12 @@ public class TaskGroupCoordinator extends 
BaseDaemonThread implements AutoClosea
                     // next time.
                     notifyWaitingTaskInstance(taskGroupQueue);
 
-                    // Set the taskGroupQueue status to RUNNING and remove 
from queue
+                    // Set the taskGroupQueue status to ACQUIRE_SUCCESS and 
remove from WAITING queue
                     taskGroupQueue.setInQueue(Flag.YES.getCode());
                     
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
                     taskGroupQueue.setUpdateTime(new Date());
                     taskGroupQueueDao.updateById(taskGroupQueue);
+                    log.info("Success acquire TaskGroupSlot for 
TaskGroupQueue: {}", taskGroupQueue);
                 } catch (UnsupportedOperationException 
unsupportedOperationException) {
                     deleteTaskGroupQueueSlot(taskGroupQueue);
                     log.info(
@@ -331,13 +315,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread 
implements AutoClosea
         }
     }
 
-    /**
-     * If the {@link TaskInstance#getTaskGroupId()} > 0, and the TaskGroup 
flag is {@link Flag#YES} then the task instance need to use task group.
-     *
-     * @param taskInstance task instance
-     * @return true if the TaskInstance need to acquireTaskGroupSlot
-     */
-    public boolean needAcquireTaskGroupSlot(TaskInstance taskInstance) {
+    @Override
+    public boolean needAcquireTaskGroupSlot(final TaskInstance taskInstance) {
         if (taskInstance == null) {
             throw new IllegalArgumentException("The TaskInstance is null");
         }
@@ -354,15 +333,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread 
implements AutoClosea
         return Flag.YES.equals(taskGroup.getStatus());
     }
 
-    /**
-     * Acquire the task group slot for the given {@link TaskInstance}.
-     * <p>
-     * When taskInstance want to acquire a TaskGroup slot, should call this 
method. If acquire successfully, will create a TaskGroupQueue in db which is in 
queue and status is {@link TaskGroupQueueStatus#WAIT_QUEUE}.
-     * The TaskInstance shouldn't dispatch until there exist available slot, 
the taskGroupCoordinator notify it.
-     *
-     * @param taskInstance the task instance which want to acquire task group 
slot.
-     * @throws IllegalArgumentException if the taskInstance is null or the 
used taskGroup doesn't exist.
-     */
+    @Override
     public void acquireTaskGroupSlot(TaskInstance taskInstance) {
         if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) {
             throw new IllegalArgumentException("The current TaskInstance does 
not use task group");
@@ -393,12 +364,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread 
implements AutoClosea
         taskGroupQueueDao.insert(taskGroupQueue);
     }
 
-    /**
-     * If the TaskInstance is using TaskGroup then it need to release 
TaskGroupSlot.
-     *
-     * @param taskInstance taskInsatnce
-     * @return true if the TaskInstance need to release TaskGroupSlot
-     */
+    @Override
     public boolean needToReleaseTaskGroupSlot(TaskInstance taskInstance) {
         if (taskInstance == null) {
             throw new IllegalArgumentException("The TaskInstance is null");
@@ -410,15 +376,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread 
implements AutoClosea
         return true;
     }
 
-    /**
-     * Release the task group slot for the given {@link TaskInstance}.
-     * <p>
-     * When taskInstance want to release a TaskGroup slot, should call this 
method. The release method will delete the taskGroupQueue.
-     * This method is idempotent, this means that if the task group slot is 
already released, this method will do nothing.
-     *
-     * @param taskInstance the task instance which want to release task group 
slot.
-     * @throws IllegalArgumentException If the taskInstance is null or the 
task doesn't use task group.
-     */
+    @Override
     public void releaseTaskGroupSlot(TaskInstance taskInstance) {
         if (taskInstance == null) {
             throw new IllegalArgumentException("The TaskInstance is null");
@@ -488,8 +446,20 @@ public class TaskGroupCoordinator extends BaseDaemonThread 
implements AutoClosea
     }
 
     @Override
-    public void close() throws Exception {
+    public synchronized void close() {
+        if (!flag) {
+            log.warn("TaskGroupCoordinator is already closed");
+            return;
+        }
         flag = false;
+        try {
+            if (internalThread != null) {
+                internalThread.interrupt();
+            }
+        } catch (Exception ex) {
+            log.error("Close internalThread failed", ex);
+        }
+        internalThread = null;
         log.info("TaskGroupCoordinator closed");
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
index 75bc6b4527..6639fc553b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
@@ -30,9 +30,6 @@ import org.springframework.stereotype.Component;
 @Component
 public class WorkflowEngine implements AutoCloseable {
 
-    @Autowired
-    private TaskGroupCoordinator taskGroupCoordinator;
-
     @Autowired
     private WorkflowEventBusCoordinator workflowEventBusCoordinator;
 
@@ -47,8 +44,6 @@ public class WorkflowEngine implements AutoCloseable {
 
     public void start() {
 
-        taskGroupCoordinator.start();
-
         workflowEventBusCoordinator.start();
 
         commandEngine.start();
@@ -65,9 +60,7 @@ public class WorkflowEngine implements AutoCloseable {
         try (
                 final CommandEngine ignore1 = commandEngine;
                 final WorkflowEventBusCoordinator ignore2 = 
workflowEventBusCoordinator;
-                final GlobalTaskDispatchWaitingQueueLooper ignore3 =
-                        globalTaskDispatchWaitingQueueLooper;
-                final TaskGroupCoordinator ignore4 = taskGroupCoordinator;
+                final GlobalTaskDispatchWaitingQueueLooper ignore3 = 
globalTaskDispatchWaitingQueueLooper;
                 final LogicTaskEngineDelegator ignore5 = 
logicTaskEngineDelegator) {
             // closed the resource
         }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
index 3be4cbc5e8..5ad06eaa48 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
@@ -25,14 +25,12 @@ import 
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
 import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
 import 
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
 import 
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories;
-import 
org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory;
 import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
 
 import java.util.ArrayList;
@@ -63,12 +61,6 @@ public class RecoverFailureTaskCommandHandler extends 
AbstractCommandHandler {
     @Autowired
     private TaskInstanceDao taskInstanceDao;
 
-    @Autowired
-    private TaskExecutionContextFactory taskExecutionContextFactory;
-
-    @Autowired
-    private TaskGroupCoordinator taskGroupCoordinator;
-
     @Autowired
     private ApplicationContext applicationContext;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
index 506fcdffb7..24d117fd87 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
@@ -22,16 +22,14 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import 
org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
+import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
 import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
 import 
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
 import 
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
-import 
org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory;
 import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
 
 import java.util.Map;
@@ -55,13 +53,7 @@ public class WorkflowFailoverCommandHandler extends 
AbstractCommandHandler {
     private WorkflowInstanceDao workflowInstanceDao;
 
     @Autowired
-    private TaskInstanceDao taskInstanceDao;
-
-    @Autowired
-    private TaskExecutionContextFactory taskExecutionContextFactory;
-
-    @Autowired
-    private TaskGroupCoordinator taskGroupCoordinator;
+    private ITaskGroupCoordinator taskGroupCoordinator;
 
     @Autowired
     private ApplicationContext applicationContext;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/FailoverTaskInstanceFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/FailoverTaskInstanceFactory.java
index 765fcfa24b..67a2eb87fc 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/FailoverTaskInstanceFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/FailoverTaskInstanceFactory.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.FailoverTaskInstanceFactory.FailoverTaskInstanceBuilder;
 
 import java.util.Date;
@@ -35,6 +36,9 @@ public class FailoverTaskInstanceFactory extends 
AbstractTaskInstanceFactory<Fai
     @Autowired
     private TaskInstanceDao taskInstanceDao;
 
+    @Autowired
+    private ITaskGroupCoordinator taskGroupCoordinator;
+
     @Override
     public FailoverTaskInstanceFactory.FailoverTaskInstanceBuilder builder() {
         return new FailoverTaskInstanceBuilder(this);
@@ -54,6 +58,10 @@ public class FailoverTaskInstanceFactory extends 
AbstractTaskInstanceFactory<Fai
         taskInstance.setExecutePath(null);
         taskInstanceDao.insert(taskInstance);
 
+        if 
(taskGroupCoordinator.needToReleaseTaskGroupSlot(needFailoverTaskInstance)) {
+            
taskGroupCoordinator.releaseTaskGroupSlot(needFailoverTaskInstance);
+        }
+
         needFailoverTaskInstance.setFlag(Flag.NO);
         
needFailoverTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
         taskInstanceDao.updateById(needFailoverTaskInstance);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index 867a6edeec..3f79d71efa 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -25,10 +25,9 @@ import 
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
 import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
 import 
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
 import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
@@ -41,7 +40,6 @@ import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRuntimeContextChangedEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
 
@@ -57,20 +55,14 @@ import com.google.common.collect.Lists;
 public abstract class AbstractTaskStateAction implements ITaskStateAction {
 
     @Autowired
-    protected TaskGroupCoordinator taskGroupCoordinator;
+    protected ITaskGroupCoordinator taskGroupCoordinator;
 
     @Autowired
     protected TaskInstanceDao taskInstanceDao;
 
-    @Autowired
-    protected TaskInstanceFactories taskInstanceFactories;
-
     @Autowired
     protected IWorkflowRepository workflowRepository;
 
-    @Autowired
-    private MasterConfig masterConfig;
-
     @Autowired
     protected ITaskExecutorClient taskExecutorClient;
 
@@ -237,6 +229,7 @@ public abstract class AbstractTaskStateAction implements 
ITaskStateAction {
     protected void tryToDispatchTask(final ITaskExecutionRunnable 
taskExecutionRunnable) {
         if (isTaskNeedAcquireTaskGroupSlot(taskExecutionRunnable)) {
             acquireTaskGroupSlot(taskExecutionRunnable);
+            log.info("Task{} using taskGroup, success acquire taskGroup slot", 
taskExecutionRunnable.getName());
             return;
         }
         
taskExecutionRunnable.getWorkflowEventBus().publish(TaskDispatchLifecycleEvent.of(taskExecutionRunnable));
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
index 184e5d4c67..cc82c1030c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
@@ -53,9 +52,6 @@ public class TaskSubmittedStateAction extends 
AbstractTaskStateAction {
     @Autowired
     private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
 
-    @Autowired
-    private TaskGroupCoordinator taskGroupCoordinator;
-
     @Autowired
     private TaskInstanceDao taskInstanceDao;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index 88306bfa6c..cf33c4824a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -30,6 +30,7 @@ import 
org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
+import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 
 import lombok.NonNull;
@@ -44,17 +45,21 @@ public class MasterHeartBeatTask extends 
BaseHeartBeatTask<MasterHeartBeat> {
 
     private final RegistryClient registryClient;
 
+    private final MasterCoordinator masterCoordinator;
+
     private final String heartBeatPath;
 
     private final int processId;
 
     public MasterHeartBeatTask(@NonNull MasterConfig masterConfig,
                                @NonNull MetricsProvider metricsProvider,
-                               @NonNull RegistryClient registryClient) {
+                               @NonNull RegistryClient registryClient,
+                               @NonNull MasterCoordinator masterCoordinator) {
         super("MasterHeartBeatTask", 
masterConfig.getMaxHeartbeatInterval().toMillis());
         this.masterConfig = masterConfig;
         this.metricsProvider = metricsProvider;
         this.registryClient = registryClient;
+        this.masterCoordinator = masterCoordinator;
         this.heartBeatPath = masterConfig.getMasterRegistryPath();
         this.processId = OSUtils.getProcessID();
     }
@@ -75,6 +80,7 @@ public class MasterHeartBeatTask extends 
BaseHeartBeatTask<MasterHeartBeat> {
                 .serverStatus(serverStatus)
                 .host(NetUtils.getHost())
                 .port(masterConfig.getListenPort())
+                .isCoordinator(masterCoordinator.isActive())
                 .build();
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 94e2dc0513..626673ab0a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -30,6 +30,7 @@ import 
org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -53,11 +54,15 @@ public class MasterRegistryClient implements AutoCloseable {
     @Autowired
     private MetricsProvider metricsProvider;
 
+    @Autowired
+    private MasterCoordinator masterCoordinator;
+
     private MasterHeartBeatTask masterHeartBeatTask;
 
     public void start() {
         try {
-            this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, 
metricsProvider, registryClient);
+            this.masterHeartBeatTask =
+                    new MasterHeartBeatTask(masterConfig, metricsProvider, 
registryClient, masterCoordinator);
             // master registry
             registry();
             registryClient.addConnectionStateListener(new 
MasterConnectionStateListener(registryClient));
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskGroupUtils.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskGroupUtils.java
index 8306275a3a..7f8d90e74c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskGroupUtils.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskGroupUtils.java
@@ -24,7 +24,7 @@ public class TaskGroupUtils {
     /**
      * Check if the task instance is using task group
      */
-    public static boolean isUsingTaskGroup(TaskInstance taskInstance) {
+    public static boolean isUsingTaskGroup(final TaskInstance taskInstance) {
         return taskInstance.getTaskGroupId() > 0;
     }
 
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
similarity index 90%
rename from 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
rename to 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
index 9384f19f14..4568e329da 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.taskgroup;
+package org.apache.dolphinscheduler.server.master.engine;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -33,9 +33,6 @@ import 
org.apache.dolphinscheduler.dao.repository.TaskGroupDao;
 import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
-import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
 
 import java.util.List;
 
@@ -58,9 +55,6 @@ class TaskGroupCoordinatorTest {
     @InjectMocks
     private TaskGroupCoordinator taskGroupCoordinator;
 
-    @Mock
-    private RegistryClient registryClient;
-
     @Mock
     private TaskGroupDao taskGroupDao;
 
@@ -74,15 +68,13 @@ class TaskGroupCoordinatorTest {
     private WorkflowInstanceDao workflowInstanceDao;
 
     @Test
-    void start() throws InterruptedException {
+    void start() {
         // Get the Lock from Registry
         taskGroupCoordinator.start();
-        Thread.sleep(1_000);
-        verify(registryClient, Mockito.times(1))
-                
.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
-        verify(registryClient, Mockito.times(1))
-                
.releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
+        assertTrue(taskGroupCoordinator.isStarted());
 
+        taskGroupCoordinator.close();
+        assertFalse(taskGroupCoordinator.isStarted());
     }
 
     @Test
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java
index 2c5853f33b..7d152ea284 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.integration;
 
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskGroup;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
@@ -49,6 +50,8 @@ public class WorkflowTestCaseContext {
 
     private List<WorkflowTaskRelation> taskRelations;
 
+    private List<TaskGroup> taskGroups;
+
     public WorkflowDefinition getOneWorkflow() {
         if (CollectionUtils.isEmpty(workflows)) {
             throw new IllegalStateException("workflows is empty");
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContextFactory.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContextFactory.java
index ef5c210bb2..1a5804fd99 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContextFactory.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.integration;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskGroup;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
@@ -30,6 +31,7 @@ import 
org.apache.dolphinscheduler.dao.mapper.WorkflowTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.repository.ProjectDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskGroupDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
@@ -74,6 +76,9 @@ public class WorkflowTestCaseContextFactory {
     @Autowired
     private TaskInstanceDao taskInstanceDao;
 
+    @Autowired
+    private TaskGroupDao taskGroupDao;
+
     public WorkflowTestCaseContext initializeContextFromYaml(final String 
yamlPath) {
         final WorkflowTestCaseContext workflowTestCaseContext = 
YamlFactory.load(yamlPath);
         initializeProjectToDB(workflowTestCaseContext.getProject());
@@ -86,6 +91,9 @@ public class WorkflowTestCaseContextFactory {
         if 
(CollectionUtils.isNotEmpty(workflowTestCaseContext.getTaskInstances())) {
             
initializeTaskInstancesToDB(workflowTestCaseContext.getTaskInstances());
         }
+        if 
(CollectionUtils.isNotEmpty(workflowTestCaseContext.getTaskGroups())) {
+            initializeTaskGroupsToDB(workflowTestCaseContext.getTaskGroups());
+        }
         return workflowTestCaseContext;
     }
 
@@ -134,4 +142,10 @@ public class WorkflowTestCaseContextFactory {
         projectDao.insert(project);
     }
 
+    private void initializeTaskGroupsToDB(final List<TaskGroup> taskGroups) {
+        for (final TaskGroup taskGroup : taskGroups) {
+            taskGroupDao.insert(taskGroup);
+        }
+    }
+
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 97f8e19fee..55558eece2 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -121,6 +121,38 @@ public class WorkflowStartTestCase extends 
AbstractMasterIntegrationTestCase {
         masterContainer.assertAllResourceReleased();
     }
 
+    @Test
+    @DisplayName("Test start a workflow with one fake task(A) using task 
group")
+    public void testStartWorkflow_with_oneSuccessTaskUsingTaskGroup() {
+        final String yaml = 
"/it/start/workflow_with_one_fake_task_using_task_group.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .dryRun(Flag.YES)
+                .build();
+
+        workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+        workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+        workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+        workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(2))
+                .atLeast(Duration.ofSeconds(20))
+                .untilAsserted(() -> {
+                    Assertions
+                            .assertThat(repository.queryTaskInstance(workflow))
+                            .hasSize(4)
+                            .allMatch(taskInstance -> 
TaskExecutionStatus.SUCCESS.equals(taskInstance.getState())
+                                    && taskInstance.getTaskGroupId() == 
context.getTaskGroups().get(0).getId());
+                });
+
+        masterContainer.assertAllResourceReleased();
+    }
+
     @Test
     @DisplayName("Test start a workflow with one sub workflow task(A) success")
     public void testStartWorkflow_with_subWorkflowTask_success() {
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
new file mode 100644
index 0000000000..d846e0f7ea
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_one_fake_task_success
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with single task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+    workerGroup: default
+    taskGroupId: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+
+taskGroups:
+  - id: 1
+    name: default
+    projectCode: 1
+    groupSize: 2
+    useSize: 0
+    userId: 1
+    status: YES
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/test/resources/logback.xml 
b/dolphinscheduler-master/src/test/resources/logback.xml
index 5debebf4ee..2af91b3daa 100644
--- a/dolphinscheduler-master/src/test/resources/logback.xml
+++ b/dolphinscheduler-master/src/test/resources/logback.xml
@@ -68,7 +68,7 @@
     </appender>
 
     <!-- We use OFF here to avoid too many exception log in CI   -->
-    <root level="INFO">
+    <root level="OFF">
         <appender-ref ref="STDOUT"/>
         <appender-ref ref="TASKLOGFILE"/>
         <appender-ref ref="MASTERLOGFILE"/>
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
index d39e95a84f..75deb8a02d 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
@@ -28,6 +28,7 @@ public enum RegistryNodeType {
 
     MASTER("Master", "/nodes/master"),
     MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
+    MASTER_COORDINATOR("MasterCoordinator", "/nodes/master-coordinator"),
     MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", 
"/lock/master-task-group-coordinator"),
     MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator", 
"/lock/master-serial-workflow-coordinator"),
 
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
index 822b4d93f4..c87f341a30 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.registry.api.ha;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.registry.api.Event;
 import org.apache.dolphinscheduler.registry.api.Registry;
 
@@ -41,6 +42,10 @@ public abstract class AbstractHAServer implements HAServer {
 
     private final List<ServerStatusChangeListener> serverStatusChangeListeners;
 
+    private static final long DEFAULT_RETRY_INTERVAL = 5_000;
+
+    private static final int DEFAULT_MAX_RETRY_TIMES = 20;
+
     public AbstractHAServer(final Registry registry, final String 
selectorPath, final String serverIdentify) {
         this.registry = registry;
         this.selectorPath = checkNotNull(selectorPath);
@@ -78,21 +83,30 @@ public abstract class AbstractHAServer implements HAServer {
     @Override
     public boolean participateElection() {
         final String electionLock = selectorPath + "-lock";
-        try {
-            if (registry.acquireLock(electionLock)) {
-                if (!registry.exists(selectorPath)) {
-                    registry.put(selectorPath, serverIdentify, true);
-                    return true;
+        // If an exception is thrown while participating in the election, retry.
+        // This avoids the server failing to be elected as leader merely because of 
transient network jitter.
+        for (int i = 0; i < DEFAULT_MAX_RETRY_TIMES; i++) {
+            try {
+                try {
+                    if (registry.acquireLock(electionLock)) {
+                        if (!registry.exists(selectorPath)) {
+                            registry.put(selectorPath, serverIdentify, true);
+                            return true;
+                        }
+                        return 
serverIdentify.equals(registry.get(selectorPath));
+                    }
+                    return false;
+                } finally {
+                    registry.releaseLock(electionLock);
                 }
-                return serverIdentify.equals(registry.get(selectorPath));
+            } catch (Exception e) {
+                log.error("Participate election error, meet an exception, will 
retry after {}ms",
+                        DEFAULT_RETRY_INTERVAL, e);
+                ThreadUtils.sleep(DEFAULT_RETRY_INTERVAL);
             }
-            return false;
-        } catch (Exception e) {
-            log.error("participate election error", e);
-            return false;
-        } finally {
-            registry.releaseLock(electionLock);
         }
+        throw new IllegalStateException(
+                "Participate election failed after retry " + 
DEFAULT_MAX_RETRY_TIMES + " times");
     }
 
     @Override
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java
index af109228e2..18fbed8c79 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java
@@ -19,6 +19,6 @@ package org.apache.dolphinscheduler.registry.api.ha;
 
 public interface ServerStatusChangeListener {
 
-    void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus 
currentStatus);
+    void change(final HAServer.ServerStatus originStatus, final 
HAServer.ServerStatus currentStatus);
 
 }

Reply via email to