This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 40ed67f97a [DSIP-92][Master] Refactor workflow serial strategy (#17531)
40ed67f97a is described below

commit 40ed67f97a31801f0db48570403bbfdb42d796c7
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Nov 27 23:00:42 2025 +0800

    [DSIP-92][Master] Refactor workflow serial strategy (#17531)
---
 .github/workflows/backend.yml                      |   2 +-
 docs/docs/en/guide/upgrade/incompatible.md         |  25 ++-
 .../api/dto/workflow/WorkflowCreateRequest.java    |   2 +-
 .../PauseWorkflowInstanceExecutorDelegate.java     |  21 ++-
 .../StopWorkflowInstanceExecutorDelegate.java      |  21 ++-
 .../dolphinscheduler/common/enums/CommandType.java |   1 -
 .../common/enums/WorkflowExecutionTypeEnum.java    |   5 +-
 .../dao/entity/SerialCommand.java}                 |  41 ++++-
 .../dao/entity/WorkflowInstance.java               |   5 -
 .../dao/mapper/SerialCommandMapper.java}           |  20 ++-
 .../dao/model/SerialCommandDto.java                | 119 ++++++++++++++
 .../dao/repository/SerialCommandDao.java}          |  17 +-
 .../dao/repository/WorkflowInstanceDao.java        |   7 -
 .../dao/repository/impl/SerialCommandDaoImpl.java  |  50 ++++++
 .../repository/impl/WorkflowInstanceDaoImpl.java   |   9 --
 .../dao/mapper/SerialCommandMapper.xml             |  36 +++++
 .../src/main/resources/sql/dolphinscheduler_h2.sql |  21 +++
 .../main/resources/sql/dolphinscheduler_mysql.sql  |  21 +++
 .../resources/sql/dolphinscheduler_postgresql.sql  |  28 ++++
 .../mysql/dolphinscheduler_ddl.sql                 |  17 ++
 .../mysql}/dolphinscheduler_dml.sql                |   2 +-
 .../postgresql/dolphinscheduler_ddl.sql            |  45 ++++++
 .../postgresql}/dolphinscheduler_dml.sql           |   0
 .../master/engine/IWorkflowSerialCoordinator.java  |  16 +-
 .../server/master/engine/MasterCoordinator.java    |  37 +++--
 .../server/master/engine/TaskGroupCoordinator.java |   1 +
 .../handler/ReRunWorkflowCommandHandler.java       |   4 -
 ...r.java => RecoverSerialWaitCommandHandler.java} |  55 +------
 .../command/handler/RunWorkflowCommandHandler.java |   8 -
 .../handler/WorkflowFailoverCommandHandler.java    |   4 -
 .../trigger/SubWorkflowManualTrigger.java          |  12 +-
 .../serial/AbstractSerialCommandHandler.java       | 114 ++++++++++++++
 .../workflow/serial/ISerialCommandHandler.java     |  13 +-
 .../serial/SerialCommandDiscardHandler.java        |  59 +++++++
 .../serial/SerialCommandPriorityHandler.java       |  59 +++++++
 .../workflow/serial/SerialCommandWaitHandler.java  |  51 ++++++
 .../workflow/serial/SerialCommandsGroup.java       |  27 +++-
 .../workflow/serial/WorkflowSerialCoordinator.java | 175 +++++++++++++++++++++
 .../statemachine/AbstractWorkflowStateAction.java  |  29 +++-
 .../trigger/AbstractWorkflowInstanceTrigger.java   |  36 ++++-
 .../workflow/trigger/AbstractWorkflowTrigger.java  |  31 +++-
 .../workflow/trigger/WorkflowBackfillTrigger.java  |   5 +-
 .../workflow/trigger/WorkflowManualTrigger.java    |   5 +-
 .../workflow/trigger/WorkflowScheduleTrigger.java  |   5 +-
 .../integration/cases/WorkflowStartTestCase.java   | 102 ++++++++++++
 .../workflow_with_serial_discard_strategy.yaml     |  61 +++++++
 .../workflow_with_serial_priority_strategy.yaml    |  61 +++++++
 .../start/workflow_with_serial_wait_strategy.yaml  |  61 +++++++
 .../worker/executor/PhysicalTaskExecutor.java      |   2 +-
 49 files changed, 1375 insertions(+), 173 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index b185a44f9f..8c8b66965f 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -143,7 +143,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        version: ["3.1.9", "3.2.0"]
+        version: ["3.1.9", "3.2.0", "3.3.1"]
         case:
           - name: schema-check-with-mysql
             script: .github/workflows/schema-check/mysql/start-job.sh
diff --git a/docs/docs/en/guide/upgrade/incompatible.md 
b/docs/docs/en/guide/upgrade/incompatible.md
index 1c1b679d83..c6a8f3023c 100644
--- a/docs/docs/en/guide/upgrade/incompatible.md
+++ b/docs/docs/en/guide/upgrade/incompatible.md
@@ -2,21 +2,6 @@
 
 This document records the incompatible updates between each version. You need 
to check this document before you upgrade to related version.
 
-## dev
-
-* Upgrade mysql driver version from 8.0.16 to 8.0.33 
([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
-* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to 
`DATAX_LAUNCHER` 
([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
-* Change regex matching sql params in SQL task plugin 
([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
-* Remove the spark version of spark task 
([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
-* Change the default unix shell executor from sh to bash 
([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
-* Remove `deleteSource` in `download()` of `StorageOperate` 
([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
-* Remove default key for attribute `data-quality.jar.name` in 
`common.properties` 
([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
-* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in 
`common.properties` and represent for directory 
([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
-
-## 3.2.0
-
-* Remove parameter `description` from public interfaces of new resource center 
 ([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
-
 ## 3.0.0
 
 * Copy and import workflow without 'copy' suffix 
[#10607](https://github.com/apache/dolphinscheduler/pull/10607)
@@ -24,7 +9,16 @@ This document records the incompatible updates between each 
version. You need to
 
 ## 3.2.0
 
+* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in 
`common.properties` and represent for directory 
([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
+* Remove default key for attribute `data-quality.jar.name` in 
`common.properties` 
([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
+* Remove `deleteSource` in `download()` of `StorageOperate` 
([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
+* Change the default unix shell executor from sh to bash 
([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
+* Remove the spark version of spark task 
([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
+* Change regex matching sql params in SQL task plugin 
([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
+* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to 
`DATAX_LAUNCHER` 
([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
+* Upgrade mysql driver version from 8.0.16 to 8.0.33 
([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
 * Add required field `database` in /datasources/tables && 
/datasources/tableColumns Api 
[#14406](https://github.com/apache/dolphinscheduler/pull/14406)
+* Remove parameter `description` from public interfaces of new resource center 
 ([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
 
 ## 3.3.0
 
@@ -41,4 +35,5 @@ This document records the incompatible updates between each 
version. You need to
 ## 3.4.0
 
 * Renamed the publicKey field to privateKey in the SSH connection parameters 
under the datasource configuration. 
([#17666])(https://github.com/apache/dolphinscheduler/pull/17666)
+* Add table t_ds_serial_command. 
([#17531])(https://github.com/apache/dolphinscheduler/pull/17531)
 
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
index 9cf38fe6a4..8fa660f61c 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
@@ -56,7 +56,7 @@ public class WorkflowCreateRequest {
     private int timeout;
 
     @Schema(allowableValues = "PARALLEL / SERIAL_WAIT / SERIAL_DISCARD / 
SERIAL_PRIORITY", example = "PARALLEL", description = "default PARALLEL if not 
provide.")
-    private String executionType;
+    private String executionType = WorkflowExecutionTypeEnum.PARALLEL.name();
 
     public WorkflowDefinition convert2WorkflowDefinition() {
         WorkflowDefinition workflowDefinition = new WorkflowDefinition();
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java
index 0e67388294..97b79dad0b 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java
@@ -21,6 +21,7 @@ import 
org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import org.apache.dolphinscheduler.extract.base.client.Clients;
 import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.support.TransactionTemplate;
 
 @Slf4j
 @Component
@@ -41,6 +43,12 @@ public class PauseWorkflowInstanceExecutorDelegate
     @Autowired
     private WorkflowInstanceDao workflowInstanceDao;
 
+    @Autowired
+    private TransactionTemplate transactionTemplate;
+
+    @Autowired
+    private SerialCommandDao serialCommandDao;
+
     @Override
     public Void execute(PauseWorkflowInstanceOperation 
workflowInstanceControlRequest) {
         final WorkflowInstance workflowInstance = 
workflowInstanceControlRequest.workflowInstance;
@@ -64,10 +72,15 @@ public class PauseWorkflowInstanceExecutorDelegate
     }
 
     private void directPauseInDB(WorkflowInstance workflowInstance) {
-        workflowInstanceDao.updateWorkflowInstanceState(
-                workflowInstance.getId(),
-                workflowInstance.getState(),
-                WorkflowExecutionStatus.PAUSE);
+        // todo: move the pause logic to master
+        transactionTemplate.execute(status -> {
+            workflowInstanceDao.updateWorkflowInstanceState(
+                    workflowInstance.getId(),
+                    workflowInstance.getState(),
+                    WorkflowExecutionStatus.PAUSE);
+            
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
+            return null;
+        });
         log.info("Update workflow instance {} state from: {} to {} success",
                 workflowInstance.getName(),
                 workflowInstance.getState().name(),
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java
index 124c4c6285..3fde96a82e 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java
@@ -21,6 +21,7 @@ import 
org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import org.apache.dolphinscheduler.extract.base.client.Clients;
 import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.support.TransactionTemplate;
 
 @Slf4j
 @Component
@@ -41,6 +43,12 @@ public class StopWorkflowInstanceExecutorDelegate
     @Autowired
     private WorkflowInstanceDao workflowInstanceDao;
 
+    @Autowired
+    private TransactionTemplate transactionTemplate;
+
+    @Autowired
+    private SerialCommandDao serialCommandDao;
+
     @Override
     public Void execute(StopWorkflowInstanceOperation 
workflowInstanceControlRequest) {
         final WorkflowInstance workflowInstance = 
workflowInstanceControlRequest.workflowInstance;
@@ -65,10 +73,15 @@ public class StopWorkflowInstanceExecutorDelegate
     }
 
     void directStopInDB(WorkflowInstance workflowInstance) {
-        workflowInstanceDao.updateWorkflowInstanceState(
-                workflowInstance.getId(),
-                workflowInstance.getState(),
-                WorkflowExecutionStatus.STOP);
+        // todo: move the stop logic to master
+        transactionTemplate.execute(status -> {
+            workflowInstanceDao.updateWorkflowInstanceState(
+                    workflowInstance.getId(),
+                    workflowInstance.getState(),
+                    WorkflowExecutionStatus.STOP);
+            
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
+            return null;
+        });
         log.info("Update workflow instance {} state from: {} to {} success",
                 workflowInstance.getName(),
                 workflowInstance.getState().name(),
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 8db4feebe0..23d47cbef5 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -89,7 +89,6 @@ public enum CommandType {
     STOP(9, "stop a workflow"),
     /**
      * Recover from the serial-wait state.
-     * todo: We may need to remove these command, and use the workflow 
instance origin command type when notify from serial wait.
      */
     RECOVER_SERIAL_WAIT(11, "recover serial wait"),
     /**
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java
index 46ce18c31c..8b8d7e7688 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java
@@ -27,7 +27,6 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
 public enum WorkflowExecutionTypeEnum {
 
     PARALLEL(0, "parallel"),
-    // todo: the serial is unstable, so we don't support them now
     SERIAL_WAIT(1, "serial wait"),
     SERIAL_DISCARD(2, "serial discard"),
     SERIAL_PRIORITY(3, "serial priority");
@@ -56,4 +55,8 @@ public enum WorkflowExecutionTypeEnum {
         throw new IllegalArgumentException("invalid status : " + 
executionType);
     }
 
+    public boolean isSerial() {
+        return this != PARALLEL;
+    }
+
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/SerialCommand.java
similarity index 50%
copy from 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
copy to 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/SerialCommand.java
index 67ac6192f8..b5cb3d4f3b 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/SerialCommand.java
@@ -13,6 +13,43 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params, 
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE 
'%"publicKey"%';
+package org.apache.dolphinscheduler.dao.entity;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("t_ds_serial_command")
+public class SerialCommand {
+
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    private Integer workflowInstanceId;
+
+    private Long workflowDefinitionCode;
+
+    private Integer workflowDefinitionVersion;
+
+    private String command;
+
+    private int state;
+
+    private Date createTime;
+
+    private Date updateTime;
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java
index 80b386220c..8abf6a4666 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java
@@ -210,11 +210,6 @@ public class WorkflowInstance {
         return commandType;
     }
 
-    /**
-     * set state with desc
-     * @param state
-     * @param stateDesc
-     */
     public void setStateWithDesc(WorkflowExecutionStatus state, String 
stateDesc) {
         this.setState(state);
         if (StringUtils.isEmpty(this.getStateHistory())) {
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.java
similarity index 62%
copy from 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
copy to 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.java
index 67ac6192f8..875481e8ca 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.java
@@ -13,6 +13,22 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params, 
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE 
'%"publicKey"%';
+package org.apache.dolphinscheduler.dao.mapper;
+
+import org.apache.dolphinscheduler.dao.entity.SerialCommand;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface SerialCommandMapper extends BaseMapper<SerialCommand> {
+
+    List<SerialCommand> fetchSerialCommands(@Param("fetchSize") int fetchSize);
+
+    int deleteByWorkflowInstanceId(@Param("workflowInstanceId") int 
workflowInstanceId);
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/SerialCommandDto.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/SerialCommandDto.java
new file mode 100644
index 0000000000..cf4a8ecdf7
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/SerialCommandDto.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.model;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.SerialCommand;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SerialCommandDto {
+
+    private Integer id;
+
+    private Integer workflowInstanceId;
+
+    private Long workflowDefinitionCode;
+
+    private Integer workflowDefinitionVersion;
+
+    private Command command;
+
+    private State state;
+
+    private Date createTime;
+
+    private Date updateTime;
+
+    public static SerialCommandDto newSerialCommand(Command command) {
+        return SerialCommandDto.builder()
+                .workflowInstanceId(command.getWorkflowInstanceId())
+                .command(command)
+                .workflowDefinitionCode(command.getWorkflowDefinitionCode())
+                
.workflowDefinitionVersion(command.getWorkflowDefinitionVersion())
+                .state(State.WAITING)
+                .createTime(new Date())
+                .updateTime(new Date())
+                .build();
+    }
+
+    public static SerialCommandDto fromEntity(SerialCommand serialCommand) {
+        return SerialCommandDto.builder()
+                .id(serialCommand.getId())
+                .workflowInstanceId(serialCommand.getWorkflowInstanceId())
+                
.workflowDefinitionCode(serialCommand.getWorkflowDefinitionCode())
+                
.workflowDefinitionVersion(serialCommand.getWorkflowDefinitionVersion())
+                .command(JSONUtils.parseObject(serialCommand.getCommand(), new 
TypeReference<Command>() {
+                }))
+                .state(State.of(serialCommand.getState()))
+                .createTime(serialCommand.getCreateTime())
+                .updateTime(serialCommand.getUpdateTime())
+                .build();
+    }
+
+    public SerialCommand toEntity() {
+        return SerialCommand.builder()
+                .id(this.id)
+                .workflowInstanceId(this.workflowInstanceId)
+                .workflowDefinitionCode(this.workflowDefinitionCode)
+                .workflowDefinitionVersion(this.workflowDefinitionVersion)
+                .command(JSONUtils.toJsonString(this.command))
+                .state(this.state.getValue())
+                .createTime(this.createTime)
+                .updateTime(this.updateTime)
+                .build();
+    }
+
+    @Getter
+    public enum State {
+
+        // If the workflow instance is finished, then we directly delete the 
item from the queue
+        // so there are no finished state here
+        WAITING(0),
+        LAUNCHED(1),
+        ;
+        private final int value;
+
+        State(int value) {
+            this.value = value;
+        }
+
+        public static State of(int value) {
+            for (State state : values()) {
+                if (state.value == value) {
+                    return state;
+                }
+            }
+            throw new IllegalArgumentException("Invalid State value: " + 
value);
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/SerialCommandDao.java
similarity index 66%
copy from 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
copy to 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/SerialCommandDao.java
index 67ac6192f8..065fd1a7ef 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/SerialCommandDao.java
@@ -13,6 +13,19 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params, 
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE 
'%"publicKey"%';
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.SerialCommand;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import java.util.List;
+
+public interface SerialCommandDao extends IDao<SerialCommand> {
+
+    List<SerialCommandDto> fetchSerialCommands(int fetchSize);
+
+    int deleteByWorkflowInstanceId(Integer workflowInstanceId);
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
index 9f8f11687f..fb1a093bf3 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
@@ -39,13 +39,6 @@ public interface WorkflowInstanceDao extends 
IDao<WorkflowInstance> {
                                      WorkflowExecutionStatus originState,
                                      WorkflowExecutionStatus targetState);
 
-    /**
-     * performs an "upsert" operation (update or insert) on a WorkflowInstance 
object within a new transaction
-     *
-     * @param workflowInstance workflowInstance
-     */
-    void performTransactionalUpsert(WorkflowInstance workflowInstance);
-
     /**
      * find last scheduler workflow instance in the date interval
      *
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/SerialCommandDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/SerialCommandDaoImpl.java
new file mode 100644
index 0000000000..499a0c3fc7
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/SerialCommandDaoImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.SerialCommand;
+import org.apache.dolphinscheduler.dao.mapper.SerialCommandMapper;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+import org.apache.dolphinscheduler.dao.repository.BaseDao;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.NonNull;
+
+import org.springframework.stereotype.Repository;
+
+@Repository
+public class SerialCommandDaoImpl extends BaseDao<SerialCommand, 
SerialCommandMapper> implements SerialCommandDao {
+
+    public SerialCommandDaoImpl(@NonNull SerialCommandMapper 
serialCommandMapper) {
+        super(serialCommandMapper);
+    }
+
+    @Override
+    public List<SerialCommandDto> fetchSerialCommands(int fetchSize) {
+        return 
mybatisMapper.fetchSerialCommands(fetchSize).stream().map(SerialCommandDto::fromEntity)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public int deleteByWorkflowInstanceId(Integer workflowInstanceId) {
+        return mybatisMapper.deleteByWorkflowInstanceId(workflowInstanceId);
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
index 7e58b8495c..b9b92f54dd 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
@@ -33,9 +33,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
-import org.springframework.transaction.annotation.Isolation;
-import org.springframework.transaction.annotation.Propagation;
-import org.springframework.transaction.annotation.Transactional;
 
 @Slf4j
 @Repository
@@ -75,12 +72,6 @@ public class WorkflowInstanceDaoImpl extends 
BaseDao<WorkflowInstance, WorkflowI
         }
     }
 
-    @Override
-    @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = 
Isolation.READ_COMMITTED, rollbackFor = Exception.class)
-    public void performTransactionalUpsert(WorkflowInstance workflowInstance) {
-        this.upsertWorkflowInstance(workflowInstance);
-    }
-
     /**
      * find last scheduler process instance in the date interval
      *
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.xml
new file mode 100644
index 0000000000..9649758252
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/SerialCommandMapper.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd"; >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.SerialCommandMapper">
+    <sql id="baseSql">
+        id, workflow_instance_id, workflow_definition_code, 
workflow_definition_version, command, state, create_time, update_time 
+    </sql>
+    
+    <select id="fetchSerialCommands" 
resultType="org.apache.dolphinscheduler.dao.entity.SerialCommand">
+        select 
+        <include refid="baseSql"/>
+        from t_ds_serial_command
+        order by id asc
+        limit #{fetchSize}
+    </select>
+
+    <delete id="deleteByWorkflowInstanceId">
+        delete from t_ds_serial_command where workflow_instance_id = 
#{workflowInstanceId}
+    </delete>
+    
+</mapper>
\ No newline at end of file
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 2565eaad7b..b86a1707e8 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -342,6 +342,27 @@ CREATE TABLE t_ds_command
 -- Records of t_ds_command
 -- ----------------------------
 
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS `t_ds_serial_command`;
+CREATE TABLE `t_ds_serial_command` (
+   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+   `workflow_definition_code` int(11) NOT NULL COMMENT 'workflow definition 
code',
+   `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition 
code',
+   `workflow_instance_id` bigint(20) NOT NULL COMMENT 'workflow instance id',
+   `state` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'state of the serial queue: 0 
waiting, 1 fired',
+   `command` text COMMENT 'command json',
+   `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create 
time',
+   `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'update time',
+   PRIMARY KEY (`id`),
+   KEY `idx_workflow_instance_id` (`workflow_instance_id`)
+);
+
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+    
 -- ----------------------------
 -- Table structure for t_ds_datasource
 -- ----------------------------
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index f79867123b..ea80068e49 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -350,6 +350,27 @@ CREATE TABLE `t_ds_command` (
 -- Records of t_ds_command
 -- ----------------------------
 
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS `t_ds_serial_command`;
+CREATE TABLE `t_ds_serial_command` (
+  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+  `workflow_definition_code` int(11) NOT NULL COMMENT 'workflow definition 
code',
+  `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition 
version',
+  `workflow_instance_id` bigint(20) NOT NULL COMMENT 'workflow instance id',
+  `state` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'state of the serial queue: 0 
waiting, 1 fired',
+  `command` text COMMENT 'command json',
+  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create 
time',
+  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'update time',
+  PRIMARY KEY (`id`),
+  KEY `idx_workflow_instance_id` (`workflow_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
+
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+    
 -- ----------------------------
 -- Table structure for t_ds_datasource
 -- ----------------------------
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 2882d904bf..afe1b957cd 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -274,6 +274,34 @@ CREATE TABLE t_ds_command (
 
 create index priority_id_index on t_ds_command (workflow_instance_priority,id);
 
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS t_ds_serial_command;
+CREATE TABLE t_ds_serial_command (
+     id SERIAL PRIMARY KEY,
+     workflow_definition_code BIGINT NOT NULL,
+     workflow_definition_version INT NOT NULL,
+     workflow_instance_id BIGINT NOT NULL,
+     state SMALLINT NOT NULL DEFAULT 0,
+     command TEXT,
+     create_time TIMESTAMP NOT NULL DEFAULT now(),
+     update_time TIMESTAMP NOT NULL DEFAULT now()
+);
+CREATE INDEX idx_workflow_instance_id ON t_ds_serial_command 
(workflow_instance_id);
+COMMENT ON TABLE t_ds_serial_command IS 'serial command queue table';
+COMMENT ON COLUMN t_ds_serial_command.id IS 'primary key';
+COMMENT ON COLUMN t_ds_serial_command.workflow_definition_code IS 'workflow 
definition code';
+COMMENT ON COLUMN t_ds_serial_command.workflow_definition_version IS 'workflow 
definition version';
+COMMENT ON COLUMN t_ds_serial_command.workflow_instance_id IS 'workflow 
instance id';
+COMMENT ON COLUMN t_ds_serial_command.state IS 'state of the serial queue: 0 
waiting, 1 fired';
+COMMENT ON COLUMN t_ds_serial_command.command IS 'command json';
+COMMENT ON COLUMN t_ds_serial_command.create_time IS 'create time';
+COMMENT ON COLUMN t_ds_serial_command.update_time IS 'update time';
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+
 --
 -- Table structure for table t_ds_datasource
 --
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_ddl.sql
similarity index 53%
copy from 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
copy to 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_ddl.sql
index 04518fb8b8..3a9fff25ef 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -19,3 +19,20 @@ ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_task_id` 
(`task_id`);
 ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_group_id` (`group_id`);
 ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_status` (`status`);
 ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_workflow_instance_id` 
(`workflow_instance_id`);
+
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `t_ds_serial_command` (
+   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+   `workflow_definition_code` int(11) NOT NULL COMMENT 'workflow definition 
code',
+   `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition 
version',
+   `workflow_instance_id` bigint(20) NOT NULL COMMENT 'workflow instance id',
+   `state` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'state of the serial queue: 0 
waiting, 1 fired',
+   `command` text COMMENT 'command json',
+   `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create 
time',
+   `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'update time',
+   PRIMARY KEY (`id`),
+   KEY `idx_workflow_instance_id` (`workflow_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
+
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_dml.sql
similarity index 96%
copy from 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
copy to 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_dml.sql
index 67ac6192f8..06c6660f2b 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_dml.sql
@@ -15,4 +15,4 @@
  * limitations under the License.
 */
 
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params, 
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE 
'%"publicKey"%';
+UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params, 
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE 
'%"publicKey"%';
\ No newline at end of file
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_ddl.sql
 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_ddl.sql
new file mode 100644
index 0000000000..5d2beea9a0
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_ddl.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+create index idx_t_ds_task_group_queue_task_id on 
t_ds_task_group_queue(task_id);
+create index idx_t_ds_task_group_queue_group_id on 
t_ds_task_group_queue(group_id);
+create index idx_t_ds_task_group_queue_status on t_ds_task_group_queue(status);
+create index idx_t_ds_task_group_queue_workflow_instance_id on 
t_ds_task_group_queue(workflow_instance_id);    
+    
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS t_ds_serial_command (
+     id SERIAL PRIMARY KEY,
+     workflow_definition_code BIGINT NOT NULL,
+     workflow_definition_version INT NOT NULL,
+     workflow_instance_id BIGINT NOT NULL,
+     state SMALLINT NOT NULL DEFAULT 0,
+     command TEXT,
+     create_time TIMESTAMP NOT NULL DEFAULT now(),
+     update_time TIMESTAMP NOT NULL DEFAULT now()
+);
+CREATE INDEX "idx_workflow_instance_id" ON "t_ds_serial_command" 
("workflow_instance_id");
+COMMENT ON TABLE "t_ds_serial_command" IS 'serial command queue table';
+COMMENT ON COLUMN "t_ds_serial_command"."id" IS 'primary key';
+COMMENT ON COLUMN "t_ds_serial_command"."workflow_definition_code" IS 
'workflow definition code';
+COMMENT ON COLUMN "t_ds_serial_command"."workflow_definition_version" IS 
'workflow definition version';
+COMMENT ON COLUMN "t_ds_serial_command"."workflow_instance_id" IS 'workflow 
instance id';
+COMMENT ON COLUMN "t_ds_serial_command"."state" IS 'state of the serial queue: 
0 waiting, 1 fired';
+COMMENT ON COLUMN "t_ds_serial_command"."command" IS 'command json';
+COMMENT ON COLUMN "t_ds_serial_command"."create_time" IS 'create time';
+COMMENT ON COLUMN "t_ds_serial_command"."update_time" IS 'update time';
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_dml.sql
 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_dml.sql
similarity index 100%
rename from 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_dml.sql
rename to 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/postgresql/dolphinscheduler_dml.sql
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/IWorkflowSerialCoordinator.java
similarity index 71%
rename from 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/IWorkflowSerialCoordinator.java
index 04518fb8b8..bbd9c248c6 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/mysql/dolphinscheduler_ddl.sql
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/IWorkflowSerialCoordinator.java
@@ -13,9 +13,15 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_task_id` (`task_id`);
-ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_group_id` (`group_id`);
-ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_status` (`status`);
-ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_workflow_instance_id` 
(`workflow_instance_id`);
+package org.apache.dolphinscheduler.server.master.engine;
+
+public interface IWorkflowSerialCoordinator extends AutoCloseable {
+
+    void start();
+
+    @Override
+    void close();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
index da2f9c156c..eb2a9dda02 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
@@ -27,6 +27,7 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.failover.IFailoverCoordinator;
 import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
 
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
@@ -44,17 +45,22 @@ public class MasterCoordinator extends AbstractHAServer {
 
     private final IFailoverCoordinator failoverCoordinator;
 
+    private final IWorkflowSerialCoordinator workflowSerialCoordinator;
+
     public MasterCoordinator(final Registry registry,
                              final MasterConfig masterConfig,
                              final ITaskGroupCoordinator taskGroupCoordinator,
-                             final IFailoverCoordinator failoverCoordinator) {
+                             final IFailoverCoordinator failoverCoordinator,
+                             final IWorkflowSerialCoordinator 
workflowSerialCoordinator) {
         super(
                 registry,
                 RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
                 masterConfig.getMasterAddress());
         this.taskGroupCoordinator = taskGroupCoordinator;
         this.failoverCoordinator = failoverCoordinator;
-        addServerStatusChangeListener(new 
MasterCoordinatorListener(taskGroupCoordinator, failoverCoordinator));
+        this.workflowSerialCoordinator = workflowSerialCoordinator;
+        addServerStatusChangeListener(
+                new MasterCoordinatorListener(taskGroupCoordinator, 
failoverCoordinator, workflowSerialCoordinator));
     }
 
     @Override
@@ -75,27 +81,38 @@ public class MasterCoordinator extends AbstractHAServer {
 
         private final IFailoverCoordinator failoverCoordinator;
 
+        private final IWorkflowSerialCoordinator workflowSerialCoordinator;
+        private Future<?> failoverCoordinatorFuture;
+
         public MasterCoordinatorListener(ITaskGroupCoordinator 
taskGroupCoordinator,
-                                         IFailoverCoordinator 
failoverCoordinator) {
+                                         IFailoverCoordinator 
failoverCoordinator,
+                                         IWorkflowSerialCoordinator 
workflowSerialCoordinator) {
             this.taskGroupCoordinator = checkNotNull(taskGroupCoordinator);
             this.failoverCoordinator = checkNotNull(failoverCoordinator);
+            this.workflowSerialCoordinator = 
checkNotNull(workflowSerialCoordinator);
         }
 
         @Override
         public void changeToActive() {
             taskGroupCoordinator.start();
-            
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(()
 -> {
-                try {
-                    failoverCoordinator.cleanHistoryFailoverFinishedMarks();
-                } catch (Exception e) {
-                    log.error("FailoverCoordinator 
cleanHistoryFailoverFinishedMarks failed", e);
-                }
-            }, 0, 1, TimeUnit.DAYS);
+            workflowSerialCoordinator.start();
+            failoverCoordinatorFuture =
+                    
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(()
 -> {
+                        try {
+                            
failoverCoordinator.cleanHistoryFailoverFinishedMarks();
+                        } catch (Exception e) {
+                            log.error("FailoverCoordinator 
cleanHistoryFailoverFinishedMarks failed", e);
+                        }
+                    }, 0, 1, TimeUnit.DAYS);
         }
 
         @Override
         public void changeToStandBy() {
             taskGroupCoordinator.close();
+            workflowSerialCoordinator.close();
+            if (failoverCoordinatorFuture != null) {
+                failoverCoordinatorFuture.cancel(true);
+            }
         }
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index c4cf3c1f92..1a0312c70f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -90,6 +90,7 @@ public class TaskGroupCoordinator implements 
ITaskGroupCoordinator, AutoCloseabl
         flag = true;
         internalThread = new BaseDaemonThread(this::doStart) {
         };
+        internalThread.setName("TaskGroupCoordinator-Thread");
         internalThread.start();
         log.info("TaskGroupCoordinator started...");
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
index c685adf7f5..524c7a225a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
@@ -31,7 +31,6 @@ import java.util.Date;
 import java.util.List;
 
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 
 /**
@@ -46,9 +45,6 @@ public class ReRunWorkflowCommandHandler extends 
RunWorkflowCommandHandler {
     @Autowired
     private TaskInstanceDao taskInstanceDao;
 
-    @Autowired
-    private ApplicationContext applicationContext;
-
     @Autowired
     private MasterConfig masterConfig;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java
similarity index 50%
copy from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java
index c685adf7f5..09486db8ba 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java
@@ -20,87 +20,42 @@ package 
org.apache.dolphinscheduler.server.master.engine.command.handler;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
-
-import java.util.Date;
-import java.util.List;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext;
 
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 
-/**
- * This handler used to handle {@link CommandType#REPEAT_RUNNING} which will 
rerun the workflow instance.
- */
 @Component
-public class ReRunWorkflowCommandHandler extends RunWorkflowCommandHandler {
+public class RecoverSerialWaitCommandHandler extends AbstractCommandHandler {
 
     @Autowired
     private WorkflowInstanceDao workflowInstanceDao;
 
-    @Autowired
-    private TaskInstanceDao taskInstanceDao;
-
-    @Autowired
-    private ApplicationContext applicationContext;
-
     @Autowired
     private MasterConfig masterConfig;
 
-    /**
-     * Generate the repeat running workflow instance.
-     * <p> Will use the origin workflow instance, but will update the 
following fields. Need to note we cannot not
-     * update the command params here, since this will make the origin command 
params lost.
-     * <ul>
-     *     <li>state</li>
-     *     <li>command type</li>
-     *     <li>start time</li>
-     *     <li>restart time</li>
-     *     <li>end time</li>
-     *     <li>run times</li>
-     * </ul>
-     */
     @Override
-    protected void assembleWorkflowInstance(final 
WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
+    protected void 
assembleWorkflowInstance(WorkflowExecuteContext.WorkflowExecuteContextBuilder 
workflowExecuteContextBuilder) {
         final Command command = workflowExecuteContextBuilder.getCommand();
         final int workflowInstanceId = command.getWorkflowInstanceId();
         final WorkflowInstance workflowInstance = 
workflowInstanceDao.queryOptionalById(workflowInstanceId)
                 .orElseThrow(() -> new IllegalArgumentException("Cannot find 
WorkflowInstance:" + workflowInstanceId));
-        workflowInstance.setVarPool(null);
         
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, 
command.getCommandType().name());
-        workflowInstance.setCommandType(command.getCommandType());
-        workflowInstance.setRestartTime(new Date());
         workflowInstance.setHost(masterConfig.getMasterAddress());
-        workflowInstance.setEndTime(null);
-        workflowInstance.setRunTimes(workflowInstance.getRunTimes() + 1);
         workflowInstanceDao.updateById(workflowInstance);
-
         workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
     }
 
-    /**
-     * Generate the workflow execution graph.
-     * <p> Will clear the history task instance and assembly the start tasks 
into the WorkflowExecutionGraph.
-     */
     @Override
-    protected void assembleWorkflowExecutionGraph(final 
WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
-        markAllTaskInstanceInvalid(workflowExecuteContextBuilder);
-        super.assembleWorkflowExecutionGraph(workflowExecuteContextBuilder);
-    }
+    protected void 
assembleWorkflowExecutionGraph(WorkflowExecuteContext.WorkflowExecuteContextBuilder
 workflowExecuteContextBuilder) {
 
-    private void markAllTaskInstanceInvalid(final 
WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
-        final WorkflowInstance workflowInstance = 
workflowExecuteContextBuilder.getWorkflowInstance();
-        final List<TaskInstance> taskInstances = 
getValidTaskInstance(workflowInstance);
-        taskInstanceDao.markTaskInstanceInvalid(taskInstances);
     }
 
     @Override
     public CommandType commandType() {
-        return CommandType.REPEAT_RUNNING;
+        return CommandType.RECOVER_SERIAL_WAIT;
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
index a127ae6cea..5aa3b22e56 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import org.apache.dolphinscheduler.extract.master.command.ICommandParam;
 import 
org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
@@ -35,7 +34,6 @@ import 
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopol
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
 import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -60,18 +58,12 @@ public class RunWorkflowCommandHandler extends 
AbstractCommandHandler {
     @Autowired
     private WorkflowInstanceDao workflowInstanceDao;
 
-    @Autowired
-    private TaskInstanceDao taskInstanceDao;
-
     @Autowired
     private MasterConfig masterConfig;
 
     @Autowired
     private ApplicationContext applicationContext;
 
-    @Autowired
-    private CuringParamsService curingParamsService;
-
     /**
      * Will generate a new workflow instance based on the command.
      */
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
index 8348375b6e..e18d02b400 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
@@ -25,7 +25,6 @@ import 
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import 
org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
 import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
 import 
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
 import 
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
@@ -54,9 +53,6 @@ public class WorkflowFailoverCommandHandler extends 
AbstractCommandHandler {
     @Autowired
     private WorkflowInstanceDao workflowInstanceDao;
 
-    @Autowired
-    private ITaskGroupCoordinator taskGroupCoordinator;
-
     @Autowired
     private ApplicationContext applicationContext;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/trigger/SubWorkflowManualTrigger.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/trigger/SubWorkflowManualTrigger.java
index cbbc903008..375edb44c1 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/trigger/SubWorkflowManualTrigger.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/trigger/SubWorkflowManualTrigger.java
@@ -18,10 +18,13 @@
 package 
org.apache.dolphinscheduler.server.master.engine.executor.plugin.subworkflow.trigger;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowManualTrigger;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
 import org.springframework.stereotype.Component;
 
 /**
@@ -31,10 +34,11 @@ import org.springframework.stereotype.Component;
 public class SubWorkflowManualTrigger extends WorkflowManualTrigger {
 
     @Override
-    protected WorkflowInstance constructWorkflowInstance(final 
WorkflowManualTriggerRequest workflowManualTriggerRequest) {
-        final WorkflowInstance workflowInstance = 
super.constructWorkflowInstance(workflowManualTriggerRequest);
-        workflowInstance.setIsSubWorkflow(Flag.YES);
-        return workflowInstance;
+    protected ImmutablePair<WorkflowDefinition, WorkflowInstance> 
constructWorkflowInstance(final WorkflowManualTriggerRequest 
workflowManualTriggerRequest) {
+        final ImmutablePair<WorkflowDefinition, WorkflowInstance> pair =
+                super.constructWorkflowInstance(workflowManualTriggerRequest);
+        pair.getRight().setIsSubWorkflow(Flag.YES);
+        return pair;
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java
new file mode 100644
index 0000000000..c6a43bc10c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
+import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
+import org.apache.dolphinscheduler.extract.base.client.Clients;
+import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
+import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+
+@Slf4j
+public abstract class AbstractSerialCommandHandler implements 
ISerialCommandHandler {
+
+    @Autowired
+    protected WorkflowInstanceDao workflowInstanceDao;
+
+    @Autowired
+    protected IWorkflowControlClient workflowControlClient;
+
+    @Autowired
+    protected TransactionTemplate transactionTemplate;
+
+    @Autowired
+    protected MasterConfig masterConfig;
+
+    @Autowired
+    protected CommandDao commandDao;
+
+    @Autowired
+    protected SerialCommandDao serialCommandDao;
+
+    protected void launchSerialCommand(SerialCommandDto serialCommand) {
+        transactionTemplate.execute(new TransactionCallback<Object>() {
+
+            @Override
+            public Void doInTransaction(TransactionStatus status) {
+                final Command command = serialCommand.getCommand();
+                commandDao.insert(command);
+
+                serialCommand.setState(SerialCommandDto.State.LAUNCHED);
+                serialCommandDao.updateById(serialCommand.toEntity());
+                return null;
+            }
+        });
+    }
+
+    protected void 
discardSerialCommandAndStopWorkflowInstanceInDB(SerialCommandDto serialCommand) 
{
+        transactionTemplate.execute(new TransactionCallback<Object>() {
+
+            @Override
+            public Void doInTransaction(TransactionStatus status) {
+                serialCommandDao.deleteById(serialCommand.getId());
+
+                // todo: call api to stop the workflow instance
+                final Integer workflowInstanceId = 
serialCommand.getWorkflowInstanceId();
+                final WorkflowInstance workflowInstance = 
workflowInstanceDao.queryById(workflowInstanceId);
+                workflowInstance.setState(WorkflowExecutionStatus.STOP);
+                workflowInstanceDao.upsertWorkflowInstance(workflowInstance);
+                return null;
+            }
+        });
+    }
+
+    protected void stopWorkflowInstanceInMaster(SerialCommandDto 
serialCommand) {
+
+        // todo: directly call master api
+        // todo: call api to stop the workflow instance
+        // todo: We might need to set a status discarding in serial command. 
then we can avoid duplicate stop
+        // workflow instance
+        final Integer workflowInstanceId = 
serialCommand.getWorkflowInstanceId();
+        final WorkflowInstance workflowInstance = 
workflowInstanceDao.queryById(workflowInstanceId);
+        if (!workflowInstance.getState().isCanStop()) {
+            return;
+        }
+
+        final String workflowInstanceLaunchedHost = workflowInstance.getHost();
+        final WorkflowInstanceStopRequest stopRequest = new 
WorkflowInstanceStopRequest(workflowInstanceId);
+        if 
(masterConfig.getMasterAddress().equals(workflowInstanceLaunchedHost)) {
+            workflowControlClient.stopWorkflowInstance(stopRequest);
+        } else {
+            Clients.withService(IWorkflowControlClient.class)
+                    .withHost(workflowInstanceLaunchedHost)
+                    .stopWorkflowInstance(stopRequest);
+        }
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_ddl.sql
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/ISerialCommandHandler.java
similarity index 69%
rename from 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_ddl.sql
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/ISerialCommandHandler.java
index 89bc78442c..6bac40a7d2 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_ddl.sql
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/ISerialCommandHandler.java
@@ -13,9 +13,12 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-create index idx_t_ds_task_group_queue_task_id on 
t_ds_task_group_queue(task_id);
-create index idx_t_ds_task_group_queue_group_id on 
t_ds_task_group_queue(group_id);
-create index idx_t_ds_task_group_queue_status on t_ds_task_group_queue(status);
-create index idx_t_ds_task_group_queue_workflow_instance_id on 
t_ds_task_group_queue(workflow_instance_id);
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+public interface ISerialCommandHandler {
+
+    void handle(SerialCommandsGroup serialCommandsGroup);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandDiscardHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandDiscardHandler.java
new file mode 100644
index 0000000000..f2ce439a87
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandDiscardHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * This strategy will discard the new workflow instance if there is a running 
workflow instance.
+ */
+@Slf4j
+@Component
+public class SerialCommandDiscardHandler extends AbstractSerialCommandHandler {
+
+    @Override
+    public void handle(SerialCommandsGroup serialCommandsGroup) {
+        // If the first item in the queue is not running, then notify it to 
run.
+        // Discard all other items in the queue.
+        List<SerialCommandDto> serialCommands = 
serialCommandsGroup.getSerialCommands();
+        for (int i = 0; i < serialCommands.size(); i++) {
+            SerialCommandDto serialCommand = serialCommands.get(i);
+            if (i == 0) {
+                if (serialCommand.getState() == 
SerialCommandDto.State.WAITING) {
+                    launchSerialCommand(serialCommand);
+                    log.info("Launched SerialCommand: {}", serialCommand);
+                }
+                continue;
+            }
+            // Discard all other items in the queue.
+            if (serialCommand.getState() != SerialCommandDto.State.WAITING) {
+                throw new IllegalStateException(
+                        "The post SerialCommand except WAITING state but -> " 
+ serialCommand.getState());
+            }
+            discardSerialCommandAndStopWorkflowInstanceInDB(serialCommand);
+            log.info("Discard SerialCommand: {}", serialCommand);
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandPriorityHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandPriorityHandler.java
new file mode 100644
index 0000000000..a6a1745ded
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandPriorityHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * This strategy will stop the previous workflow instance and notify the newly 
workflow instance.
+ */
+@Slf4j
+@Component
+public class SerialCommandPriorityHandler extends AbstractSerialCommandHandler 
{
+
+    @Override
+    public void handle(SerialCommandsGroup serialCommandsGroup) {
+        final List<SerialCommandDto> serialCommands = 
serialCommandsGroup.getSerialCommands();
+        // Stop the 1 ~ n-1 item
+        for (int i = 0; i < serialCommands.size(); i++) {
+            final SerialCommandDto serialCommand = serialCommands.get(i);
+            if (i == serialCommands.size() - 1) {
+                if (serialCommand.getState() == 
SerialCommandDto.State.WAITING) {
+                    launchSerialCommand(serialCommand);
+                    log.info("Launched SerialCommand: {}", serialCommand);
+                }
+                continue;
+            }
+
+            if (serialCommand.getState() == SerialCommandDto.State.WAITING) {
+                discardSerialCommandAndStopWorkflowInstanceInDB(serialCommand);
+                log.info("Discard SerialCommand: {}", serialCommand);
+            } else {
+                stopWorkflowInstanceInMaster(serialCommand);
+                log.info("Stop the pre WorkflowInstance: {} due to the 
workflow using SERIAL_PRIORITY strategy",
+                        serialCommand.getWorkflowInstanceId());
+            }
+        }
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandWaitHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandWaitHandler.java
new file mode 100644
index 0000000000..453b5ac000
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandWaitHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * This strategy will wait the oldest workflow instance.
+ */
+@Slf4j
+@Component
+public class SerialCommandWaitHandler extends AbstractSerialCommandHandler {
+
+    @Override
+    public void handle(SerialCommandsGroup serialCommandsGroup) {
+        // If the first command is not fired, then fire it.
+        final List<SerialCommandDto> serialCommands = 
serialCommandsGroup.getSerialCommands();
+        if (CollectionUtils.isEmpty(serialCommands)) {
+            return;
+        }
+        final SerialCommandDto serialCommand = serialCommands.get(0);
+        if (serialCommand.getState() == SerialCommandDto.State.WAITING) {
+            launchSerialCommand(serialCommand);
+            log.info("Launched SerialCommand: {}", serialCommand);
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandsGroup.java
similarity index 55%
rename from 
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandsGroup.java
index 67ac6192f8..ce324f2ab2 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.3_schema/postgresql/dolphinscheduler_dml.sql
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/SerialCommandsGroup.java
@@ -13,6 +13,29 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-UPDATE t_ds_datasource SET connection_params = REPLACE(connection_params, 
'"publicKey"', '"privateKey"') WHERE type = 17 AND connection_params LIKE 
'%"publicKey"%';
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionTypeEnum;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SerialCommandsGroup {
+
+    private Long workflowDefinitionCode;
+    private Integer workflowDefinitionVersion;
+    private WorkflowExecutionTypeEnum executionType;
+    private List<SerialCommandDto> serialCommands;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/WorkflowSerialCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/WorkflowSerialCoordinator.java
new file mode 100644
index 0000000000..5bd42d020c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/WorkflowSerialCoordinator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
+import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
+import 
org.apache.dolphinscheduler.server.master.engine.IWorkflowSerialCoordinator;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Used to coordinate the serial execution of workflows.
+ * <p> Once a workflow instance is submitted with serial wait strategy, it 
will be added to the serial queue.
+ * <p> A dedicated thread will poll the serial queue and check if the workflow 
instance can be executed.
+ */
+@Slf4j
+@Component
+public class WorkflowSerialCoordinator implements IWorkflowSerialCoordinator {
+
+    @Autowired
+    private SerialCommandDao serialCommandDao;
+
+    @Autowired
+    private WorkflowDefinitionLogDao workflowDefinitionLogDao;
+
+    @Autowired
+    private SerialCommandWaitHandler serialCommandWaitHandler;
+
+    @Autowired
+    private SerialCommandDiscardHandler serialCommandDiscardHandler;
+
+    @Autowired
+    private SerialCommandPriorityHandler serialCommandPriorityHandler;
+
+    private volatile boolean flag = false;
+
+    private Thread internalThread;
+
+    private static final int DEFAULT_FETCH_SIZE = 1000;
+
+    private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 5;
+
+    @Override
+    public synchronized void start() {
+        log.info("WorkflowSerialCoordinator starting...");
+        if (flag) {
+            throw new IllegalStateException("WorkflowSerialCoordinator is 
already started");
+        }
+        if (internalThread != null) {
+            throw new IllegalStateException("InternalThread is already 
started");
+        }
+        flag = true;
+        internalThread = new BaseDaemonThread(this::doStart) {
+        };
+        internalThread.setName("WorkflowSerialCoordinator-Thread");
+        internalThread.start();
+        log.info("WorkflowSerialCoordinator started...");
+    }
+
+    private void doStart() {
+        while (flag) {
+            try {
+                final StopWatch workflowSerialCoordinatorRoundCost = 
StopWatch.createStarted();
+                final List<SerialCommandsGroup> serialCommandsGroups = 
fetchSerialCommands();
+                serialCommandsGroups.forEach(this::handleSerialCommand);
+                log.debug("WorkflowSerialCoordinator handled 
SerialCommandsGroup size: {}, cost: {}/ms ",
+                        serialCommandsGroups.size(),
+                        
workflowSerialCoordinatorRoundCost.getDuration().toMillis());
+            } catch (Throwable e) {
+                log.error("WorkflowSerialCoordinator error", e);
+            } finally {
+                // sleep 5s
+                
ThreadUtils.sleep(TimeUnit.SECONDS.toMillis(DEFAULT_FETCH_INTERVAL_SECONDS));
+            }
+        }
+    }
+
+    private void handleSerialCommand(SerialCommandsGroup serialCommandsGroup) {
+        try {
+            if (serialCommandsGroup.getExecutionType() == null) {
+                log.error("Cannot find the ExecutionType for workflow: {}-{}",
+                        serialCommandsGroup.getWorkflowDefinitionCode(),
+                        serialCommandsGroup.getWorkflowDefinitionVersion());
+                return;
+            }
+            switch (serialCommandsGroup.getExecutionType()) {
+                case PARALLEL:
+                    throw new IllegalStateException(
+                            "SerialCommand with ExecutionType=PARALLEL is not 
supported, this shouldn't happen");
+                case SERIAL_WAIT:
+                    serialCommandWaitHandler.handle(serialCommandsGroup);
+                    break;
+                case SERIAL_DISCARD:
+                    serialCommandDiscardHandler.handle(serialCommandsGroup);
+                    break;
+                case SERIAL_PRIORITY:
+                    serialCommandPriorityHandler.handle(serialCommandsGroup);
+                    break;
+                default:
+            }
+        } catch (Exception ex) {
+            log.error("Handle SerialCommandsGroup: {} error", 
serialCommandsGroup, ex);
+        }
+    }
+
+    private List<SerialCommandsGroup> fetchSerialCommands() {
+        // todo: set a limit here and fetch by incremental id
+        final List<SerialCommandDto> serialCommands = 
serialCommandDao.fetchSerialCommands(DEFAULT_FETCH_SIZE);
+        if (CollectionUtils.isEmpty(serialCommands)) {
+            return Collections.emptyList();
+        }
+
+        // workflowCode-> workflowVersion -> commandGroup
+        // Right now, we think each workflow has its own queue
+        final Map<Long, Map<Integer, SerialCommandsGroup>> 
serialCommandsGroupMap = new HashMap<>();
+        for (SerialCommandDto serialCommand : serialCommands) {
+            
serialCommandsGroupMap.computeIfAbsent(serialCommand.getWorkflowDefinitionCode(),
 k -> new HashMap<>())
+                    
.computeIfAbsent(serialCommand.getWorkflowDefinitionVersion(),
+                            v -> createSerialCommandsGroup(serialCommand))
+                    .getSerialCommands().add(serialCommand);
+        }
+        return serialCommandsGroupMap.values().stream().flatMap(m -> 
m.values().stream()).collect(Collectors.toList());
+    }
+
+    private SerialCommandsGroup createSerialCommandsGroup(SerialCommandDto 
serialCommand) {
+        Long workflowDefinitionCode = 
serialCommand.getWorkflowDefinitionCode();
+        Integer workflowDefinitionVersion = 
serialCommand.getWorkflowDefinitionVersion();
+        final WorkflowDefinitionLog workflowDefinitionLog = 
workflowDefinitionLogDao
+                .queryByDefinitionCodeAndVersion(workflowDefinitionCode, 
workflowDefinitionVersion);
+        return SerialCommandsGroup.builder()
+                .workflowDefinitionCode(workflowDefinitionCode)
+                .workflowDefinitionVersion(workflowDefinitionVersion)
+                .executionType(workflowDefinitionLog.getExecutionType())
+                .serialCommands(new ArrayList<>())
+                .build();
+    }
+
+    @Override
+    public void close() {
+        flag = false;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
index e8596c0dcc..488e3aabe9 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
@@ -21,6 +21,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent;
@@ -47,6 +48,7 @@ import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.support.TransactionTemplate;
 
 @Slf4j
 public abstract class AbstractWorkflowStateAction implements 
IWorkflowStateAction {
@@ -66,6 +68,12 @@ public abstract class AbstractWorkflowStateAction implements 
IWorkflowStateActio
     @Autowired
     protected WorkflowAlertManager workflowAlertManager;
 
+    @Autowired
+    protected TransactionTemplate transactionTemplate;
+
+    @Autowired
+    protected SerialCommandDao serialCommandDao;
+
     /**
      * Try to trigger the tasks if the trigger condition is met.
      * <p> If all the given tasks trigger condition is not met then will try 
to emit workflow finish event.
@@ -140,11 +148,22 @@ public abstract class AbstractWorkflowStateAction 
implements IWorkflowStateActio
 
     protected void workflowFinish(final IWorkflowExecutionRunnable 
workflowExecutionRunnable,
                                   final WorkflowExecutionStatus 
workflowExecutionStatus) {
-        final WorkflowInstance workflowInstance = 
workflowExecutionRunnable.getWorkflowInstance();
-        workflowInstance.setEndTime(new Date());
-        transformWorkflowInstanceState(workflowExecutionRunnable, 
workflowExecutionStatus);
-        workflowExecutionRunnable.getWorkflowEventBus()
-                
.publish(WorkflowFinalizeLifecycleEvent.of(workflowExecutionRunnable));
+        // todo: add transaction configuration in lifecycle event, all sync 
lifecycle should be in transaction
+        transactionTemplate.execute(status -> {
+            final WorkflowInstance workflowInstance = 
workflowExecutionRunnable.getWorkflowInstance();
+            workflowInstance.setEndTime(new Date());
+            transformWorkflowInstanceState(workflowExecutionRunnable, 
workflowExecutionStatus);
+            if 
(workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowDefinition().getExecutionType()
+                    .isSerial()) {
+                if 
(serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId()) > 0) {
+                    log.info("Success clear SerialCommand for 
WorkflowExecuteRunnable: {}",
+                            workflowExecutionRunnable.getName());
+                }
+            }
+            workflowExecutionRunnable.getWorkflowEventBus()
+                    
.publish(WorkflowFinalizeLifecycleEvent.of(workflowExecutionRunnable));
+            return null;
+        });
     }
 
     /**
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java
index 866d758419..f65708b8fc 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java
@@ -17,10 +17,14 @@
 
 package org.apache.dolphinscheduler.server.master.engine.workflow.trigger;
 
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionTypeEnum;
 import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
 import org.apache.dolphinscheduler.dao.repository.CommandDao;
-import org.apache.dolphinscheduler.dao.repository.UserDao;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 
@@ -35,31 +39,47 @@ public abstract class 
AbstractWorkflowInstanceTrigger<TriggerRequest, TriggerRes
             IWorkflowTrigger<TriggerRequest, TriggerResponse> {
 
     @Autowired
-    private WorkflowDefinitionLogDao workflowDefinitionDao;
+    private WorkflowInstanceDao workflowInstanceDao;
 
     @Autowired
-    private WorkflowInstanceDao workflowInstanceDao;
+    private CommandDao commandDao;
 
     @Autowired
-    private UserDao userDao;
+    protected SerialCommandDao serialCommandDao;
 
     @Autowired
-    private CommandDao commandDao;
+    protected WorkflowDefinitionLogDao workflowDefinitionLogDao;
 
     @Override
     @Transactional
     public TriggerResponse triggerWorkflow(final TriggerRequest 
triggerRequest) {
         final WorkflowInstance workflowInstance = 
constructWorkflowInstance(triggerRequest);
-        workflowInstanceDao.updateById(workflowInstance);
-
+        final Long workflowDefinitionCode = 
workflowInstance.getWorkflowDefinitionCode();
+        final int workflowDefinitionVersion = 
workflowInstance.getWorkflowDefinitionVersion();
+        final WorkflowDefinitionLog workflowDefinition = 
workflowDefinitionLogDao.queryByDefinitionCodeAndVersion(
+                workflowDefinitionCode, workflowDefinitionVersion);
+        if (workflowDefinition == null) {
+            throw new IllegalStateException(
+                    "Workflow definition not found: " + workflowDefinitionCode 
+ " version: "
+                            + workflowDefinitionVersion);
+        }
         final Command command = constructTriggerCommand(triggerRequest, 
workflowInstance);
-        commandDao.insert(command);
+        if (workflowDefinition.getExecutionType() == 
WorkflowExecutionTypeEnum.PARALLEL) {
+            workflowInstanceDao.updateById(workflowInstance);
+            commandDao.insert(command);
+        } else {
+            workflowInstance.setState(WorkflowExecutionStatus.SERIAL_WAIT);
+            workflowInstanceDao.updateById(workflowInstance);
+            
serialCommandDao.insert(SerialCommandDto.newSerialCommand(command).toEntity());
+        }
 
         return onTriggerSuccess(workflowInstance);
     }
 
+    // todo: 使用WorkflowInstanceConstructor封装
     protected abstract WorkflowInstance constructWorkflowInstance(final 
TriggerRequest triggerRequest);
 
+    // todo: 使用CommandConstructor封装
     protected abstract Command constructTriggerCommand(final TriggerRequest 
triggerRequest,
                                                        final WorkflowInstance 
workflowInstance);
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowTrigger.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowTrigger.java
index 99c2bec0f2..8ff8ceb141 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowTrigger.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowTrigger.java
@@ -17,16 +17,22 @@
 
 package org.apache.dolphinscheduler.server.master.engine.workflow.trigger;
 
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionTypeEnum;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
 import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
 import org.apache.dolphinscheduler.dao.repository.UserDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -49,20 +55,33 @@ public abstract class 
AbstractWorkflowTrigger<TriggerRequest, TriggerResponse>
     @Autowired
     private CommandDao commandDao;
 
+    @Autowired
+    private SerialCommandDao serialCommandDao;
+
     @Override
     @Transactional
     public TriggerResponse triggerWorkflow(final TriggerRequest 
triggerRequest) {
-        final WorkflowInstance workflowInstance = 
constructWorkflowInstance(triggerRequest);
-        workflowInstanceDao.insert(workflowInstance);
-
-        final Command command = constructTriggerCommand(triggerRequest, 
workflowInstance);
-        commandDao.insert(command);
+        final ImmutablePair<WorkflowDefinition, WorkflowInstance> pair = 
constructWorkflowInstance(triggerRequest);
+        final WorkflowDefinition workflowDefinition = pair.getLeft();
+        final WorkflowInstance workflowInstance = pair.getRight();
+        if (workflowDefinition.getExecutionType() == 
WorkflowExecutionTypeEnum.PARALLEL) {
+            workflowInstanceDao.insert(workflowInstance);
+            final Command command = constructTriggerCommand(triggerRequest, 
workflowInstance);
+            commandDao.insert(command);
+        } else {
+            
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, "Waiting 
for serial execution");
+            workflowInstanceDao.insert(workflowInstance);
+            final Command command = constructTriggerCommand(triggerRequest, 
workflowInstance);
+            
serialCommandDao.insert(SerialCommandDto.newSerialCommand(command).toEntity());
+        }
 
         return onTriggerSuccess(workflowInstance);
     }
 
-    protected abstract WorkflowInstance constructWorkflowInstance(final 
TriggerRequest triggerRequest);
+    // todo: 使用WorkflowInstanceConstructor封装
+    protected abstract ImmutablePair<WorkflowDefinition, WorkflowInstance> 
constructWorkflowInstance(final TriggerRequest triggerRequest);
 
+    // todo: 使用CommandConstructor封装
     protected abstract Command constructTriggerCommand(final TriggerRequest 
triggerRequest,
                                                        final WorkflowInstance 
workflowInstance);
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java
index 9714b2f254..aac061bfa3 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowB
 import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse;
 
 import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 
 import java.util.Date;
 import java.util.List;
@@ -48,7 +49,7 @@ public class WorkflowBackfillTrigger
             AbstractWorkflowTrigger<WorkflowBackfillTriggerRequest, 
WorkflowBackfillTriggerResponse> {
 
     @Override
-    protected WorkflowInstance 
constructWorkflowInstance(WorkflowBackfillTriggerRequest 
backfillTriggerRequest) {
+    protected ImmutablePair<WorkflowDefinition, WorkflowInstance> 
constructWorkflowInstance(WorkflowBackfillTriggerRequest 
backfillTriggerRequest) {
         final CommandType commandType = CommandType.COMPLEMENT_DATA;
         final Long workflowCode = backfillTriggerRequest.getWorkflowCode();
         final Integer workflowVersion = 
backfillTriggerRequest.getWorkflowVersion();
@@ -84,7 +85,7 @@ public class WorkflowBackfillTrigger
                 
EnvironmentUtils.getEnvironmentCodeOrDefault(backfillTriggerRequest.getEnvironmentCode()));
         workflowInstance.setTimeout(workflowDefinition.getTimeout());
         
workflowInstance.setDryRun(backfillTriggerRequest.getDryRun().getCode());
-        return workflowInstance;
+        return ImmutablePair.of(workflowDefinition, workflowInstance);
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowManualTrigger.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowManualTrigger.java
index 45f5ccc0a2..5d04ecf0ed 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowManualTrigger.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowManualTrigger.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowM
 import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerResponse;
 
 import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 
 import java.util.Date;
 
@@ -47,7 +48,7 @@ public class WorkflowManualTrigger
             AbstractWorkflowTrigger<WorkflowManualTriggerRequest, 
WorkflowManualTriggerResponse> {
 
     @Override
-    protected WorkflowInstance constructWorkflowInstance(final 
WorkflowManualTriggerRequest workflowManualTriggerRequest) {
+    protected ImmutablePair<WorkflowDefinition, WorkflowInstance> 
constructWorkflowInstance(final WorkflowManualTriggerRequest 
workflowManualTriggerRequest) {
         final CommandType commandType = CommandType.START_PROCESS;
         final Long workflowCode = 
workflowManualTriggerRequest.getWorkflowDefinitionCode();
         final Integer workflowVersion = 
workflowManualTriggerRequest.getWorkflowDefinitionVersion();
@@ -81,7 +82,7 @@ public class WorkflowManualTrigger
                 
EnvironmentUtils.getEnvironmentCodeOrDefault(workflowManualTriggerRequest.getEnvironmentCode()));
         workflowInstance.setTimeout(workflowDefinition.getTimeout());
         
workflowInstance.setDryRun(workflowManualTriggerRequest.getDryRun().getCode());
-        return workflowInstance;
+        return ImmutablePair.of(workflowDefinition, workflowInstance);
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowScheduleTrigger.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowScheduleTrigger.java
index d0a7a282d3..6fedd1cc2f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowScheduleTrigger.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowScheduleTrigger.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowS
 import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerResponse;
 
 import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 
 import java.util.Date;
 
@@ -44,7 +45,7 @@ public class WorkflowScheduleTrigger
             AbstractWorkflowTrigger<WorkflowScheduleTriggerRequest, 
WorkflowScheduleTriggerResponse> {
 
     @Override
-    protected WorkflowInstance 
constructWorkflowInstance(WorkflowScheduleTriggerRequest 
scheduleTriggerRequest) {
+    protected ImmutablePair<WorkflowDefinition, WorkflowInstance> 
constructWorkflowInstance(WorkflowScheduleTriggerRequest 
scheduleTriggerRequest) {
         final CommandType commandType = CommandType.SCHEDULER;
         final Long workflowCode = scheduleTriggerRequest.getWorkflowCode();
         final Integer workflowVersion = 
scheduleTriggerRequest.getWorkflowVersion();
@@ -79,7 +80,7 @@ public class WorkflowScheduleTrigger
                 
EnvironmentUtils.getEnvironmentCodeOrDefault(scheduleTriggerRequest.getEnvironmentCode()));
         workflowInstance.setTimeout(workflowDefinition.getTimeout());
         
workflowInstance.setDryRun(scheduleTriggerRequest.getDryRun().getCode());
-        return workflowInstance;
+        return ImmutablePair.of(workflowDefinition, workflowInstance);
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index b61355195e..1ae906aa3f 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -124,6 +124,108 @@ public class WorkflowStartTestCase extends 
AbstractMasterIntegrationTestCase {
         masterContainer.assertAllResourceReleased();
     }
 
+    @Test
+    @DisplayName("Test start a workflow with one fake task(A) using serial 
wait strategy")
+    public void testStartWorkflow_with_serialWaitStrategy() {
+        final String yaml = 
"/it/start/workflow_with_serial_wait_strategy.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .build();
+        final Integer workflowInstanceId1 = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+        final Integer workflowInstanceId2 = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+        final Integer workflowInstanceId3 = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId1).getState())
+                            
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId2).getState())
+                            .isEqualTo(WorkflowExecutionStatus.SERIAL_WAIT);
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId3).getState())
+                            .isEqualTo(WorkflowExecutionStatus.SERIAL_WAIT);
+                });
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    final WorkflowInstance workflowInstance1 = 
repository.queryWorkflowInstance(workflowInstanceId1);
+                    final WorkflowInstance workflowInstance2 = 
repository.queryWorkflowInstance(workflowInstanceId2);
+                    final WorkflowInstance workflowInstance3 = 
repository.queryWorkflowInstance(workflowInstanceId3);
+                    
assertThat(workflowInstance1.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                    
assertThat(workflowInstance2.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                    assertThat(workflowInstance2.getEndTime())
+                            
.isAtLeast(DateUtils.addSeconds(workflowInstance1.getEndTime(), 5));
+                    
assertThat(workflowInstance3.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                    assertThat(workflowInstance3.getEndTime())
+                            
.isAtLeast(DateUtils.addSeconds(workflowInstance2.getEndTime(), 5));
+                });
+
+        masterContainer.assertAllResourceReleased();
+    }
+
+    @Test
+    @DisplayName("Test start a workflow with one fake task(A) using serial 
discard strategy")
+    public void testStartWorkflow_with_serialDiscardStrategy() {
+        final String yaml = 
"/it/start/workflow_with_serial_discard_strategy.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .build();
+        final Integer workflowInstanceId1 = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+        final Integer workflowInstanceId2 = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+        final Integer workflowInstanceId3 = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId1).getState())
+                            .isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId2).getState())
+                            .isEqualTo(WorkflowExecutionStatus.STOP);
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId3).getState())
+                            .isEqualTo(WorkflowExecutionStatus.STOP);
+                });
+
+        masterContainer.assertAllResourceReleased();
+    }
+
+    @Test
+    @DisplayName("Test start a workflow with one fake task(A) using serial 
priority strategy")
+    public void testStartWorkflow_with_serialPriorityStrategy() {
+        final String yaml = 
"/it/start/workflow_with_serial_priority_strategy.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .build();
+        final Integer workflowInstanceId1 = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+        final Integer workflowInstanceId2 = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+        final Integer workflowInstanceId3 = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId1).getState())
+                            .isEqualTo(WorkflowExecutionStatus.STOP);
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId2).getState())
+                            .isEqualTo(WorkflowExecutionStatus.STOP);
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId3).getState())
+                            .isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                });
+
+        masterContainer.assertAllResourceReleased();
+    }
+
     @Test
     @DisplayName("Test start a workflow with two fake task(A) using task 
group")
     public void testStartWorkflow_with_successTaskUsingTaskGroup() {
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_discard_strategy.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_discard_strategy.yaml
new file mode 100644
index 0000000000..3bc580ec03
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_discard_strategy.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_serial_discard_strategy
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with single task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: SERIAL_DISCARD
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_priority_strategy.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_priority_strategy.yaml
new file mode 100644
index 0000000000..e21fa5798a
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_priority_strategy.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_serial_priority_strategy
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with single task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: SERIAL_PRIORITY
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 10"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_wait_strategy.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_wait_strategy.yaml
new file mode 100644
index 0000000000..0cd531d37d
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_serial_wait_strategy.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_serial_wait_strategy
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with single task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: SERIAL_WAIT
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
index 1d4c7b038b..7118a099d5 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
@@ -135,7 +135,7 @@ public class PhysicalTaskExecutor extends 
AbstractTaskExecutor {
                 storageOperator,
                 taskExecutionContext);
         taskExecutionContext.setResourceContext(resourceContext);
-        log.info("Download resources successfully: \n{}", 
taskExecutionContext.getResourceContext());
+        log.info("Download resources successfully: {}", 
taskExecutionContext.getResourceContext());
 
         log.info(TaskLogMarkers.excludeInTaskLog(), "Initialized Task 
Context{}",
                 JSONUtils.toPrettyJsonString(taskExecutionContext));

Reply via email to