This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 40ed67f97a [DSIP-92][Master] Refactor workflow serial strategy (#17531)
40ed67f97a is described below
commit 40ed67f97a31801f0db48570403bbfdb42d796c7
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Nov 27 23:00:42 2025 +0800
[DSIP-92][Master] Refactor workflow serial strategy (#17531)
---
.github/workflows/backend.yml | 2 +-
docs/docs/en/guide/upgrade/incompatible.md | 25 ++-
.../api/dto/workflow/WorkflowCreateRequest.java | 2 +-
.../PauseWorkflowInstanceExecutorDelegate.java | 21 ++-
.../StopWorkflowInstanceExecutorDelegate.java | 21 ++-
.../dolphinscheduler/common/enums/CommandType.java | 1 -
.../common/enums/WorkflowExecutionTypeEnum.java | 5 +-
.../dao/entity/SerialCommand.java} | 41 ++++-
.../dao/entity/WorkflowInstance.java | 5 -
.../dao/mapper/SerialCommandMapper.java} | 20 ++-
.../dao/model/SerialCommandDto.java | 119 ++++++++++++++
.../dao/repository/SerialCommandDao.java} | 17 +-
.../dao/repository/WorkflowInstanceDao.java | 7 -
.../dao/repository/impl/SerialCommandDaoImpl.java | 50 ++++++
.../repository/impl/WorkflowInstanceDaoImpl.java | 9 --
.../dao/mapper/SerialCommandMapper.xml | 36 +++++
.../src/main/resources/sql/dolphinscheduler_h2.sql | 21 +++
.../main/resources/sql/dolphinscheduler_mysql.sql | 21 +++
.../resources/sql/dolphinscheduler_postgresql.sql | 28 ++++
.../mysql/dolphinscheduler_ddl.sql | 17 ++
.../mysql}/dolphinscheduler_dml.sql | 2 +-
.../postgresql/dolphinscheduler_ddl.sql | 45 ++++++
.../postgresql}/dolphinscheduler_dml.sql | 0
.../master/engine/IWorkflowSerialCoordinator.java | 16 +-
.../server/master/engine/MasterCoordinator.java | 37 +++--
.../server/master/engine/TaskGroupCoordinator.java | 1 +
.../handler/ReRunWorkflowCommandHandler.java | 4 -
...r.java => RecoverSerialWaitCommandHandler.java} | 55 +------
.../command/handler/RunWorkflowCommandHandler.java | 8 -
.../handler/WorkflowFailoverCommandHandler.java | 4 -
.../trigger/SubWorkflowManualTrigger.java | 12 +-
.../serial/AbstractSerialCommandHandler.java | 114 ++++++++++++++
.../workflow/serial/ISerialCommandHandler.java | 13 +-
.../serial/SerialCommandDiscardHandler.java | 59 +++++++
.../serial/SerialCommandPriorityHandler.java | 59 +++++++
.../workflow/serial/SerialCommandWaitHandler.java | 51 ++++++
.../workflow/serial/SerialCommandsGroup.java | 27 +++-
.../workflow/serial/WorkflowSerialCoordinator.java | 175 +++++++++++++++++++++
.../statemachine/AbstractWorkflowStateAction.java | 29 +++-
.../trigger/AbstractWorkflowInstanceTrigger.java | 36 ++++-
.../workflow/trigger/AbstractWorkflowTrigger.java | 31 +++-
.../workflow/trigger/WorkflowBackfillTrigger.java | 5 +-
.../workflow/trigger/WorkflowManualTrigger.java | 5 +-
.../workflow/trigger/WorkflowScheduleTrigger.java | 5 +-
.../integration/cases/WorkflowStartTestCase.java | 102 ++++++++++++
.../workflow_with_serial_discard_strategy.yaml | 61 +++++++
.../workflow_with_serial_priority_strategy.yaml | 61 +++++++
.../start/workflow_with_serial_wait_strategy.yaml | 61 +++++++
.../worker/executor/PhysicalTaskExecutor.java | 2 +-
49 files changed, 1375 insertions(+), 173 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index b185a44f9f..8c8b66965f 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -143,7 +143,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- version: ["3.1.9", "3.2.0"]
+ version: ["3.1.9", "3.2.0", "3.3.1"]
case:
- name: schema-check-with-mysql
script: .github/workflows/schema-check/mysql/start-job.sh
diff --git a/docs/docs/en/guide/upgrade/incompatible.md
b/docs/docs/en/guide/upgrade/incompatible.md
index 1c1b679d83..c6a8f3023c 100644
--- a/docs/docs/en/guide/upgrade/incompatible.md
+++ b/docs/docs/en/guide/upgrade/incompatible.md
@@ -2,21 +2,6 @@
This document records the incompatible updates between each version. You need
to check this document before you upgrade to related version.
-## dev
-
-* Upgrade mysql driver version from 8.0.16 to 8.0.33
([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
-* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to
`DATAX_LAUNCHER`
([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
-* Change regex matching sql params in SQL task plugin
([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
-* Remove the spark version of spark task
([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
-* Change the default unix shell executor from sh to bash
([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
-* Remove `deleteSource` in `download()` of `StorageOperate`
([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
-* Remove default key for attribute `data-quality.jar.name` in
`common.properties`
([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
-* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in
`common.properties` and represent for directory
([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
-
-## 3.2.0
-
-* Remove parameter `description` from public interfaces of new resource center
([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
-
## 3.0.0
* Copy and import workflow without 'copy' suffix
[#10607](https://github.com/apache/dolphinscheduler/pull/10607)
@@ -24,7 +9,16 @@ This document records the incompatible updates between each
version. You need to
## 3.2.0
+* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in
`common.properties` so that it refers to a directory
([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
+* Remove default key for attribute `data-quality.jar.name` in
`common.properties`
([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
+* Remove `deleteSource` in `download()` of `StorageOperate`
([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
+* Change the default unix shell executor from sh to bash
([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
+* Remove the spark version of spark task
([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
+* Change regex matching sql params in SQL task plugin
([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
+* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to
`DATAX_LAUNCHER`
([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
+* Upgrade mysql driver version from 8.0.16 to 8.0.33
([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
* Add required field `database` in /datasources/tables &&
/datasources/tableColumns Api
[#14406](https://github.com/apache/dolphinscheduler/pull/14406)
+* Remove parameter `description` from public interfaces of new resource center
([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
## 3.3.0
@@ -41,4 +35,5 @@ This document records the incompatible updates between each
version. You need to
## 3.4.0
* Renamed the publicKey field to privateKey in the SSH connection parameters
under the datasource configuration.
([#17666](https://github.com/apache/dolphinscheduler/pull/17666))
+* Add table t_ds_serial_command.
([#17531](https://github.com/apache/dolphinscheduler/pull/17531))
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
index 9cf38fe6a4..8fa660f61c 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
@@ -56,7 +56,7 @@ public class WorkflowCreateRequest {
private int timeout;
@Schema(allowableValues = "PARALLEL / SERIAL_WAIT / SERIAL_DISCARD /
SERIAL_PRIORITY", example = "PARALLEL", description = "default PARALLEL if not
provide.")
- private String executionType;
+ private String executionType = WorkflowExecutionTypeEnum.PARALLEL.name();
public WorkflowDefinition convert2WorkflowDefinition() {
WorkflowDefinition workflowDefinition = new WorkflowDefinition();
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java
index 0e67388294..97b79dad0b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java
@@ -21,6 +21,7 @@ import
org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import org.springframework.transaction.support.TransactionTemplate;
@Slf4j
@Component
@@ -41,6 +43,12 @@ public class PauseWorkflowInstanceExecutorDelegate
@Autowired
private WorkflowInstanceDao workflowInstanceDao;
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private SerialCommandDao serialCommandDao;
+
@Override
public Void execute(PauseWorkflowInstanceOperation
workflowInstanceControlRequest) {
final WorkflowInstance workflowInstance =
workflowInstanceControlRequest.workflowInstance;
@@ -64,10 +72,15 @@ public class PauseWorkflowInstanceExecutorDelegate
}
private void directPauseInDB(WorkflowInstance workflowInstance) {
- workflowInstanceDao.updateWorkflowInstanceState(
- workflowInstance.getId(),
- workflowInstance.getState(),
- WorkflowExecutionStatus.PAUSE);
+ // todo: move the pause logic to master
+ transactionTemplate.execute(status -> {
+ workflowInstanceDao.updateWorkflowInstanceState(
+ workflowInstance.getId(),
+ workflowInstance.getState(),
+ WorkflowExecutionStatus.PAUSE);
+
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
+ return null;
+ });
log.info("Update workflow instance {} state from: {} to {} success",
workflowInstance.getName(),
workflowInstance.getState().name(),
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java
index 124c4c6285..3fde96a82e 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java
@@ -21,6 +21,7 @@ import
org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import org.springframework.transaction.support.TransactionTemplate;
@Slf4j
@Component
@@ -41,6 +43,12 @@ public class StopWorkflowInstanceExecutorDelegate
@Autowired
private WorkflowInstanceDao workflowInstanceDao;
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private SerialCommandDao serialCommandDao;
+
@Override
public Void execute(StopWorkflowInstanceOperation
workflowInstanceControlRequest) {
final WorkflowInstance workflowInstance =
workflowInstanceControlRequest.workflowInstance;
@@ -65,10 +73,15 @@ public class StopWorkflowInstanceExecutorDelegate
}
void directStopInDB(WorkflowInstance workflowInstance) {
- workflowInstanceDao.updateWorkflowInstanceState(
- workflowInstance.getId(),
- workflowInstance.getState(),
- WorkflowExecutionStatus.STOP);
+ // todo: move the stop logic to master
+ transactionTemplate.execute(status -> {
+ workflowInstanceDao.updateWorkflowInstanceState(
+ workflowInstance.getId(),
+ workflowInstance.getState(),
+ WorkflowExecutionStatus.STOP);
+
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
+ return null;
+ });
log.info("Update workflow instance {} state from: {} to {} success",
workflowInstance.getName(),
workflowInstance.getState().name(),
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 8db4feebe0..23d47cbef5 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -89,7 +89,6 @@ public enum CommandType {
STOP(9, "stop a workflow"),
/**
* Recover from the serial-wait state.
- * todo: We may need to remove these command, and use the workflow
instance origin command type when notify from serial wait.
*/
RECOVER_SERIAL_WAIT(11, "recover serial wait"),
/**
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java
index 46ce18c31c..8b8d7e7688 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java
@@ -27,7 +27,6 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
public enum WorkflowExecutionTypeEnum {
PARALLEL(0, "parallel"),
- // todo: the serial is unstable, so we don't support them now
SERIAL_WAIT(1, "serial wait"),
SERIAL_DISCARD(2, "serial discard"),
SERIAL_PRIORITY(3, "serial priority");
@@ -56,4 +55,8 @@ public enum WorkflowExecutionTypeEnum {
throw new IllegalArgumentException("invalid status : " +
executionType);
}
+ public boolean isSerial() {
+ return this != PARALLEL;
+ }
+
}
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/SerialCommand.java
similarity index 50%
copy from
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
copy to
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/SerialCommand.java
index 67ac6192f8..b5cb3d4f3b 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/SerialCommand.java
@@ -13,6 +13,43 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params,
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE
'%"publicKey"%';
+package org.apache.dolphinscheduler.dao.entity;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("t_ds_serial_command")
+public class SerialCommand {
+
+ @TableId(value = "id", type = IdType.AUTO)
+ private Integer id;
+
+ private Integer workflowInstanceId;
+
+ private Long workflowDefinitionCode;
+
+ private Integer workflowDefinitionVersion;
+
+ private String command;
+
+ private int state;
+
+ private Date createTime;
+
+ private Date updateTime;
+
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java
index 80b386220c..8abf6a4666 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java
@@ -210,11 +210,6 @@ public class WorkflowInstance {
return commandType;
}
- /**
- * set state with desc
- * @param state
- * @param stateDesc
- */
public void setStateWithDesc(WorkflowExecutionStatus state, String
stateDesc) {
this.setState(state);
if (StringUtils.isEmpty(this.getStateHistory())) {
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.java
similarity index 62%
copy from
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
copy to
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.java
index 67ac6192f8..875481e8ca 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.java
@@ -13,6 +13,22 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params,
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE
'%"publicKey"%';
+package org.apache.dolphinscheduler.dao.mapper;
+
+import org.apache.dolphinscheduler.dao.entity.SerialCommand;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface SerialCommandMapper extends BaseMapper<SerialCommand> {
+
+ List<SerialCommand> fetchSerialCommands(@Param("fetchSize") int fetchSize);
+
+ int deleteByWorkflowInstanceId(@Param("workflowInstanceId") int
workflowInstanceId);
+
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/SerialCommandDto.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/SerialCommandDto.java
new file mode 100644
index 0000000000..cf4a8ecdf7
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/SerialCommandDto.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.model;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.SerialCommand;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SerialCommandDto {
+
+ private Integer id;
+
+ private Integer workflowInstanceId;
+
+ private Long workflowDefinitionCode;
+
+ private Integer workflowDefinitionVersion;
+
+ private Command command;
+
+ private State state;
+
+ private Date createTime;
+
+ private Date updateTime;
+
+ public static SerialCommandDto newSerialCommand(Command command) {
+ return SerialCommandDto.builder()
+ .workflowInstanceId(command.getWorkflowInstanceId())
+ .command(command)
+ .workflowDefinitionCode(command.getWorkflowDefinitionCode())
+
.workflowDefinitionVersion(command.getWorkflowDefinitionVersion())
+ .state(State.WAITING)
+ .createTime(new Date())
+ .updateTime(new Date())
+ .build();
+ }
+
+ public static SerialCommandDto fromEntity(SerialCommand serialCommand) {
+ return SerialCommandDto.builder()
+ .id(serialCommand.getId())
+ .workflowInstanceId(serialCommand.getWorkflowInstanceId())
+
.workflowDefinitionCode(serialCommand.getWorkflowDefinitionCode())
+
.workflowDefinitionVersion(serialCommand.getWorkflowDefinitionVersion())
+ .command(JSONUtils.parseObject(serialCommand.getCommand(), new
TypeReference<Command>() {
+ }))
+ .state(State.of(serialCommand.getState()))
+ .createTime(serialCommand.getCreateTime())
+ .updateTime(serialCommand.getUpdateTime())
+ .build();
+ }
+
+ public SerialCommand toEntity() {
+ return SerialCommand.builder()
+ .id(this.id)
+ .workflowInstanceId(this.workflowInstanceId)
+ .workflowDefinitionCode(this.workflowDefinitionCode)
+ .workflowDefinitionVersion(this.workflowDefinitionVersion)
+ .command(JSONUtils.toJsonString(this.command))
+ .state(this.state.getValue())
+ .createTime(this.createTime)
+ .updateTime(this.updateTime)
+ .build();
+ }
+
+ @Getter
+ public enum State {
+
+ // If the workflow instance is finished, then we directly delete the
item from the queue,
+ // so there is no finished state here
+ WAITING(0),
+ LAUNCHED(1),
+ ;
+ private final int value;
+
+ State(int value) {
+ this.value = value;
+ }
+
+ public static State of(int value) {
+ for (State state : values()) {
+ if (state.value == value) {
+ return state;
+ }
+ }
+ throw new IllegalArgumentException("Invalid State value: " +
value);
+ }
+ }
+
+}
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/SerialCommandDao.java
similarity index 66%
copy from
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
copy to
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/SerialCommandDao.java
index 67ac6192f8..065fd1a7ef 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/SerialCommandDao.java
@@ -13,6 +13,19 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params,
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE
'%"publicKey"%';
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.SerialCommand;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import java.util.List;
+
+public interface SerialCommandDao extends IDao<SerialCommand> {
+
+ List<SerialCommandDto> fetchSerialCommands(int fetchSize);
+
+ int deleteByWorkflowInstanceId(Integer workflowInstanceId);
+
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
index 9f8f11687f..fb1a093bf3 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
@@ -39,13 +39,6 @@ public interface WorkflowInstanceDao extends
IDao<WorkflowInstance> {
WorkflowExecutionStatus originState,
WorkflowExecutionStatus targetState);
- /**
- * performs an "upsert" operation (update or insert) on a WorkflowInstance
object within a new transaction
- *
- * @param workflowInstance workflowInstance
- */
- void performTransactionalUpsert(WorkflowInstance workflowInstance);
-
/**
* find last scheduler workflow instance in the date interval
*
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/SerialCommandDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/SerialCommandDaoImpl.java
new file mode 100644
index 0000000000..499a0c3fc7
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/SerialCommandDaoImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.SerialCommand;
+import org.apache.dolphinscheduler.dao.mapper.SerialCommandMapper;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+import org.apache.dolphinscheduler.dao.repository.BaseDao;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.NonNull;
+
+import org.springframework.stereotype.Repository;
+
+@Repository
+public class SerialCommandDaoImpl extends BaseDao<SerialCommand,
SerialCommandMapper> implements SerialCommandDao {
+
+ public SerialCommandDaoImpl(@NonNull SerialCommandMapper
serialCommandMapper) {
+ super(serialCommandMapper);
+ }
+
+ @Override
+ public List<SerialCommandDto> fetchSerialCommands(int fetchSize) {
+ return
mybatisMapper.fetchSerialCommands(fetchSize).stream().map(SerialCommandDto::fromEntity)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public int deleteByWorkflowInstanceId(Integer workflowInstanceId) {
+ return mybatisMapper.deleteByWorkflowInstanceId(workflowInstanceId);
+ }
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
index 7e58b8495c..b9b92f54dd 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
@@ -33,9 +33,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
-import org.springframework.transaction.annotation.Isolation;
-import org.springframework.transaction.annotation.Propagation;
-import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Repository
@@ -75,12 +72,6 @@ public class WorkflowInstanceDaoImpl extends
BaseDao<WorkflowInstance, WorkflowI
}
}
- @Override
- @Transactional(propagation = Propagation.REQUIRES_NEW, isolation =
Isolation.READ_COMMITTED, rollbackFor = Exception.class)
- public void performTransactionalUpsert(WorkflowInstance workflowInstance) {
- this.upsertWorkflowInstance(workflowInstance);
- }
-
/**
* find last scheduler process instance in the date interval
*
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.xml
new file mode 100644
index 0000000000..9649758252
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.SerialCommandMapper">
+ <sql id="baseSql">
+ id, workflow_instance_id, workflow_definition_code,
workflow_definition_version, command, state, create_time, update_time
+ </sql>
+
+ <select id="fetchSerialCommands"
resultType="org.apache.dolphinscheduler.dao.entity.SerialCommand">
+ select
+ <include refid="baseSql"/>
+ from t_ds_serial_command
+ order by id asc
+ limit #{fetchSize}
+ </select>
+
+ <delete id="deleteByWorkflowInstanceId">
+ delete from t_ds_serial_command where workflow_instance_id =
#{workflowInstanceId}
+ </delete>
+
+</mapper>
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 2565eaad7b..b86a1707e8 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -342,6 +342,27 @@ CREATE TABLE t_ds_command
-- Records of t_ds_command
-- ----------------------------
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS `t_ds_serial_command`;
+CREATE TABLE `t_ds_serial_command` (
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+ `workflow_definition_code` bigint(20) NOT NULL COMMENT 'workflow definition
code',
+ `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition
version',
+ `workflow_instance_id` bigint(20) NOT NULL COMMENT 'workflow instance id',
+ `state` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'state of the serial queue: 0
waiting, 1 fired',
+ `command` text COMMENT 'command json',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create
time',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'update time',
+ PRIMARY KEY (`id`),
+ KEY `idx_workflow_instance_id` (`workflow_instance_id`)
+);
+
+-- ----------------------------
+-- Records of t_ds_serial_command
+-- ----------------------------
+
-- ----------------------------
-- Table structure for t_ds_datasource
-- ----------------------------
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index f79867123b..ea80068e49 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -350,6 +350,27 @@ CREATE TABLE `t_ds_command` (
-- Records of t_ds_command
-- ----------------------------
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS `t_ds_serial_command`;
+CREATE TABLE `t_ds_serial_command` (
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+ `workflow_definition_code` bigint(20) NOT NULL COMMENT 'workflow definition
code',
+ `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition
version',
+ `workflow_instance_id` bigint(20) NOT NULL COMMENT 'workflow instance id',
+ `state` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'state of the serial queue: 0
waiting, 1 fired',
+ `command` text COMMENT 'command json',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create
time',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'update time',
+ PRIMARY KEY (`id`),
+ KEY `idx_workflow_instance_id` (`workflow_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
+
+-- ----------------------------
+-- Records of t_ds_serial_command
+-- ----------------------------
+
-- ----------------------------
-- Table structure for t_ds_datasource
-- ----------------------------
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 2882d904bf..afe1b957cd 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -274,6 +274,34 @@ CREATE TABLE t_ds_command (
create index priority_id_index on t_ds_command (workflow_instance_priority,id);
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS t_ds_serial_command;
+CREATE TABLE t_ds_serial_command (
+ id SERIAL PRIMARY KEY,
+ workflow_definition_code BIGINT NOT NULL,
+ workflow_definition_version INT NOT NULL,
+ workflow_instance_id BIGINT NOT NULL,
+ state SMALLINT NOT NULL DEFAULT 0,
+ command TEXT,
+ create_time TIMESTAMP NOT NULL DEFAULT now(),
+ update_time TIMESTAMP NOT NULL DEFAULT now()
+);
+CREATE INDEX idx_workflow_instance_id ON t_ds_serial_command
(workflow_instance_id);
+COMMENT ON TABLE t_ds_serial_command IS 'serial command queue table';
+COMMENT ON COLUMN t_ds_serial_command.id IS 'primary key';
+COMMENT ON COLUMN t_ds_serial_command.workflow_definition_code IS 'workflow
definition code';
+COMMENT ON COLUMN t_ds_serial_command.workflow_definition_version IS 'workflow
definition version';
+COMMENT ON COLUMN t_ds_serial_command.workflow_instance_id IS 'workflow
instance id';
+COMMENT ON COLUMN t_ds_serial_command.state IS 'state of the serial queue: 0
waiting, 1 fired';
+COMMENT ON COLUMN t_ds_serial_command.command IS 'command json';
+COMMENT ON COLUMN t_ds_serial_command.create_time IS 'create time';
+COMMENT ON COLUMN t_ds_serial_command.update_time IS 'update time';
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+
--
-- Table structure for table t_ds_datasource
--
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_ddl.sql
similarity index 53%
copy from
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
copy to
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_ddl.sql
index 04518fb8b8..3a9fff25ef 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -19,3 +19,20 @@ ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_task_id`
(`task_id`);
ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_group_id` (`group_id`);
ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_status` (`status`);
ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_workflow_instance_id`
(`workflow_instance_id`);
+
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `t_ds_serial_command` (
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+ `workflow_definition_code` bigint(20) NOT NULL COMMENT 'workflow definition
code',
+ `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition
version',
+ `workflow_instance_id` bigint(20) NOT NULL COMMENT 'workflow instance id',
+ `state` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'state of the serial queue: 0
waiting, 1 fired',
+ `command` text COMMENT 'command json',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create
time',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'update time',
+ PRIMARY KEY (`id`),
+ KEY `idx_workflow_instance_id` (`workflow_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
+
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_dml.sql
similarity index 96%
copy from
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
copy to
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_dml.sql
index 67ac6192f8..06c6660f2b 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_dml.sql
@@ -15,4 +15,4 @@
* limitations under the License.
*/
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params,
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE
'%"publicKey"%';
+UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params,
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE
'%"publicKey"%';
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_ddl.sql
new file mode 100644
index 0000000000..5d2beea9a0
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_ddl.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+create index if not exists idx_t_ds_task_group_queue_task_id on
t_ds_task_group_queue(task_id);
+create index if not exists idx_t_ds_task_group_queue_group_id on
t_ds_task_group_queue(group_id);
+create index if not exists idx_t_ds_task_group_queue_status on t_ds_task_group_queue(status);
+create index if not exists idx_t_ds_task_group_queue_workflow_instance_id on
t_ds_task_group_queue(workflow_instance_id);
+
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS t_ds_serial_command (
+ id SERIAL PRIMARY KEY,
+ workflow_definition_code BIGINT NOT NULL,
+ workflow_definition_version INT NOT NULL,
+ workflow_instance_id BIGINT NOT NULL,
+ state SMALLINT NOT NULL DEFAULT 0,
+ command TEXT,
+ create_time TIMESTAMP NOT NULL DEFAULT now(),
+ update_time TIMESTAMP NOT NULL DEFAULT now()
+);
+CREATE INDEX IF NOT EXISTS "idx_workflow_instance_id" ON "t_ds_serial_command"
("workflow_instance_id");
+COMMENT ON TABLE "t_ds_serial_command" IS 'serial command queue table';
+COMMENT ON COLUMN "t_ds_serial_command"."id" IS 'primary key';
+COMMENT ON COLUMN "t_ds_serial_command"."workflow_definition_code" IS
'workflow definition code';
+COMMENT ON COLUMN "t_ds_serial_command"."workflow_definition_version" IS
'workflow definition version';
+COMMENT ON COLUMN "t_ds_serial_command"."workflow_instance_id" IS 'workflow
instance id';
+COMMENT ON COLUMN "t_ds_serial_command"."state" IS 'state of the serial queue:
0 waiting, 1 fired';
+COMMENT ON COLUMN "t_ds_serial_command"."command" IS 'command json';
+COMMENT ON COLUMN "t_ds_serial_command"."create_time" IS 'create time';
+COMMENT ON COLUMN "t_ds_serial_command"."update_time" IS 'update time';
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_dml.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_dml.sql
similarity index 100%
rename from
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_dml.sql
rename to
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_dml.sql
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/IWorkflowSerialCoordinator.java
similarity index 71%
rename from
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/IWorkflowSerialCoordinator.java
index 04518fb8b8..bbd9c248c6 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/IWorkflowSerialCoordinator.java
@@ -13,9 +13,15 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_task_id` (`task_id`);
-ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_group_id` (`group_id`);
-ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_status` (`status`);
-ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_workflow_instance_id`
(`workflow_instance_id`);
+package org.apache.dolphinscheduler.server.master.engine;
+
+public interface IWorkflowSerialCoordinator extends AutoCloseable {
+
+ void start();
+
+ @Override
+ void close();
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
index da2f9c156c..eb2a9dda02 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
@@ -27,6 +27,7 @@ import
org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.failover.IFailoverCoordinator;
import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -44,17 +45,22 @@ public class MasterCoordinator extends AbstractHAServer {
private final IFailoverCoordinator failoverCoordinator;
+ private final IWorkflowSerialCoordinator workflowSerialCoordinator;
+
public MasterCoordinator(final Registry registry,
final MasterConfig masterConfig,
final ITaskGroupCoordinator taskGroupCoordinator,
- final IFailoverCoordinator failoverCoordinator) {
+ final IFailoverCoordinator failoverCoordinator,
+ final IWorkflowSerialCoordinator
workflowSerialCoordinator) {
super(
registry,
RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
masterConfig.getMasterAddress());
this.taskGroupCoordinator = taskGroupCoordinator;
this.failoverCoordinator = failoverCoordinator;
- addServerStatusChangeListener(new
MasterCoordinatorListener(taskGroupCoordinator, failoverCoordinator));
+ this.workflowSerialCoordinator = workflowSerialCoordinator;
+ addServerStatusChangeListener(
+ new MasterCoordinatorListener(taskGroupCoordinator,
failoverCoordinator, workflowSerialCoordinator));
}
@Override
@@ -75,27 +81,38 @@ public class MasterCoordinator extends AbstractHAServer {
private final IFailoverCoordinator failoverCoordinator;
+ private final IWorkflowSerialCoordinator workflowSerialCoordinator;
+ private Future<?> failoverCoordinatorFuture;
+
public MasterCoordinatorListener(ITaskGroupCoordinator
taskGroupCoordinator,
- IFailoverCoordinator
failoverCoordinator) {
+ IFailoverCoordinator
failoverCoordinator,
+ IWorkflowSerialCoordinator
workflowSerialCoordinator) {
this.taskGroupCoordinator = checkNotNull(taskGroupCoordinator);
this.failoverCoordinator = checkNotNull(failoverCoordinator);
+ this.workflowSerialCoordinator =
checkNotNull(workflowSerialCoordinator);
}
@Override
public void changeToActive() {
taskGroupCoordinator.start();
-
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(()
-> {
- try {
- failoverCoordinator.cleanHistoryFailoverFinishedMarks();
- } catch (Exception e) {
- log.error("FailoverCoordinator
cleanHistoryFailoverFinishedMarks failed", e);
- }
- }, 0, 1, TimeUnit.DAYS);
+ workflowSerialCoordinator.start();
+ failoverCoordinatorFuture =
+
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(()
-> {
+ try {
+
failoverCoordinator.cleanHistoryFailoverFinishedMarks();
+ } catch (Exception e) {
+ log.error("FailoverCoordinator
cleanHistoryFailoverFinishedMarks failed", e);
+ }
+ }, 0, 1, TimeUnit.DAYS);
}
@Override
public void changeToStandBy() {
taskGroupCoordinator.close();
+ workflowSerialCoordinator.close();
+ if (failoverCoordinatorFuture != null) {
+ failoverCoordinatorFuture.cancel(true);
+ }
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index c4cf3c1f92..1a0312c70f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -90,6 +90,7 @@ public class TaskGroupCoordinator implements
ITaskGroupCoordinator, AutoCloseabl
flag = true;
internalThread = new BaseDaemonThread(this::doStart) {
};
+ internalThread.setName("TaskGroupCoordinator-Thread");
internalThread.start();
log.info("TaskGroupCoordinator started...");
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
index c685adf7f5..524c7a225a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
@@ -31,7 +31,6 @@ import java.util.Date;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
/**
@@ -46,9 +45,6 @@ public class ReRunWorkflowCommandHandler extends
RunWorkflowCommandHandler {
@Autowired
private TaskInstanceDao taskInstanceDao;
- @Autowired
- private ApplicationContext applicationContext;
-
@Autowired
private MasterConfig masterConfig;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java
similarity index 50%
copy from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java
index c685adf7f5..09486db8ba 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java
@@ -20,87 +20,42 @@ package
org.apache.dolphinscheduler.server.master.engine.command.handler;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
-
-import java.util.Date;
-import java.util.List;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
-/**
- * This handler used to handle {@link CommandType#REPEAT_RUNNING} which will
rerun the workflow instance.
- */
@Component
-public class ReRunWorkflowCommandHandler extends RunWorkflowCommandHandler {
+public class RecoverSerialWaitCommandHandler extends AbstractCommandHandler {
@Autowired
private WorkflowInstanceDao workflowInstanceDao;
- @Autowired
- private TaskInstanceDao taskInstanceDao;
-
- @Autowired
- private ApplicationContext applicationContext;
-
@Autowired
private MasterConfig masterConfig;
- /**
- * Generate the repeat running workflow instance.
- * <p> Will use the origin workflow instance, but will update the
following fields. Need to note we cannot not
- * update the command params here, since this will make the origin command
params lost.
- * <ul>
- * <li>state</li>
- * <li>command type</li>
- * <li>start time</li>
- * <li>restart time</li>
- * <li>end time</li>
- * <li>run times</li>
- * </ul>
- */
@Override
- protected void assembleWorkflowInstance(final
WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
+ protected void
assembleWorkflowInstance(WorkflowExecuteContext.WorkflowExecuteContextBuilder
workflowExecuteContextBuilder) {
final Command command = workflowExecuteContextBuilder.getCommand();
final int workflowInstanceId = command.getWorkflowInstanceId();
final WorkflowInstance workflowInstance =
workflowInstanceDao.queryOptionalById(workflowInstanceId)
.orElseThrow(() -> new IllegalArgumentException("Cannot find
WorkflowInstance:" + workflowInstanceId));
- workflowInstance.setVarPool(null);
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
command.getCommandType().name());
- workflowInstance.setCommandType(command.getCommandType());
- workflowInstance.setRestartTime(new Date());
workflowInstance.setHost(masterConfig.getMasterAddress());
- workflowInstance.setEndTime(null);
- workflowInstance.setRunTimes(workflowInstance.getRunTimes() + 1);
workflowInstanceDao.updateById(workflowInstance);
-
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}
- /**
- * Generate the workflow execution graph.
- * <p> Will clear the history task instance and assembly the start tasks
into the WorkflowExecutionGraph.
- */
@Override
- protected void assembleWorkflowExecutionGraph(final
WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
- markAllTaskInstanceInvalid(workflowExecuteContextBuilder);
- super.assembleWorkflowExecutionGraph(workflowExecuteContextBuilder);
- }
+ protected void
assembleWorkflowExecutionGraph(WorkflowExecuteContext.WorkflowExecuteContextBuilder
workflowExecuteContextBuilder) {
- private void markAllTaskInstanceInvalid(final
WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
- final WorkflowInstance workflowInstance =
workflowExecuteContextBuilder.getWorkflowInstance();
- final List<TaskInstance> taskInstances =
getValidTaskInstance(workflowInstance);
- taskInstanceDao.markTaskInstanceInvalid(taskInstances);
}
@Override
public CommandType commandType() {
- return CommandType.REPEAT_RUNNING;
+ return CommandType.RECOVER_SERIAL_WAIT;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
index a127ae6cea..5aa3b22e56 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.extract.master.command.ICommandParam;
import
org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
@@ -35,7 +34,6 @@ import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopol
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.commons.collections4.CollectionUtils;
@@ -60,18 +58,12 @@ public class RunWorkflowCommandHandler extends
AbstractCommandHandler {
@Autowired
private WorkflowInstanceDao workflowInstanceDao;
- @Autowired
- private TaskInstanceDao taskInstanceDao;
-
@Autowired
private MasterConfig masterConfig;
@Autowired
private ApplicationContext applicationContext;
- @Autowired
- private CuringParamsService curingParamsService;
-
/**
* Will generate a new workflow instance based on the command.
*/
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
index 8348375b6e..e18d02b400 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
@@ -25,7 +25,6 @@ import
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import
org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
@@ -54,9 +53,6 @@ public class WorkflowFailoverCommandHandler extends
AbstractCommandHandler {
@Autowired
private WorkflowInstanceDao workflowInstanceDao;
- @Autowired
- private ITaskGroupCoordinator taskGroupCoordinator;
-
@Autowired
private ApplicationContext applicationContext;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/trigger/SubWorkflowManualTrigger.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/trigger/SubWorkflowManualTrigger.java
index cbbc903008..375edb44c1 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/trigger/SubWorkflowManualTrigger.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/trigger/SubWorkflowManualTrigger.java
@@ -18,10 +18,13 @@
package
org.apache.dolphinscheduler.server.master.engine.executor.plugin.subworkflow.trigger;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest;
import
org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowManualTrigger;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
import org.springframework.stereotype.Component;
/**
@@ -31,10 +34,11 @@ import org.springframework.stereotype.Component;
public class SubWorkflowManualTrigger extends WorkflowManualTrigger {
@Override
- protected WorkflowInstance constructWorkflowInstance(final
WorkflowManualTriggerRequest workflowManualTriggerRequest) {
- final WorkflowInstance workflowInstance =
super.constructWorkflowInstance(workflowManualTriggerRequest);
- workflowInstance.setIsSubWorkflow(Flag.YES);
- return workflowInstance;
+ protected ImmutablePair<WorkflowDefinition, WorkflowInstance>
constructWorkflowInstance(final WorkflowManualTriggerRequest
workflowManualTriggerRequest) {
+ final ImmutablePair<WorkflowDefinition, WorkflowInstance> pair =
+ super.constructWorkflowInstance(workflowManualTriggerRequest);
+ pair.getRight().setIsSubWorkflow(Flag.YES);
+ return pair;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java
new file mode 100644
index 0000000000..c6a43bc10c
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
+import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
+import org.apache.dolphinscheduler.extract.base.client.Clients;
+import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
+import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * Base class of the serial command handlers. It bundles the operations shared by the concrete
+ * strategies: launching a serial command, discarding a serial command together with its workflow
+ * instance, and asking the owning master to stop an already launched workflow instance.
+ */
+@Slf4j
+public abstract class AbstractSerialCommandHandler implements ISerialCommandHandler {
+
+    @Autowired
+    protected WorkflowInstanceDao workflowInstanceDao;
+
+    @Autowired
+    protected IWorkflowControlClient workflowControlClient;
+
+    @Autowired
+    protected TransactionTemplate transactionTemplate;
+
+    @Autowired
+    protected MasterConfig masterConfig;
+
+    @Autowired
+    protected CommandDao commandDao;
+
+    @Autowired
+    protected SerialCommandDao serialCommandDao;
+
+    /**
+     * Launches the given serial command: inserts the {@link Command} it carries and marks the serial
+     * command as {@code LAUNCHED}. Both writes happen in a single transaction.
+     *
+     * @param serialCommand the serial command to launch
+     */
+    protected void launchSerialCommand(SerialCommandDto serialCommand) {
+        transactionTemplate.execute(new TransactionCallback<Object>() {
+
+            @Override
+            public Void doInTransaction(TransactionStatus status) {
+                final Command command = serialCommand.getCommand();
+                commandDao.insert(command);
+
+                serialCommand.setState(SerialCommandDto.State.LAUNCHED);
+                serialCommandDao.updateById(serialCommand.toEntity());
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Deletes the given serial command and sets the state of its workflow instance to {@code STOP}
+     * directly in the database. Both writes happen in a single transaction.
+     * <p> If the workflow instance no longer exists, only the serial command is deleted. Without this
+     * guard the resulting NullPointerException would roll the delete back and the same command would
+     * fail again on every later round.
+     *
+     * @param serialCommand the serial command to discard
+     */
+    protected void discardSerialCommandAndStopWorkflowInstanceInDB(SerialCommandDto serialCommand) {
+        transactionTemplate.execute(new TransactionCallback<Object>() {
+
+            @Override
+            public Void doInTransaction(TransactionStatus status) {
+                serialCommandDao.deleteById(serialCommand.getId());
+
+                // todo: call api to stop the workflow instance
+                final Integer workflowInstanceId = serialCommand.getWorkflowInstanceId();
+                final WorkflowInstance workflowInstance = workflowInstanceDao.queryById(workflowInstanceId);
+                if (workflowInstance == null) {
+                    log.warn("WorkflowInstance: {} of SerialCommand: {} doesn't exist, only discard the SerialCommand",
+                            workflowInstanceId, serialCommand.getId());
+                    return null;
+                }
+                workflowInstance.setState(WorkflowExecutionStatus.STOP);
+                workflowInstanceDao.upsertWorkflowInstance(workflowInstance);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Sends a stop request for the workflow instance of the given serial command to the master
+     * recorded as the instance's host. Nothing is sent when the instance no longer exists or when its
+     * current state reports that it cannot be stopped.
+     *
+     * @param serialCommand the serial command whose workflow instance should be stopped
+     */
+    protected void stopWorkflowInstanceInMaster(SerialCommandDto serialCommand) {
+
+        // todo: directly call master api
+        // todo: call api to stop the workflow instance
+        // todo: We might need to set a status discarding in serial command. then we can avoid duplicate stop
+        // workflow instance
+        final Integer workflowInstanceId = serialCommand.getWorkflowInstanceId();
+        final WorkflowInstance workflowInstance = workflowInstanceDao.queryById(workflowInstanceId);
+        if (workflowInstance == null) {
+            // Nothing to stop; returning instead of failing lets the caller go on with the rest of the queue.
+            log.warn("WorkflowInstance: {} of SerialCommand: {} doesn't exist, skip stopping it",
+                    workflowInstanceId, serialCommand.getId());
+            return;
+        }
+        if (!workflowInstance.getState().isCanStop()) {
+            return;
+        }
+
+        final String workflowInstanceLaunchedHost = workflowInstance.getHost();
+        final WorkflowInstanceStopRequest stopRequest = new WorkflowInstanceStopRequest(workflowInstanceId);
+        if (masterConfig.getMasterAddress().equals(workflowInstanceLaunchedHost)) {
+            // The instance is hosted by this master, use the locally injected client.
+            workflowControlClient.stopWorkflowInstance(stopRequest);
+        } else {
+            Clients.withService(IWorkflowControlClient.class)
+                    .withHost(workflowInstanceLaunchedHost)
+                    .stopWorkflowInstance(stopRequest);
+        }
+    }
+}
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_ddl.sql
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/ISerialCommandHandler.java
similarity index 69%
rename from
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_ddl.sql
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/ISerialCommandHandler.java
index 89bc78442c..6bac40a7d2 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/ISerialCommandHandler.java
@@ -13,9 +13,12 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-create index idx_t_ds_task_group_queue_task_id on
t_ds_task_group_queue(task_id);
-create index idx_t_ds_task_group_queue_group_id on
t_ds_task_group_queue(group_id);
-create index idx_t_ds_task_group_queue_status on t_ds_task_group_queue(status);
-create index idx_t_ds_task_group_queue_workflow_instance_id on
t_ds_task_group_queue(workflow_instance_id);
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+/**
+ * Handles one {@link SerialCommandsGroup}. Each implementation applies its own serial execution
+ * strategy to the commands contained in the group.
+ */
+public interface ISerialCommandHandler {
+
+ /**
+  * Processes the serial commands of the given group.
+  *
+  * @param serialCommandsGroup the group of serial commands to process
+  */
+ void handle(SerialCommandsGroup serialCommandsGroup);
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandDiscardHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandDiscardHandler.java
new file mode 100644
index 0000000000..f2ce439a87
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandDiscardHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * This strategy keeps the first workflow instance in the queue and discards every workflow instance
+ * queued behind it.
+ */
+@Slf4j
+@Component
+public class SerialCommandDiscardHandler extends AbstractSerialCommandHandler {
+
+    /**
+     * Launches the first serial command of the group if it is still waiting, and discards all the
+     * serial commands behind it.
+     *
+     * @param serialCommandsGroup the group of serial commands to process
+     * @throws IllegalStateException if a serial command behind the first one is not in WAITING state
+     */
+    @Override
+    public void handle(SerialCommandsGroup serialCommandsGroup) {
+        List<SerialCommandDto> serialCommands = serialCommandsGroup.getSerialCommands();
+        // Same guard as SerialCommandWaitHandler: an empty queue means there is nothing to do.
+        if (serialCommands == null || serialCommands.isEmpty()) {
+            return;
+        }
+        // If the first item in the queue is not running, then notify it to run.
+        for (int i = 0; i < serialCommands.size(); i++) {
+            SerialCommandDto serialCommand = serialCommands.get(i);
+            if (i == 0) {
+                if (serialCommand.getState() == SerialCommandDto.State.WAITING) {
+                    launchSerialCommand(serialCommand);
+                    log.info("Launched SerialCommand: {}", serialCommand);
+                }
+                continue;
+            }
+            // Discard all other items in the queue.
+            if (serialCommand.getState() != SerialCommandDto.State.WAITING) {
+                throw new IllegalStateException(
+                        "The post SerialCommand: " + serialCommand.getId()
+                                + " expect WAITING state but -> " + serialCommand.getState());
+            }
+            discardSerialCommandAndStopWorkflowInstanceInDB(serialCommand);
+            log.info("Discard SerialCommand: {}", serialCommand);
+        }
+    }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandPriorityHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandPriorityHandler.java
new file mode 100644
index 0000000000..a6a1745ded
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandPriorityHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * This strategy gives priority to the newest workflow instance: every earlier workflow instance in
+ * the queue is discarded or stopped, and the last one is launched.
+ */
+@Slf4j
+@Component
+public class SerialCommandPriorityHandler extends AbstractSerialCommandHandler {
+
+    @Override
+    public void handle(SerialCommandsGroup serialCommandsGroup) {
+        final List<SerialCommandDto> queue = serialCommandsGroup.getSerialCommands();
+        final int newestIndex = queue.size() - 1;
+
+        // Every item before the newest one: discard it if it is still waiting, otherwise stop it.
+        for (int index = 0; index < newestIndex; index++) {
+            final SerialCommandDto previous = queue.get(index);
+            if (previous.getState() != SerialCommandDto.State.WAITING) {
+                stopWorkflowInstanceInMaster(previous);
+                log.info("Stop the pre WorkflowInstance: {} due to the workflow using SERIAL_PRIORITY strategy",
+                        previous.getWorkflowInstanceId());
+                continue;
+            }
+            discardSerialCommandAndStopWorkflowInstanceInDB(previous);
+            log.info("Discard SerialCommand: {}", previous);
+        }
+
+        // The newest item is launched last, and only when it has not been launched yet.
+        if (newestIndex < 0) {
+            return;
+        }
+        final SerialCommandDto newest = queue.get(newestIndex);
+        if (newest.getState() == SerialCommandDto.State.WAITING) {
+            launchSerialCommand(newest);
+            log.info("Launched SerialCommand: {}", newest);
+        }
+    }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandWaitHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandWaitHandler.java
new file mode 100644
index 0000000000..453b5ac000
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandWaitHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * This strategy runs the queued workflow instances one at a time: only the first serial command of
+ * the queue is launched, the others keep waiting.
+ */
+@Slf4j
+@Component
+public class SerialCommandWaitHandler extends AbstractSerialCommandHandler {
+
+    @Override
+    public void handle(SerialCommandsGroup serialCommandsGroup) {
+        final List<SerialCommandDto> queue = serialCommandsGroup.getSerialCommands();
+        // Nothing queued, or the head of the queue has already been fired: nothing to do.
+        if (CollectionUtils.isEmpty(queue) || queue.get(0).getState() != SerialCommandDto.State.WAITING) {
+            return;
+        }
+        final SerialCommandDto head = queue.get(0);
+        launchSerialCommand(head);
+        log.info("Launched SerialCommand: {}", head);
+    }
+
+}
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandsGroup.java
similarity index 55%
rename from
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandsGroup.java
index 67ac6192f8..ce324f2ab2 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandsGroup.java
@@ -13,6 +13,29 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params,
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE
'%"publicKey"%';
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionTypeEnum;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * The serial commands that belong to one workflow definition code and version, together with the
+ * execution type that decides which handler processes them.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SerialCommandsGroup {
+
+ // Code of the workflow definition the grouped commands belong to.
+ private Long workflowDefinitionCode;
+ // Version of the workflow definition the grouped commands belong to.
+ private Integer workflowDefinitionVersion;
+ // Execution type of the workflow definition; WorkflowSerialCoordinator treats null as "not found".
+ private WorkflowExecutionTypeEnum executionType;
+ // The serial commands of this group, in the order they were added.
+ private List<SerialCommandDto> serialCommands;
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/WorkflowSerialCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/WorkflowSerialCoordinator.java
new file mode 100644
index 0000000000..5bd42d020c
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/WorkflowSerialCoordinator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
+import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
+import
org.apache.dolphinscheduler.server.master.engine.IWorkflowSerialCoordinator;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Used to coordinate the serial execution of workflows.
+ * <p> Once a workflow instance is submitted with a serial execution type, it will be added to the serial queue.
+ * <p> A dedicated thread polls the serial queue, groups the serial commands by workflow definition
+ * code and version, and hands each group to the handler matching the workflow's execution type.
+ */
+@Slf4j
+@Component
+public class WorkflowSerialCoordinator implements IWorkflowSerialCoordinator {
+
+    @Autowired
+    private SerialCommandDao serialCommandDao;
+
+    @Autowired
+    private WorkflowDefinitionLogDao workflowDefinitionLogDao;
+
+    @Autowired
+    private SerialCommandWaitHandler serialCommandWaitHandler;
+
+    @Autowired
+    private SerialCommandDiscardHandler serialCommandDiscardHandler;
+
+    @Autowired
+    private SerialCommandPriorityHandler serialCommandPriorityHandler;
+
+    // Loop switch of the polling thread: set by start(), cleared by close().
+    private volatile boolean flag = false;
+
+    private Thread internalThread;
+
+    private static final int DEFAULT_FETCH_SIZE = 1000;
+
+    private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 5;
+
+    /**
+     * Starts the polling thread.
+     *
+     * @throws IllegalStateException if the coordinator or its internal thread has already been started
+     */
+    @Override
+    public synchronized void start() {
+        log.info("WorkflowSerialCoordinator starting...");
+        if (flag) {
+            throw new IllegalStateException("WorkflowSerialCoordinator is already started");
+        }
+        if (internalThread != null) {
+            throw new IllegalStateException("InternalThread is already started");
+        }
+        flag = true;
+        internalThread = new BaseDaemonThread(this::doStart) {
+        };
+        internalThread.setName("WorkflowSerialCoordinator-Thread");
+        internalThread.start();
+        log.info("WorkflowSerialCoordinator started...");
+    }
+
+    /**
+     * Polling loop: fetches the serial commands, handles every group, then sleeps for the fetch
+     * interval. Any error of a round is logged and the loop goes on with the next round.
+     */
+    private void doStart() {
+        while (flag) {
+            try {
+                final StopWatch workflowSerialCoordinatorRoundCost = StopWatch.createStarted();
+                final List<SerialCommandsGroup> serialCommandsGroups = fetchSerialCommands();
+                serialCommandsGroups.forEach(this::handleSerialCommand);
+                log.debug("WorkflowSerialCoordinator handled SerialCommandsGroup size: {}, cost: {}/ms ",
+                        serialCommandsGroups.size(),
+                        workflowSerialCoordinatorRoundCost.getDuration().toMillis());
+            } catch (Throwable e) {
+                log.error("WorkflowSerialCoordinator error", e);
+            } finally {
+                // sleep 5s
+                ThreadUtils.sleep(TimeUnit.SECONDS.toMillis(DEFAULT_FETCH_INTERVAL_SECONDS));
+            }
+        }
+    }
+
+    /**
+     * Dispatches the group to the handler of its execution type. Exceptions are logged here so that
+     * one failing group does not prevent the other groups of the round from being handled.
+     */
+    private void handleSerialCommand(SerialCommandsGroup serialCommandsGroup) {
+        try {
+            if (serialCommandsGroup.getExecutionType() == null) {
+                log.error("Cannot find the ExecutionType for workflow: {}-{}",
+                        serialCommandsGroup.getWorkflowDefinitionCode(),
+                        serialCommandsGroup.getWorkflowDefinitionVersion());
+                return;
+            }
+            switch (serialCommandsGroup.getExecutionType()) {
+                case PARALLEL:
+                    throw new IllegalStateException(
+                            "SerialCommand with ExecutionType=PARALLEL is not supported, this shouldn't happen");
+                case SERIAL_WAIT:
+                    serialCommandWaitHandler.handle(serialCommandsGroup);
+                    break;
+                case SERIAL_DISCARD:
+                    serialCommandDiscardHandler.handle(serialCommandsGroup);
+                    break;
+                case SERIAL_PRIORITY:
+                    serialCommandPriorityHandler.handle(serialCommandsGroup);
+                    break;
+                default:
+                    // Make an unhandled execution type visible instead of silently skipping the group.
+                    log.warn("Unsupported ExecutionType: {} for workflow: {}-{}",
+                            serialCommandsGroup.getExecutionType(),
+                            serialCommandsGroup.getWorkflowDefinitionCode(),
+                            serialCommandsGroup.getWorkflowDefinitionVersion());
+            }
+        } catch (Exception ex) {
+            log.error("Handle SerialCommandsGroup: {} error", serialCommandsGroup, ex);
+        }
+    }
+
+    /**
+     * Fetches up to {@link #DEFAULT_FETCH_SIZE} serial commands and groups them by workflow
+     * definition code and version.
+     *
+     * @return the groups, or an empty list when there is no serial command
+     */
+    private List<SerialCommandsGroup> fetchSerialCommands() {
+        // todo: set a limit here and fetch by incremental id
+        final List<SerialCommandDto> serialCommands = serialCommandDao.fetchSerialCommands(DEFAULT_FETCH_SIZE);
+        if (CollectionUtils.isEmpty(serialCommands)) {
+            return Collections.emptyList();
+        }
+
+        // workflowCode-> workflowVersion -> commandGroup
+        // Right now, we think each workflow has its own queue
+        final Map<Long, Map<Integer, SerialCommandsGroup>> serialCommandsGroupMap = new HashMap<>();
+        for (SerialCommandDto serialCommand : serialCommands) {
+            serialCommandsGroupMap.computeIfAbsent(serialCommand.getWorkflowDefinitionCode(), k -> new HashMap<>())
+                    .computeIfAbsent(serialCommand.getWorkflowDefinitionVersion(),
+                            v -> createSerialCommandsGroup(serialCommand))
+                    .getSerialCommands().add(serialCommand);
+        }
+        return serialCommandsGroupMap.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toList());
+    }
+
+    /**
+     * Creates an empty group for the workflow definition of the given serial command.
+     * <p> When the workflow definition log cannot be found, the group is created with a null
+     * execution type. handleSerialCommand then reports the group and skips it, instead of a
+     * NullPointerException here aborting the whole fetch round for every workflow.
+     */
+    private SerialCommandsGroup createSerialCommandsGroup(SerialCommandDto serialCommand) {
+        Long workflowDefinitionCode = serialCommand.getWorkflowDefinitionCode();
+        Integer workflowDefinitionVersion = serialCommand.getWorkflowDefinitionVersion();
+        final WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao
+                .queryByDefinitionCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion);
+        return SerialCommandsGroup.builder()
+                .workflowDefinitionCode(workflowDefinitionCode)
+                .workflowDefinitionVersion(workflowDefinitionVersion)
+                .executionType(workflowDefinitionLog == null ? null : workflowDefinitionLog.getExecutionType())
+                .serialCommands(new ArrayList<>())
+                .build();
+    }
+
+    /**
+     * Clears the loop switch; the polling loop checks it at the start of each round.
+     */
+    @Override
+    public void close() {
+        flag = false;
+    }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
index e8596c0dcc..488e3aabe9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
@@ -21,6 +21,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent;
@@ -47,6 +48,7 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.support.TransactionTemplate;
@Slf4j
public abstract class AbstractWorkflowStateAction implements
IWorkflowStateAction {
@@ -66,6 +68,12 @@ public abstract class AbstractWorkflowStateAction implements
IWorkflowStateActio
@Autowired
protected WorkflowAlertManager workflowAlertManager;
+ @Autowired
+ protected TransactionTemplate transactionTemplate;
+
+ @Autowired
+ protected SerialCommandDao serialCommandDao;
+
/**
* Try to trigger the tasks if the trigger condition is met.
* <p> If all the given tasks trigger condition is not met then will try
to emit workflow finish event.
@@ -140,11 +148,22 @@ public abstract class AbstractWorkflowStateAction
implements IWorkflowStateActio
protected void workflowFinish(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
final WorkflowExecutionStatus
workflowExecutionStatus) {
- final WorkflowInstance workflowInstance =
workflowExecutionRunnable.getWorkflowInstance();
- workflowInstance.setEndTime(new Date());
- transformWorkflowInstanceState(workflowExecutionRunnable,
workflowExecutionStatus);
- workflowExecutionRunnable.getWorkflowEventBus()
-
.publish(WorkflowFinalizeLifecycleEvent.of(workflowExecutionRunnable));
+ // todo: add transaction configuration in lifecycle event, all sync
lifecycle should be in transaction
+ transactionTemplate.execute(status -> {
+ final WorkflowInstance workflowInstance =
workflowExecutionRunnable.getWorkflowInstance();
+ workflowInstance.setEndTime(new Date());
+ transformWorkflowInstanceState(workflowExecutionRunnable,
workflowExecutionStatus);
+ if
(workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowDefinition().getExecutionType()
+ .isSerial()) {
+ if
(serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId()) > 0) {
+ log.info("Success clear SerialCommand for
WorkflowExecuteRunnable: {}",
+ workflowExecutionRunnable.getName());
+ }
+ }
+ workflowExecutionRunnable.getWorkflowEventBus()
+
.publish(WorkflowFinalizeLifecycleEvent.of(workflowExecutionRunnable));
+ return null;
+ });
}
/**
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java
index 866d758419..f65708b8fc 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java
@@ -17,10 +17,14 @@
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionTypeEnum;
import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
-import org.apache.dolphinscheduler.dao.repository.UserDao;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
@@ -35,31 +39,47 @@ public abstract class
AbstractWorkflowInstanceTrigger<TriggerRequest, TriggerRes
IWorkflowTrigger<TriggerRequest, TriggerResponse> {
@Autowired
- private WorkflowDefinitionLogDao workflowDefinitionDao;
+ private WorkflowInstanceDao workflowInstanceDao;
@Autowired
- private WorkflowInstanceDao workflowInstanceDao;
+ private CommandDao commandDao;
@Autowired
- private UserDao userDao;
+ protected SerialCommandDao serialCommandDao;
@Autowired
- private CommandDao commandDao;
+ protected WorkflowDefinitionLogDao workflowDefinitionLogDao;
@Override
@Transactional
public TriggerResponse triggerWorkflow(final TriggerRequest
triggerRequest) {
final WorkflowInstance workflowInstance =
constructWorkflowInstance(triggerRequest);
- workflowInstanceDao.updateById(workflowInstance);
-
+ final Long workflowDefinitionCode =
workflowInstance.getWorkflowDefinitionCode();
+ final int workflowDefinitionVersion =
workflowInstance.getWorkflowDefinitionVersion();
+ final WorkflowDefinitionLog workflowDefinition =
workflowDefinitionLogDao.queryByDefinitionCodeAndVersion(
+ workflowDefinitionCode, workflowDefinitionVersion);
+ if (workflowDefinition == null) {
+ throw new IllegalStateException(
+ "Workflow definition not found: " + workflowDefinitionCode
+ " version: "
+ + workflowDefinitionVersion);
+ }
final Command command = constructTriggerCommand(triggerRequest,
workflowInstance);
- commandDao.insert(command);
+ if (workflowDefinition.getExecutionType() ==
WorkflowExecutionTypeEnum.PARALLEL) {
+ workflowInstanceDao.updateById(workflowInstance);
+ commandDao.insert(command);
+ } else {
+ workflowInstance.setState(WorkflowExecutionStatus.SERIAL_WAIT);
+ workflowInstanceDao.updateById(workflowInstance);
+
serialCommandDao.insert(SerialCommandDto.newSerialCommand(command).toEntity());
+ }
return onTriggerSuccess(workflowInstance);
}
+ // todo: encapsulate this with a WorkflowInstanceConstructor
protected abstract WorkflowInstance constructWorkflowInstance(final
TriggerRequest triggerRequest);
+ // todo: encapsulate this with a CommandConstructor
protected abstract Command constructTriggerCommand(final TriggerRequest
triggerRequest,
final WorkflowInstance
workflowInstance);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowTrigger.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowTrigger.java
index 99c2bec0f2..8ff8ceb141 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowTrigger.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowTrigger.java
@@ -17,16 +17,22 @@
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionTypeEnum;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
import org.apache.dolphinscheduler.dao.repository.UserDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -49,20 +55,33 @@ public abstract class
AbstractWorkflowTrigger<TriggerRequest, TriggerResponse>
@Autowired
private CommandDao commandDao;
+ @Autowired
+ private SerialCommandDao serialCommandDao;
+
@Override
@Transactional
public TriggerResponse triggerWorkflow(final TriggerRequest
triggerRequest) {
- final WorkflowInstance workflowInstance =
constructWorkflowInstance(triggerRequest);
- workflowInstanceDao.insert(workflowInstance);
-
- final Command command = constructTriggerCommand(triggerRequest,
workflowInstance);
- commandDao.insert(command);
+ final ImmutablePair<WorkflowDefinition, WorkflowInstance> pair =
constructWorkflowInstance(triggerRequest);
+ final WorkflowDefinition workflowDefinition = pair.getLeft();
+ final WorkflowInstance workflowInstance = pair.getRight();
+ if (workflowDefinition.getExecutionType() ==
WorkflowExecutionTypeEnum.PARALLEL) {
+ workflowInstanceDao.insert(workflowInstance);
+ final Command command = constructTriggerCommand(triggerRequest,
workflowInstance);
+ commandDao.insert(command);
+ } else {
+
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, "Waiting
for serial execution");
+ workflowInstanceDao.insert(workflowInstance);
+ final Command command = constructTriggerCommand(triggerRequest,
workflowInstance);
+
serialCommandDao.insert(SerialCommandDto.newSerialCommand(command).toEntity());
+ }
return onTriggerSuccess(workflowInstance);
}
- protected abstract WorkflowInstance constructWorkflowInstance(final
TriggerRequest triggerRequest);
+ // todo: encapsulate using WorkflowInstanceConstructor
+ protected abstract ImmutablePair<WorkflowDefinition, WorkflowInstance>
constructWorkflowInstance(final TriggerRequest triggerRequest);
+ // todo: encapsulate using CommandConstructor
protected abstract Command constructTriggerCommand(final TriggerRequest
triggerRequest,
final WorkflowInstance
workflowInstance);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java
index 9714b2f254..aac061bfa3 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowB
import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse;
import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import java.util.Date;
import java.util.List;
@@ -48,7 +49,7 @@ public class WorkflowBackfillTrigger
AbstractWorkflowTrigger<WorkflowBackfillTriggerRequest,
WorkflowBackfillTriggerResponse> {
@Override
- protected WorkflowInstance
constructWorkflowInstance(WorkflowBackfillTriggerRequest
backfillTriggerRequest) {
+ protected ImmutablePair<WorkflowDefinition, WorkflowInstance>
constructWorkflowInstance(WorkflowBackfillTriggerRequest
backfillTriggerRequest) {
final CommandType commandType = CommandType.COMPLEMENT_DATA;
final Long workflowCode = backfillTriggerRequest.getWorkflowCode();
final Integer workflowVersion =
backfillTriggerRequest.getWorkflowVersion();
@@ -84,7 +85,7 @@ public class WorkflowBackfillTrigger
EnvironmentUtils.getEnvironmentCodeOrDefault(backfillTriggerRequest.getEnvironmentCode()));
workflowInstance.setTimeout(workflowDefinition.getTimeout());
workflowInstance.setDryRun(backfillTriggerRequest.getDryRun().getCode());
- return workflowInstance;
+ return ImmutablePair.of(workflowDefinition, workflowInstance);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowManualTrigger.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowManualTrigger.java
index 45f5ccc0a2..5d04ecf0ed 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowManualTrigger.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowManualTrigger.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowM
import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerResponse;
import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import java.util.Date;
@@ -47,7 +48,7 @@ public class WorkflowManualTrigger
AbstractWorkflowTrigger<WorkflowManualTriggerRequest,
WorkflowManualTriggerResponse> {
@Override
- protected WorkflowInstance constructWorkflowInstance(final
WorkflowManualTriggerRequest workflowManualTriggerRequest) {
+ protected ImmutablePair<WorkflowDefinition, WorkflowInstance>
constructWorkflowInstance(final WorkflowManualTriggerRequest
workflowManualTriggerRequest) {
final CommandType commandType = CommandType.START_PROCESS;
final Long workflowCode =
workflowManualTriggerRequest.getWorkflowDefinitionCode();
final Integer workflowVersion =
workflowManualTriggerRequest.getWorkflowDefinitionVersion();
@@ -81,7 +82,7 @@ public class WorkflowManualTrigger
EnvironmentUtils.getEnvironmentCodeOrDefault(workflowManualTriggerRequest.getEnvironmentCode()));
workflowInstance.setTimeout(workflowDefinition.getTimeout());
workflowInstance.setDryRun(workflowManualTriggerRequest.getDryRun().getCode());
- return workflowInstance;
+ return ImmutablePair.of(workflowDefinition, workflowInstance);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowScheduleTrigger.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowScheduleTrigger.java
index d0a7a282d3..6fedd1cc2f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowScheduleTrigger.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowScheduleTrigger.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowS
import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerResponse;
import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import java.util.Date;
@@ -44,7 +45,7 @@ public class WorkflowScheduleTrigger
AbstractWorkflowTrigger<WorkflowScheduleTriggerRequest,
WorkflowScheduleTriggerResponse> {
@Override
- protected WorkflowInstance
constructWorkflowInstance(WorkflowScheduleTriggerRequest
scheduleTriggerRequest) {
+ protected ImmutablePair<WorkflowDefinition, WorkflowInstance>
constructWorkflowInstance(WorkflowScheduleTriggerRequest
scheduleTriggerRequest) {
final CommandType commandType = CommandType.SCHEDULER;
final Long workflowCode = scheduleTriggerRequest.getWorkflowCode();
final Integer workflowVersion =
scheduleTriggerRequest.getWorkflowVersion();
@@ -79,7 +80,7 @@ public class WorkflowScheduleTrigger
EnvironmentUtils.getEnvironmentCodeOrDefault(scheduleTriggerRequest.getEnvironmentCode()));
workflowInstance.setTimeout(workflowDefinition.getTimeout());
workflowInstance.setDryRun(scheduleTriggerRequest.getDryRun().getCode());
- return workflowInstance;
+ return ImmutablePair.of(workflowDefinition, workflowInstance);
}
@Override
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index b61355195e..1ae906aa3f 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -124,6 +124,108 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test start a workflow with one fake task(A) using serial
wait strategy")
+ public void testStartWorkflow_with_serialWaitStrategy() {
+ final String yaml =
"/it/start/workflow_with_serial_wait_strategy.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId1 =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+ final Integer workflowInstanceId2 =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+ final Integer workflowInstanceId3 =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId1).getState())
+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId2).getState())
+ .isEqualTo(WorkflowExecutionStatus.SERIAL_WAIT);
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId3).getState())
+ .isEqualTo(WorkflowExecutionStatus.SERIAL_WAIT);
+ });
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ final WorkflowInstance workflowInstance1 =
repository.queryWorkflowInstance(workflowInstanceId1);
+ final WorkflowInstance workflowInstance2 =
repository.queryWorkflowInstance(workflowInstanceId2);
+ final WorkflowInstance workflowInstance3 =
repository.queryWorkflowInstance(workflowInstanceId3);
+
assertThat(workflowInstance1.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
+
assertThat(workflowInstance2.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
+ assertThat(workflowInstance2.getEndTime())
+
.isAtLeast(DateUtils.addSeconds(workflowInstance1.getEndTime(), 5));
+
assertThat(workflowInstance3.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
+ assertThat(workflowInstance3.getEndTime())
+
.isAtLeast(DateUtils.addSeconds(workflowInstance2.getEndTime(), 5));
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow with one fake task(A) using serial
discard strategy")
+ public void testStartWorkflow_with_serialDiscardStrategy() {
+ final String yaml =
"/it/start/workflow_with_serial_discard_strategy.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId1 =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+ final Integer workflowInstanceId2 =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+ final Integer workflowInstanceId3 =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId1).getState())
+ .isEqualTo(WorkflowExecutionStatus.SUCCESS);
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId2).getState())
+ .isEqualTo(WorkflowExecutionStatus.STOP);
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId3).getState())
+ .isEqualTo(WorkflowExecutionStatus.STOP);
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow with one fake task(A) using serial
priority strategy")
+ public void testStartWorkflow_with_serialPriorityStrategy() {
+ final String yaml =
"/it/start/workflow_with_serial_priority_strategy.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId1 =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+ final Integer workflowInstanceId2 =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+ final Integer workflowInstanceId3 =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId1).getState())
+ .isEqualTo(WorkflowExecutionStatus.STOP);
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId2).getState())
+ .isEqualTo(WorkflowExecutionStatus.STOP);
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId3).getState())
+ .isEqualTo(WorkflowExecutionStatus.SUCCESS);
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
@Test
@DisplayName("Test start a workflow with two fake task(A) using task
group")
public void testStartWorkflow_with_successTaskUsingTaskGroup() {
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_discard_strategy.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_discard_strategy.yaml
new file mode 100644
index 0000000000..3bc580ec03
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_discard_strategy.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_serial_discard_strategy
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: SERIAL_DISCARD
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_priority_strategy.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_priority_strategy.yaml
new file mode 100644
index 0000000000..e21fa5798a
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_priority_strategy.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_serial_priority_strategy
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: SERIAL_PRIORITY
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 10"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_wait_strategy.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_wait_strategy.yaml
new file mode 100644
index 0000000000..0cd531d37d
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_wait_strategy.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_serial_wait_strategy
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: SERIAL_WAIT
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
index 1d4c7b038b..7118a099d5 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
@@ -135,7 +135,7 @@ public class PhysicalTaskExecutor extends
AbstractTaskExecutor {
storageOperator,
taskExecutionContext);
taskExecutionContext.setResourceContext(resourceContext);
- log.info("Download resources successfully: \n{}",
taskExecutionContext.getResourceContext());
+ log.info("Download resources successfully: {}",
taskExecutionContext.getResourceContext());
log.info(TaskLogMarkers.excludeInTaskLog(), "Initialized Task
Context{}",
JSONUtils.toPrettyJsonString(taskExecutionContext));