This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9f67cc40c7 [Improvement-16872][Master] Select a coordinator from
masters to wake up task group (#16873)
9f67cc40c7 is described below
commit 9f67cc40c786c0aeb283518ddd37b94f1fc8fb04
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 18 20:02:59 2024 +0800
[Improvement-16872][Master] Select a coordinator from masters to wake up
task group (#16873)
* Add MasterCoordinator
* Add MasterCoordinatorListener
* Release the task group queue slot when task instance failover
---
.../common/enums/TaskGroupQueueStatus.java | 1 +
.../common/model/MasterHeartBeat.java | 2 +
.../dao/entity/TaskDefinitionLog.java | 2 +
.../server/master/MasterServer.java | 7 ++
.../master/engine/ITaskGroupCoordinator.java | 98 +++++++++++++++
.../server/master/engine/MasterCoordinator.java | 83 ++++++++++++
.../server/master/engine/TaskGroupCoordinator.java | 140 ++++++++-------------
.../server/master/engine/WorkflowEngine.java | 9 +-
.../handler/RecoverFailureTaskCommandHandler.java | 8 --
.../handler/WorkflowFailoverCommandHandler.java | 12 +-
.../task/runnable/FailoverTaskInstanceFactory.java | 8 ++
.../task/statemachine/AbstractTaskStateAction.java | 13 +-
.../statemachine/TaskSubmittedStateAction.java | 4 -
.../master/registry/MasterHeartBeatTask.java | 8 +-
.../master/registry/MasterRegistryClient.java | 7 +-
.../server/master/utils/TaskGroupUtils.java | 2 +-
.../TaskGroupCoordinatorTest.java | 18 +--
.../integration/WorkflowTestCaseContext.java | 3 +
.../WorkflowTestCaseContextFactory.java | 14 +++
.../integration/cases/WorkflowStartTestCase.java | 32 +++++
...rkflow_with_one_fake_task_using_task_group.yaml | 73 +++++++++++
.../src/test/resources/logback.xml | 2 +-
.../registry/api/enums/RegistryNodeType.java | 1 +
.../registry/api/ha/AbstractHAServer.java | 38 ++++--
.../api/ha/ServerStatusChangeListener.java | 2 +-
25 files changed, 432 insertions(+), 155 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java
index 08ee74d4ce..2d26cb1092 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java
@@ -28,6 +28,7 @@ public enum TaskGroupQueueStatus {
WAIT_QUEUE(-1, "wait queue"),
ACQUIRE_SUCCESS(1, "acquire success"),
+ @Deprecated
RELEASE(2, "release");
@EnumValue
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
index b8ae4512dd..1f134df7bf 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
@@ -24,4 +24,6 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
public class MasterHeartBeat extends BaseHeartBeat implements HeartBeat {
+ private boolean isCoordinator;
+
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
index ce47df869a..48e24576f5 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
@@ -70,6 +70,8 @@ public class TaskDefinitionLog extends TaskDefinition {
this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
this.setFlag(taskDefinition.getFlag());
this.setModifyBy(taskDefinition.getModifyBy());
+ this.setTaskGroupId(taskDefinition.getTaskGroupId());
+ this.setTaskGroupPriority(taskDefinition.getTaskGroupPriority());
this.setCpuQuota(taskDefinition.getCpuQuota());
this.setMemoryMax(taskDefinition.getMemoryMax());
this.setTaskExecuteType(taskDefinition.getTaskExecuteType());
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 7695f7adc3..3afd4d0990 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
import org.apache.dolphinscheduler.server.master.cluster.ClusterStateMonitors;
+import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEngine;
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
import
org.apache.dolphinscheduler.server.master.engine.system.SystemEventBusFireWorker;
@@ -95,6 +96,9 @@ public class MasterServer implements IStoppable {
@Autowired
private SystemEventBusFireWorker systemEventBusFireWorker;
+ @Autowired
+ private MasterCoordinator masterCoordinator;
+
public static void main(String[] args) {
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
@@ -122,6 +126,8 @@ public class MasterServer implements IStoppable {
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
+ this.masterCoordinator.start();
+
this.clusterManager.start();
this.clusterStateMonitors.start();
@@ -173,6 +179,7 @@ public class MasterServer implements IStoppable {
WorkflowEngine workflowEngine1 = workflowEngine;
SchedulerApi closedSchedulerApi = schedulerApi;
MasterRpcServer closedRpcServer = masterRPCServer;
+ MasterCoordinator closeMasterCoordinator = masterCoordinator;
MasterRegistryClient closedMasterRegistryClient =
masterRegistryClient;
// close spring Context and will invoke method with
@PreDestroy annotation to destroy beans.
// like
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
new file mode 100644
index 0000000000..75bb63438f
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine;
+
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+/**
+ * The TaskGroupCoordinator is used to manage the task group slot. The task group
slot is used to limit the number of {@link TaskInstance} that can be run at the
same time.
+ * <p>
+ * The {@link TaskGroupQueue} is used to represent the task group slot. A
{@link TaskGroupQueue} whose inQueue is YES is being used by a
{@link TaskInstance}.
+ * <p>
+ * When a {@link TaskInstance} needs to use a task group, we should use {@link
ITaskGroupCoordinator#acquireTaskGroupSlot(TaskInstance)} to acquire the task
group slot.
+ * This method doesn't block and should always acquire successfully; after calling it, you
should directly stop dispatching the task instance.
+ * When the task group slot is available, the ITaskGroupCoordinator will wake
up the waiting {@link TaskInstance} so that it can be dispatched.
+ * <pre>
+ * if(needAcquireTaskGroupSlot(taskInstance)) {
+ * taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
+ * return;
+ * }
+ * </pre>
+ * <p>
+ * When the {@link TaskInstance} is finished, we should use {@link
ITaskGroupCoordinator#releaseTaskGroupSlot(TaskInstance)} to release the task
group slot.
+ * <pre>
+ * if(needToReleaseTaskGroupSlot(taskInstance)) {
+ * taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
+ * }
+ * </pre>
+ */
+public interface ITaskGroupCoordinator extends AutoCloseable {
+
+ /**
+ * Start the TaskGroupCoordinator. Once started, this method cannot be called
again until the coordinator has been closed.
+ */
+ void start();
+
+ /**
+ * If the {@link TaskInstance#getTaskGroupId()} > 0, and the TaskGroup
flag is {@link Flag#YES}, then the task instance needs to use the task group.
+ *
+ * @param taskInstance task instance
+ * @return true if the TaskInstance need to acquireTaskGroupSlot
+ */
+ boolean needAcquireTaskGroupSlot(final TaskInstance taskInstance);
+
+ /**
+ * Acquire the task group slot for the given {@link TaskInstance}.
+ * <p>
+ * A taskInstance that wants to acquire a TaskGroup slot should call this
method. If the acquisition succeeds, a TaskGroupQueue is created in the db, which is in
queue and has status {@link TaskGroupQueueStatus#WAIT_QUEUE}.
+ * The TaskInstance shouldn't be dispatched until a slot is available and
the taskGroupCoordinator notifies it.
+ *
+ * @param taskInstance the task instance which wants to acquire a task group
slot.
+ * @throws IllegalArgumentException if the taskInstance is null or the
used taskGroup doesn't exist.
+ */
+ void acquireTaskGroupSlot(TaskInstance taskInstance);
+
+ /**
+ * If the TaskInstance is using a TaskGroup then it needs to release the
TaskGroupSlot.
+ *
+ * @param taskInstance taskInstance
+ * @return true if the TaskInstance need to release TaskGroupSlot
+ */
+ boolean needToReleaseTaskGroupSlot(TaskInstance taskInstance);
+
+ /**
+ * Release the task group slot for the given {@link TaskInstance}.
+ * <p>
+ * A taskInstance that wants to release a TaskGroup slot should call this
method. The release method will delete the taskGroupQueue.
+ * This method is idempotent: if the task group slot is
already released, this method will do nothing.
+ *
+ * @param taskInstance the task instance which wants to release its task group
slot.
+ * @throws IllegalArgumentException If the taskInstance is null or the
task doesn't use task group.
+ */
+ void releaseTaskGroupSlot(TaskInstance taskInstance);
+
+ /**
+ * Close the TaskGroupCoordinator. Once closed, the coordinator will not
work until it has been started again.
+ */
+ @Override
+ void close();
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
new file mode 100644
index 0000000000..eaaa6187c9
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
+import
org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * The MasterCoordinator is a singleton within the cluster, which is used to do
some control work, e.g. managing the {@link ITaskGroupCoordinator}.
+ */
+@Slf4j
+@Component
+public class MasterCoordinator extends AbstractHAServer {
+
+ private final ITaskGroupCoordinator taskGroupCoordinator;
+
+ public MasterCoordinator(final Registry registry,
+ final MasterConfig masterConfig,
+ final ITaskGroupCoordinator taskGroupCoordinator)
{
+ super(
+ registry,
+ RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
+ masterConfig.getMasterAddress());
+ this.taskGroupCoordinator = taskGroupCoordinator;
+ addServerStatusChangeListener(new
MasterCoordinatorListener(taskGroupCoordinator));
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ log.info("MasterCoordinator started...");
+ }
+
+ @Override
+ public void close() {
+ taskGroupCoordinator.close();
+ log.info("MasterCoordinator shutdown...");
+ }
+
+ public static class MasterCoordinatorListener extends
AbstractServerStatusChangeListener {
+
+ private final ITaskGroupCoordinator taskGroupCoordinator;
+
+ public MasterCoordinatorListener(ITaskGroupCoordinator
taskGroupCoordinator) {
+ this.taskGroupCoordinator = checkNotNull(taskGroupCoordinator);
+ }
+
+ @Override
+ public void changeToActive() {
+ taskGroupCoordinator.start();
+ }
+
+ @Override
+ public void changeToStandBy() {
+ taskGroupCoordinator.close();
+ }
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index 19a20dff9a..4fe69e9ac4 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -37,8 +37,6 @@ import
org.apache.dolphinscheduler.extract.master.transportor.TaskGroupSlotAcqui
import
org.apache.dolphinscheduler.extract.master.transportor.TaskGroupSlotAcquireSuccessNotifyResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.utils.TaskGroupUtils;
import org.apache.commons.collections4.CollectionUtils;
@@ -47,6 +45,7 @@ import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -55,34 +54,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-/**
- * The TaskGroupCoordinator use to manage the task group slot. The task group
slot is used to limit the number of {@link TaskInstance} that can be run at the
same time.
- * <p>
- * The {@link TaskGroupQueue} is used to represent the task group slot. When a
{@link TaskGroupQueue} which inQueue is YES means the {@link TaskGroupQueue} is
using by a {@link TaskInstance}.
- * <p>
- * When the {@link TaskInstance} need to use task group, we should use @{@link
TaskGroupCoordinator#acquireTaskGroupSlot(TaskInstance)} to acquire the task
group slot,
- * this method doesn't block should always acquire successfully, and you
should directly stop dispatch the task instance.
- * When the task group slot is available, the TaskGroupCoordinator will wake
up the waiting {@link TaskInstance} to dispatch.
- * <pre>
- * if(needAcquireTaskGroupSlot(taskInstance)) {
- * taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
- * return;
- * }
- * </pre>
- * <p>
- * When the {@link TaskInstance} is finished, we should use @{@link
TaskGroupCoordinator#releaseTaskGroupSlot(TaskInstance)} to release the task
group slot.
- * <pre>
- * if(needToReleaseTaskGroupSlot(taskInstance)) {
- * taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
- * }
- * </pre>
- */
+import com.google.common.annotations.VisibleForTesting;
+
@Slf4j
@Component
-public class TaskGroupCoordinator extends BaseDaemonThread implements
AutoCloseable {
-
- @Autowired
- private RegistryClient registryClient;
+public class TaskGroupCoordinator implements ITaskGroupCoordinator,
AutoCloseable {
@Autowired
private TaskGroupDao taskGroupDao;
@@ -96,40 +72,48 @@ public class TaskGroupCoordinator extends BaseDaemonThread
implements AutoClosea
@Autowired
private WorkflowInstanceDao workflowInstanceDao;
- private boolean flag = true;
+ private boolean flag = false;
- private static final int DEFAULT_LIMIT = 1000;
+ private Thread internalThread;
- public TaskGroupCoordinator() {
- super("TaskGroupCoordinator");
- }
+ private static final int DEFAULT_LIMIT = 1000;
- @Override
public synchronized void start() {
log.info("TaskGroupCoordinator starting...");
+ if (flag) {
+ throw new IllegalStateException("TaskGroupCoordinator is already
started");
+ }
+ if (internalThread != null) {
+ throw new IllegalStateException("InternalThread is already
started");
+ }
flag = true;
- super.start();
+ internalThread = new BaseDaemonThread(this::doStart) {
+ };
+ internalThread.start();
log.info("TaskGroupCoordinator started...");
}
- @Override
- public void run() {
+ @VisibleForTesting
+ boolean isStarted() {
+ return flag;
+ }
+
+ private void doStart() {
+ // Sleep 1 minute here to make sure the previous task group slot has
been released.
+ // This step is not necessary, since the wakeup operation is
idempotent, but it avoids confusing warnings.
+ ThreadUtils.sleep(TimeUnit.MINUTES.toMillis(1));
+
while (flag) {
try {
-
registryClient.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
- try {
- StopWatch taskGroupCoordinatorRoundCost =
StopWatch.createStarted();
+ final StopWatch taskGroupCoordinatorRoundCost =
StopWatch.createStarted();
- amendTaskGroupUseSize();
- amendTaskGroupQueueStatus();
- dealWithForceStartTaskGroupQueue();
- dealWithWaitingTaskGroupQueue();
+ amendTaskGroupUseSize();
+ amendTaskGroupQueueStatus();
+ dealWithForceStartTaskGroupQueue();
+ dealWithWaitingTaskGroupQueue();
- taskGroupCoordinatorRoundCost.stop();
- log.debug("TaskGroupCoordinator round cost: {}/ms",
taskGroupCoordinatorRoundCost.getTime());
- } finally {
-
registryClient.releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
- }
+ taskGroupCoordinatorRoundCost.stop();
+ log.debug("TaskGroupCoordinator round cost: {}/ms",
taskGroupCoordinatorRoundCost.getTime());
} catch (Throwable e) {
log.error("TaskGroupCoordinator error", e);
} finally {
@@ -212,7 +196,6 @@ public class TaskGroupCoordinator extends BaseDaemonThread
implements AutoClosea
log.warn("The TaskInstance: {} state: {} finished, will
release the TaskGroupQueue: {}",
taskInstance.getName(), taskInstance.getState(),
taskGroupQueue);
deleteTaskGroupQueueSlot(taskGroupQueue);
- continue;
}
}
}
@@ -226,7 +209,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread
implements AutoClosea
int limit = DEFAULT_LIMIT;
StopWatch taskGroupCoordinatorRoundTimeCost =
StopWatch.createStarted();
while (true) {
- List<TaskGroupQueue> taskGroupQueues =
+ final List<TaskGroupQueue> taskGroupQueues =
taskGroupQueueDao.queryWaitNotifyForceStartTaskGroupQueue(minTaskGroupQueueId,
limit);
if (CollectionUtils.isEmpty(taskGroupQueues)) {
break;
@@ -245,7 +228,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread
implements AutoClosea
// Find the force start task group queue(Which is inQueue and
forceStart is YES)
// Notify the related waiting task instance
// Set the taskGroupQueue status to RELEASE and remove it from queue
- for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
+ for (final TaskGroupQueue taskGroupQueue : taskGroupQueues) {
try {
LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId());
// notify the waiting task instance
@@ -312,11 +295,12 @@ public class TaskGroupCoordinator extends
BaseDaemonThread implements AutoClosea
// next time.
notifyWaitingTaskInstance(taskGroupQueue);
- // Set the taskGroupQueue status to RUNNING and remove
from queue
+ // Set the taskGroupQueue status to ACQUIRE_SUCCESS and
remove from WAITING queue
taskGroupQueue.setInQueue(Flag.YES.getCode());
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueDao.updateById(taskGroupQueue);
+ log.info("Success acquire TaskGroupSlot for
TaskGroupQueue: {}", taskGroupQueue);
} catch (UnsupportedOperationException
unsupportedOperationException) {
deleteTaskGroupQueueSlot(taskGroupQueue);
log.info(
@@ -331,13 +315,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread
implements AutoClosea
}
}
- /**
- * If the {@link TaskInstance#getTaskGroupId()} > 0, and the TaskGroup
flag is {@link Flag#YES} then the task instance need to use task group.
- *
- * @param taskInstance task instance
- * @return true if the TaskInstance need to acquireTaskGroupSlot
- */
- public boolean needAcquireTaskGroupSlot(TaskInstance taskInstance) {
+ @Override
+ public boolean needAcquireTaskGroupSlot(final TaskInstance taskInstance) {
if (taskInstance == null) {
throw new IllegalArgumentException("The TaskInstance is null");
}
@@ -354,15 +333,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread
implements AutoClosea
return Flag.YES.equals(taskGroup.getStatus());
}
- /**
- * Acquire the task group slot for the given {@link TaskInstance}.
- * <p>
- * When taskInstance want to acquire a TaskGroup slot, should call this
method. If acquire successfully, will create a TaskGroupQueue in db which is in
queue and status is {@link TaskGroupQueueStatus#WAIT_QUEUE}.
- * The TaskInstance shouldn't dispatch until there exist available slot,
the taskGroupCoordinator notify it.
- *
- * @param taskInstance the task instance which want to acquire task group
slot.
- * @throws IllegalArgumentException if the taskInstance is null or the
used taskGroup doesn't exist.
- */
+ @Override
public void acquireTaskGroupSlot(TaskInstance taskInstance) {
if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) {
throw new IllegalArgumentException("The current TaskInstance does
not use task group");
@@ -393,12 +364,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread
implements AutoClosea
taskGroupQueueDao.insert(taskGroupQueue);
}
- /**
- * If the TaskInstance is using TaskGroup then it need to release
TaskGroupSlot.
- *
- * @param taskInstance taskInsatnce
- * @return true if the TaskInstance need to release TaskGroupSlot
- */
+ @Override
public boolean needToReleaseTaskGroupSlot(TaskInstance taskInstance) {
if (taskInstance == null) {
throw new IllegalArgumentException("The TaskInstance is null");
@@ -410,15 +376,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread
implements AutoClosea
return true;
}
- /**
- * Release the task group slot for the given {@link TaskInstance}.
- * <p>
- * When taskInstance want to release a TaskGroup slot, should call this
method. The release method will delete the taskGroupQueue.
- * This method is idempotent, this means that if the task group slot is
already released, this method will do nothing.
- *
- * @param taskInstance the task instance which want to release task group
slot.
- * @throws IllegalArgumentException If the taskInstance is null or the
task doesn't use task group.
- */
+ @Override
public void releaseTaskGroupSlot(TaskInstance taskInstance) {
if (taskInstance == null) {
throw new IllegalArgumentException("The TaskInstance is null");
@@ -488,8 +446,20 @@ public class TaskGroupCoordinator extends BaseDaemonThread
implements AutoClosea
}
@Override
- public void close() throws Exception {
+ public synchronized void close() {
+ if (!flag) {
+ log.warn("TaskGroupCoordinator is already closed");
+ return;
+ }
flag = false;
+ try {
+ if (internalThread != null) {
+ internalThread.interrupt();
+ }
+ } catch (Exception ex) {
+ log.error("Close internalThread failed", ex);
+ }
+ internalThread = null;
log.info("TaskGroupCoordinator closed");
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
index 75bc6b4527..6639fc553b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
@@ -30,9 +30,6 @@ import org.springframework.stereotype.Component;
@Component
public class WorkflowEngine implements AutoCloseable {
- @Autowired
- private TaskGroupCoordinator taskGroupCoordinator;
-
@Autowired
private WorkflowEventBusCoordinator workflowEventBusCoordinator;
@@ -47,8 +44,6 @@ public class WorkflowEngine implements AutoCloseable {
public void start() {
- taskGroupCoordinator.start();
-
workflowEventBusCoordinator.start();
commandEngine.start();
@@ -65,9 +60,7 @@ public class WorkflowEngine implements AutoCloseable {
try (
final CommandEngine ignore1 = commandEngine;
final WorkflowEventBusCoordinator ignore2 =
workflowEventBusCoordinator;
- final GlobalTaskDispatchWaitingQueueLooper ignore3 =
- globalTaskDispatchWaitingQueueLooper;
- final TaskGroupCoordinator ignore4 = taskGroupCoordinator;
+ final GlobalTaskDispatchWaitingQueueLooper ignore3 =
globalTaskDispatchWaitingQueueLooper;
final LogicTaskEngineDelegator ignore5 =
logicTaskEngineDelegator) {
// closed the resource
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
index 3be4cbc5e8..5ad06eaa48 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
@@ -25,14 +25,12 @@ import
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories;
-import
org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
import java.util.ArrayList;
@@ -63,12 +61,6 @@ public class RecoverFailureTaskCommandHandler extends
AbstractCommandHandler {
@Autowired
private TaskInstanceDao taskInstanceDao;
- @Autowired
- private TaskExecutionContextFactory taskExecutionContextFactory;
-
- @Autowired
- private TaskGroupCoordinator taskGroupCoordinator;
-
@Autowired
private ApplicationContext applicationContext;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
index 506fcdffb7..24d117fd87 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
@@ -22,16 +22,14 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import
org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
+import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
-import
org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
import java.util.Map;
@@ -55,13 +53,7 @@ public class WorkflowFailoverCommandHandler extends
AbstractCommandHandler {
private WorkflowInstanceDao workflowInstanceDao;
@Autowired
- private TaskInstanceDao taskInstanceDao;
-
- @Autowired
- private TaskExecutionContextFactory taskExecutionContextFactory;
-
- @Autowired
- private TaskGroupCoordinator taskGroupCoordinator;
+ private ITaskGroupCoordinator taskGroupCoordinator;
@Autowired
private ApplicationContext applicationContext;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/FailoverTaskInstanceFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/FailoverTaskInstanceFactory.java
index 765fcfa24b..67a2eb87fc 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/FailoverTaskInstanceFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/FailoverTaskInstanceFactory.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.FailoverTaskInstanceFactory.FailoverTaskInstanceBuilder;
import java.util.Date;
@@ -35,6 +36,9 @@ public class FailoverTaskInstanceFactory extends
AbstractTaskInstanceFactory<Fai
@Autowired
private TaskInstanceDao taskInstanceDao;
+ @Autowired
+ private ITaskGroupCoordinator taskGroupCoordinator;
+
@Override
public FailoverTaskInstanceFactory.FailoverTaskInstanceBuilder builder() {
return new FailoverTaskInstanceBuilder(this);
@@ -54,6 +58,10 @@ public class FailoverTaskInstanceFactory extends
AbstractTaskInstanceFactory<Fai
taskInstance.setExecutePath(null);
taskInstanceDao.insert(taskInstance);
+ if
(taskGroupCoordinator.needToReleaseTaskGroupSlot(needFailoverTaskInstance)) {
+
taskGroupCoordinator.releaseTaskGroupSlot(needFailoverTaskInstance);
+ }
+
needFailoverTaskInstance.setFlag(Flag.NO);
needFailoverTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
taskInstanceDao.updateById(needFailoverTaskInstance);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index 867a6edeec..3f79d71efa 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -25,10 +25,9 @@ import
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
import
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
@@ -41,7 +40,6 @@ import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRuntimeContextChangedEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
@@ -57,20 +55,14 @@ import com.google.common.collect.Lists;
public abstract class AbstractTaskStateAction implements ITaskStateAction {
@Autowired
- protected TaskGroupCoordinator taskGroupCoordinator;
+ protected ITaskGroupCoordinator taskGroupCoordinator;
@Autowired
protected TaskInstanceDao taskInstanceDao;
- @Autowired
- protected TaskInstanceFactories taskInstanceFactories;
-
@Autowired
protected IWorkflowRepository workflowRepository;
- @Autowired
- private MasterConfig masterConfig;
-
@Autowired
protected ITaskExecutorClient taskExecutorClient;
@@ -237,6 +229,7 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
protected void tryToDispatchTask(final ITaskExecutionRunnable
taskExecutionRunnable) {
if (isTaskNeedAcquireTaskGroupSlot(taskExecutionRunnable)) {
acquireTaskGroupSlot(taskExecutionRunnable);
+ log.info("Task{} using taskGroup, success acquire taskGroup slot",
taskExecutionRunnable.getName());
return;
}
taskExecutionRunnable.getWorkflowEventBus().publish(TaskDispatchLifecycleEvent.of(taskExecutionRunnable));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
index 184e5d4c67..cc82c1030c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
@@ -53,9 +52,6 @@ public class TaskSubmittedStateAction extends
AbstractTaskStateAction {
@Autowired
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
- @Autowired
- private TaskGroupCoordinator taskGroupCoordinator;
-
@Autowired
private TaskInstanceDao taskInstanceDao;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index 88306bfa6c..cf33c4824a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -30,6 +30,7 @@ import
org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
+import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import lombok.NonNull;
@@ -44,17 +45,21 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
private final RegistryClient registryClient;
+ private final MasterCoordinator masterCoordinator;
+
private final String heartBeatPath;
private final int processId;
public MasterHeartBeatTask(@NonNull MasterConfig masterConfig,
@NonNull MetricsProvider metricsProvider,
- @NonNull RegistryClient registryClient) {
+ @NonNull RegistryClient registryClient,
+ @NonNull MasterCoordinator masterCoordinator) {
super("MasterHeartBeatTask",
masterConfig.getMaxHeartbeatInterval().toMillis());
this.masterConfig = masterConfig;
this.metricsProvider = metricsProvider;
this.registryClient = registryClient;
+ this.masterCoordinator = masterCoordinator;
this.heartBeatPath = masterConfig.getMasterRegistryPath();
this.processId = OSUtils.getProcessID();
}
@@ -75,6 +80,7 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
.serverStatus(serverStatus)
.host(NetUtils.getHost())
.port(masterConfig.getListenPort())
+ .isCoordinator(masterCoordinator.isActive())
.build();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 94e2dc0513..626673ab0a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -30,6 +30,7 @@ import
org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
import lombok.extern.slf4j.Slf4j;
@@ -53,11 +54,15 @@ public class MasterRegistryClient implements AutoCloseable {
@Autowired
private MetricsProvider metricsProvider;
+ @Autowired
+ private MasterCoordinator masterCoordinator;
+
private MasterHeartBeatTask masterHeartBeatTask;
public void start() {
try {
- this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig,
metricsProvider, registryClient);
+ this.masterHeartBeatTask =
+ new MasterHeartBeatTask(masterConfig, metricsProvider,
registryClient, masterCoordinator);
// master registry
registry();
registryClient.addConnectionStateListener(new
MasterConnectionStateListener(registryClient));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskGroupUtils.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskGroupUtils.java
index 8306275a3a..7f8d90e74c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskGroupUtils.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskGroupUtils.java
@@ -24,7 +24,7 @@ public class TaskGroupUtils {
/**
* Check if the task instance is using task group
*/
- public static boolean isUsingTaskGroup(TaskInstance taskInstance) {
+ public static boolean isUsingTaskGroup(final TaskInstance taskInstance) {
return taskInstance.getTaskGroupId() > 0;
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
similarity index 90%
rename from
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
rename to
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
index 9384f19f14..4568e329da 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.taskgroup;
+package org.apache.dolphinscheduler.server.master.engine;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -33,9 +33,6 @@ import
org.apache.dolphinscheduler.dao.repository.TaskGroupDao;
import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
-import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
-import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator;
import java.util.List;
@@ -58,9 +55,6 @@ class TaskGroupCoordinatorTest {
@InjectMocks
private TaskGroupCoordinator taskGroupCoordinator;
- @Mock
- private RegistryClient registryClient;
-
@Mock
private TaskGroupDao taskGroupDao;
@@ -74,15 +68,13 @@ class TaskGroupCoordinatorTest {
private WorkflowInstanceDao workflowInstanceDao;
@Test
- void start() throws InterruptedException {
+ void start() {
// Get the Lock from Registry
taskGroupCoordinator.start();
- Thread.sleep(1_000);
- verify(registryClient, Mockito.times(1))
-
.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
- verify(registryClient, Mockito.times(1))
-
.releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
+ assertTrue(taskGroupCoordinator.isStarted());
+ taskGroupCoordinator.close();
+ assertFalse(taskGroupCoordinator.isStarted());
}
@Test
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java
index 2c5853f33b..7d152ea284 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.integration;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
@@ -49,6 +50,8 @@ public class WorkflowTestCaseContext {
private List<WorkflowTaskRelation> taskRelations;
+ private List<TaskGroup> taskGroups;
+
public WorkflowDefinition getOneWorkflow() {
if (CollectionUtils.isEmpty(workflows)) {
throw new IllegalStateException("workflows is empty");
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContextFactory.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContextFactory.java
index ef5c210bb2..1a5804fd99 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContextFactory.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.integration;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
@@ -30,6 +31,7 @@ import
org.apache.dolphinscheduler.dao.mapper.WorkflowTaskRelationMapper;
import org.apache.dolphinscheduler.dao.repository.ProjectDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskGroupDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
@@ -74,6 +76,9 @@ public class WorkflowTestCaseContextFactory {
@Autowired
private TaskInstanceDao taskInstanceDao;
+ @Autowired
+ private TaskGroupDao taskGroupDao;
+
public WorkflowTestCaseContext initializeContextFromYaml(final String
yamlPath) {
final WorkflowTestCaseContext workflowTestCaseContext =
YamlFactory.load(yamlPath);
initializeProjectToDB(workflowTestCaseContext.getProject());
@@ -86,6 +91,9 @@ public class WorkflowTestCaseContextFactory {
if
(CollectionUtils.isNotEmpty(workflowTestCaseContext.getTaskInstances())) {
initializeTaskInstancesToDB(workflowTestCaseContext.getTaskInstances());
}
+ if
(CollectionUtils.isNotEmpty(workflowTestCaseContext.getTaskGroups())) {
+ initializeTaskGroupsToDB(workflowTestCaseContext.getTaskGroups());
+ }
return workflowTestCaseContext;
}
@@ -134,4 +142,10 @@ public class WorkflowTestCaseContextFactory {
projectDao.insert(project);
}
+ private void initializeTaskGroupsToDB(final List<TaskGroup> taskGroups) {
+ for (final TaskGroup taskGroup : taskGroups) {
+ taskGroupDao.insert(taskGroup);
+ }
+ }
+
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 97f8e19fee..55558eece2 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -121,6 +121,38 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test start a workflow with one fake task(A) using task
group")
+ public void testStartWorkflow_with_oneSuccessTaskUsingTaskGroup() {
+ final String yaml =
"/it/start/workflow_with_one_fake_task_using_task_group.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .dryRun(Flag.YES)
+ .build();
+
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(2))
+ .atLeast(Duration.ofSeconds(20))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(4)
+ .allMatch(taskInstance ->
TaskExecutionStatus.SUCCESS.equals(taskInstance.getState())
+ && taskInstance.getTaskGroupId() ==
context.getTaskGroups().get(0).getId());
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
@Test
@DisplayName("Test start a workflow with one sub workflow task(A) success")
public void testStartWorkflow_with_subWorkflowTask_success() {
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
new file mode 100644
index 0000000000..d846e0f7ea
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_one_fake_task_success
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: default
+ taskGroupId: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+
+taskGroups:
+ - id: 1
+ name: default
+ projectCode: 1
+ groupSize: 2
+ useSize: 0
+ userId: 1
+ status: YES
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/test/resources/logback.xml
b/dolphinscheduler-master/src/test/resources/logback.xml
index 5debebf4ee..2af91b3daa 100644
--- a/dolphinscheduler-master/src/test/resources/logback.xml
+++ b/dolphinscheduler-master/src/test/resources/logback.xml
@@ -68,7 +68,7 @@
</appender>
<!-- We use OFF here to avoid too many exception log in CI -->
- <root level="INFO">
+ <root level="OFF">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="MASTERLOGFILE"/>
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
index d39e95a84f..75deb8a02d 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
@@ -28,6 +28,7 @@ public enum RegistryNodeType {
MASTER("Master", "/nodes/master"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
+ MASTER_COORDINATOR("MasterCoordinator", "/nodes/master-coordinator"),
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock",
"/lock/master-task-group-coordinator"),
MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator",
"/lock/master-serial-workflow-coordinator"),
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
index 822b4d93f4..c87f341a30 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.registry.api.ha;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
@@ -41,6 +42,10 @@ public abstract class AbstractHAServer implements HAServer {
private final List<ServerStatusChangeListener> serverStatusChangeListeners;
+ private static final long DEFAULT_RETRY_INTERVAL = 5_000;
+
+ private static final int DEFAULT_MAX_RETRY_TIMES = 20;
+
public AbstractHAServer(final Registry registry, final String
selectorPath, final String serverIdentify) {
this.registry = registry;
this.selectorPath = checkNotNull(selectorPath);
@@ -78,21 +83,30 @@ public abstract class AbstractHAServer implements HAServer {
@Override
public boolean participateElection() {
final String electionLock = selectorPath + "-lock";
- try {
- if (registry.acquireLock(electionLock)) {
- if (!registry.exists(selectorPath)) {
- registry.put(selectorPath, serverIdentify, true);
- return true;
+ // If meet exception during participate election, will retry.
+ // This can avoid the situation that the server is not elected as
leader due to network jitter.
+ for (int i = 0; i < DEFAULT_MAX_RETRY_TIMES; i++) {
+ try {
+ try {
+ if (registry.acquireLock(electionLock)) {
+ if (!registry.exists(selectorPath)) {
+ registry.put(selectorPath, serverIdentify, true);
+ return true;
+ }
+ return
serverIdentify.equals(registry.get(selectorPath));
+ }
+ return false;
+ } finally {
+ registry.releaseLock(electionLock);
}
- return serverIdentify.equals(registry.get(selectorPath));
+ } catch (Exception e) {
+ log.error("Participate election error, meet an exception, will
retry after {}ms",
+ DEFAULT_RETRY_INTERVAL, e);
+ ThreadUtils.sleep(DEFAULT_RETRY_INTERVAL);
}
- return false;
- } catch (Exception e) {
- log.error("participate election error", e);
- return false;
- } finally {
- registry.releaseLock(electionLock);
}
+ throw new IllegalStateException(
+ "Participate election failed after retry " +
DEFAULT_MAX_RETRY_TIMES + " times");
}
@Override
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java
index af109228e2..18fbed8c79 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java
@@ -19,6 +19,6 @@ package org.apache.dolphinscheduler.registry.api.ha;
public interface ServerStatusChangeListener {
- void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus
currentStatus);
+ void change(final HAServer.ServerStatus originStatus, final
HAServer.ServerStatus currentStatus);
}