Copilot commented on code in PR #17531:
URL: 
https://github.com/apache/dolphinscheduler/pull/17531#discussion_r2560333766


##########
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql:
##########
@@ -350,6 +350,27 @@ CREATE TABLE `t_ds_command` (
 -- Records of t_ds_command
 -- ----------------------------
 
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS `t_ds_serial_command`;
+CREATE TABLE `t_ds_serial_command` (
+  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+  `workflow_definition_code` int(11) NOT NULL COMMENT 'workflow definition 
code',
+  `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition 
code',

Review Comment:
   Incorrect comment: the COMMENT for `workflow_definition_version` incorrectly 
says 'workflow definition code' but should say 'workflow definition version'.
   ```suggestion
     `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow 
definition version',
   ```



##########
docs/docs/en/guide/upgrade/incompatible.md:
##########
@@ -2,29 +2,23 @@
 
 This document records the incompatible updates between each version. You need 
to check this document before you upgrade to related version.
 
-## dev
-
-* Upgrade mysql driver version from 8.0.16 to 8.0.33 
([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
-* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to 
`DATAX_LAUNCHER` 
([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
-* Change regex matching sql params in SQL task plugin 
([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
-* Remove the spark version of spark task 
([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
-* Change the default unix shell executor from sh to bash 
([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
-* Remove `deleteSource` in `download()` of `StorageOperate` 
([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
-* Remove default key for attribute `data-quality.jar.name` in 
`common.properties` 
([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
-* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in 
`common.properties` and represent for directory 
([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
-
-## 3.2.0
-
-* Remove parameter `description` from public interfaces of new resource center 
 ([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
-
 ## 3.0.0
 
 * Copy and import workflow without 'copy' suffix 
[#10607](https://github.com/apache/dolphinscheduler/pull/10607)
 * Use semicolon as default sql segment separator 
[#10869](https://github.com/apache/dolphinscheduler/pull/10869)
 
 ## 3.2.0
 
+* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in 
`common.properties` and represent for directory 
([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
+* Remove default key for attribute `data-quality.jar.name` in 
`common.properties` 
([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
+* Remove `deleteSource` in `download()` of `StorageOperate` 
([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
+* Change the default unix shell executor from sh to bash 
([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
+* Remove the spark version of spark task 
([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
+* Change regex matching sql params in SQL task plugin 
([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
+* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to 
`DATAX_LAUNCHER` 
([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
+* Upgrade mysql driver version from 8.0.16 to 8.0.33 
([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
 * Add required field `database` in /datasources/tables && 
/datasources/tableColumns Api 
[#14406](https://github.com/apache/dolphinscheduler/pull/14406)
+* * Remove parameter `description` from public interfaces of new resource 
center  ([#14394](https://github.com/apache/dolphinscheduler/pull/14394))

Review Comment:
   Double asterisk error in Markdown formatting. Line 21 has two asterisks at 
the beginning (`* *`) which will render incorrectly. Remove one asterisk to fix 
the formatting.
   ```suggestion
   * Remove parameter `description` from public interfaces of new resource 
center  ([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
   ```



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowSerialQueue.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.entity;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("t_ds_workflow_task_lineage")

Review Comment:
   The `WorkflowSerialQueue` entity has an incorrect `@TableName` annotation. 
It should be `"t_ds_serial_command"` based on the table structure created in 
the SQL files, but it's currently set to `"t_ds_workflow_task_lineage"` which 
is unrelated to this entity. This will cause runtime errors when trying to 
persist or query this entity.
   ```suggestion
   @TableName("t_ds_serial_command")
   ```



##########
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql:
##########
@@ -350,6 +350,27 @@ CREATE TABLE `t_ds_command` (
 -- Records of t_ds_command
 -- ----------------------------
 
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS `t_ds_serial_command`;
+CREATE TABLE `t_ds_serial_command` (
+  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+  `workflow_definition_code` int(11) NOT NULL COMMENT 'workflow definition 
code',
+  `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition 
code',
+  `workflow_instance_id` bigint(20) NOT NULL COMMENT 'workflow instance id',
+  `state` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'state of the serial queue: 0 
waiting, 1 fired',
+  `command` text COMMENT 'command json',
+  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create 
time',
+  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'update time',
+  PRIMARY KEY (`id`),
+  KEY `idx_workflow_instance_id` (`workflow_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
+
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------

Review Comment:
   Duplicate comment block. The lines 370-372 repeat the same comment "Table 
structure for t_ds_serial_command" that was already opened at line 353. This 
should be removed to avoid confusion.
   ```suggestion
   
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/WorkflowSerialCoordinator.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
+import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
+import 
org.apache.dolphinscheduler.server.master.engine.IWorkflowSerialCoordinator;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Used to coordinate the serial execution of workflows.
+ * <p> Once a workflow instance is submitted with serial wait strategy, it 
will be added to the serial queue.
+ * <p> A dedicated thread will poll the serial queue and check if the workflow 
instance can be executed.
+ */
+@Slf4j
+@Component
+public class WorkflowSerialCoordinator implements IWorkflowSerialCoordinator {
+
+    @Autowired
+    private SerialCommandDao serialCommandDao;
+
+    @Autowired
+    private WorkflowDefinitionLogDao workflowDefinitionLogDao;
+
+    @Autowired
+    private SerialCommandWaitHandler serialCommandWaitHandler;
+
+    @Autowired
+    private SerialCommandDiscardHandler serialCommandDiscardHandler;
+
+    @Autowired
+    private SerialCommandPriorityHandler serialCommandPriorityHandler;
+
+    private boolean flag = false;
+
+    private Thread internalThread;
+
+    private static final int DEFAULT_FETCH_SIZE = 1000;
+
+    private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
+
+    @Override
+    public synchronized void start() {
+        log.info("WorkflowSerialCoordinator starting...");
+        if (flag) {
+            throw new IllegalStateException("WorkflowSerialCoordinator is 
already started");
+        }
+        if (internalThread != null) {
+            throw new IllegalStateException("InternalThread is already 
started");
+        }
+        flag = true;
+        internalThread = new BaseDaemonThread(this::doStart) {
+        };
+        internalThread.setName("WorkflowSerialCoordinator-Thread");
+        internalThread.start();
+        log.info("WorkflowSerialCoordinator started...");
+    }
+
+    private void doStart() {
+        while (flag) {
+            try {
+                final StopWatch workflowSerialCoordinatorRoundCost = 
StopWatch.createStarted();
+                final List<SerialCommandsGroup> serialCommandsGroups = 
fetchSerialCommands();
+                serialCommandsGroups.forEach(this::handleSerialCommand);
+                log.debug("WorkflowSerialCoordinator handled 
SerialCommandsGroup size: {}, cost: {}/ms ",
+                        serialCommandsGroups.size(),
+                        
workflowSerialCoordinatorRoundCost.getDuration().toMillis());
+            } catch (Throwable e) {
+                log.error("WorkflowSerialCoordinator error", e);
+            } finally {
+                // sleep 5s
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 
DEFAULT_FETCH_INTERVAL_SECONDS);
+            }
+        }
+    }
+
+    private void handleSerialCommand(SerialCommandsGroup serialCommandsGroup) {
+        try {
+            if (serialCommandsGroup.getExecutionType() == null) {
+                log.error("Cannot find the ExecutionType for workflow: {}-{}",
+                        serialCommandsGroup.getWorkflowDefinitionCode(),
+                        serialCommandsGroup.getWorkflowDefinitionVersion());
+                return;
+            }
+            switch (serialCommandsGroup.getExecutionType()) {
+                case PARALLEL:
+                    throw new IllegalStateException(
+                            "SerialCommand with ExecutionType=PARALLEL is not 
supported, this shouldn't happen");
+                case SERIAL_WAIT:
+                    serialCommandWaitHandler.handle(serialCommandsGroup);
+                    break;
+                case SERIAL_DISCARD:
+                    serialCommandDiscardHandler.handle(serialCommandsGroup);
+                    break;
+                case SERIAL_PRIORITY:
+                    serialCommandPriorityHandler.handle(serialCommandsGroup);
+                    break;
+                default:
+            }
+        } catch (Exception ex) {
+            log.error("Handle SerialCommandsGroup: {} error", 
serialCommandsGroup, ex);
+        }
+    }
+
+    private List<SerialCommandsGroup> fetchSerialCommands() {
+        // todo: set a limit here and fetch by incremental id
+        final List<SerialCommandDto> serialCommands = 
serialCommandDao.fetchSerialCommands(DEFAULT_FETCH_SIZE);
+        if (CollectionUtils.isEmpty(serialCommands)) {
+            return Collections.emptyList();
+        }
+
+        // workflowCode-> workflowVersion -> commandGroup
+        // Right now, we think each workflow has its own queue
+        final Map<Long, Map<Integer, SerialCommandsGroup>> 
serialCommandsGroupMap = new HashMap<>();
+        for (SerialCommandDto serialCommand : serialCommands) {
+            
serialCommandsGroupMap.computeIfAbsent(serialCommand.getWorkflowDefinitionCode(),
 k -> new HashMap<>())
+                    
.computeIfAbsent(serialCommand.getWorkflowDefinitionVersion(),
+                            v -> createSerialCommandsGroup(serialCommand))
+                    .getSerialCommands().add(serialCommand);
+        }
+        return serialCommandsGroupMap.values().stream().flatMap(m -> 
m.values().stream()).collect(Collectors.toList());
+    }
+
+    private SerialCommandsGroup createSerialCommandsGroup(SerialCommandDto 
serialCommand) {
+        Long workflowDefinitionCode = 
serialCommand.getWorkflowDefinitionCode();
+        Integer workflowDefinitionVersion = 
serialCommand.getWorkflowDefinitionVersion();
+        final WorkflowDefinitionLog workflowDefinitionLog = 
workflowDefinitionLogDao
+                .queryByDefinitionCodeAndVersion(workflowDefinitionCode, 
workflowDefinitionVersion);
+        return SerialCommandsGroup.builder()
+                .workflowDefinitionCode(workflowDefinitionCode)
+                .workflowDefinitionVersion(workflowDefinitionVersion)
+                .executionType(workflowDefinitionLog.getExecutionType())

Review Comment:
   Potential null pointer exception. If `workflowDefinitionLog` is null, 
calling `.getExecutionType()` on line 166 will throw a NullPointerException. 
Consider adding a null check and appropriate error handling or default value.



##########
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql:
##########
@@ -274,6 +274,34 @@ CREATE TABLE t_ds_command (
 
 create index priority_id_index on t_ds_command (workflow_instance_priority,id);
 
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS t_ds_serial_command;
+CREATE TABLE t_ds_serial_command (
+     id SERIAL PRIMARY KEY,
+     workflow_definition_code BIGINT NOT NULL,
+     workflow_definition_version INT NOT NULL,
+     workflow_instance_id BIGINT NOT NULL,
+     state SMALLINT NOT NULL DEFAULT 0,
+     command TEXT,
+     create_time TIMESTAMP NOT NULL DEFAULT now(),
+     update_time TIMESTAMP NOT NULL DEFAULT now()
+);
+CREATE INDEX idx_workflow_instance_id ON t_ds_serial_command 
(workflow_instance_id);
+COMMENT ON TABLE t_ds_serial_command IS 'serial command queue table';
+COMMENT ON COLUMN t_ds_serial_command.id IS 'primary key';
+COMMENT ON COLUMN t_ds_serial_command.workflow_definition_code IS 'workflow 
definition code';
+COMMENT ON COLUMN t_ds_serial_command.workflow_definition_version IS 'workflow 
definition version';
+COMMENT ON COLUMN t_ds_serial_command.workflow_instance_id IS 'workflow 
instance id';
+COMMENT ON COLUMN t_ds_serial_command.state IS 'state of the serial queue: 0 
waiting, 1 fired';
+COMMENT ON COLUMN t_ds_serial_command.command IS 'command json';
+COMMENT ON COLUMN t_ds_serial_command.create_time IS 'create time';
+COMMENT ON COLUMN t_ds_serial_command.update_time IS 'update time';
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------

Review Comment:
   Duplicate comment block. The lines 301-303 repeat the same comment "Table 
structure for t_ds_serial_command" that was already opened at line 277. This 
should be removed to avoid confusion.
   ```suggestion
   
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java:
##########
@@ -35,25 +39,36 @@ public abstract class 
AbstractWorkflowInstanceTrigger<TriggerRequest, TriggerRes
             IWorkflowTrigger<TriggerRequest, TriggerResponse> {
 
     @Autowired
-    private WorkflowDefinitionLogDao workflowDefinitionDao;
+    private WorkflowInstanceDao workflowInstanceDao;
 
     @Autowired
-    private WorkflowInstanceDao workflowInstanceDao;
+    private CommandDao commandDao;
 
     @Autowired
-    private UserDao userDao;
+    protected SerialCommandDao serialCommandDao;
 
     @Autowired
-    private CommandDao commandDao;
+    protected WorkflowDefinitionLogDao workflowDefinitionLogDao;
 
     @Override
     @Transactional
     public TriggerResponse triggerWorkflow(final TriggerRequest 
triggerRequest) {
         final WorkflowInstance workflowInstance = 
constructWorkflowInstance(triggerRequest);
-        workflowInstanceDao.updateById(workflowInstance);
-
+        final WorkflowDefinitionLog workflowDefinition = 
workflowDefinitionLogDao.queryByDefinitionCodeAndVersion(
+                workflowInstance.getWorkflowDefinitionCode(), 
workflowInstance.getWorkflowDefinitionVersion());
+        if (workflowDefinition == null) {
+            throw new IllegalStateException(
+                    "Workflow definition not found: " + 
workflowInstance.getWorkflowDefinitionCode() + " version: "
+                            + workflowInstance.getWorkflowDefinitionVersion());
+        }
         final Command command = constructTriggerCommand(triggerRequest, 
workflowInstance);
-        commandDao.insert(command);
+        if (workflowDefinition.getExecutionType() == 
WorkflowExecutionTypeEnum.PARALLEL) {
+            workflowInstanceDao.updateById(workflowInstance);
+            commandDao.insert(command);
+        } else {
+            workflowInstance.setState(WorkflowExecutionStatus.SERIAL_WAIT);
+            
serialCommandDao.insert(SerialCommandDto.newSerialCommand(command).toEntity());
+        }

Review Comment:
   Missing workflow instance update. When execution type is not PARALLEL (line 
68-71), the workflow instance state is set to SERIAL_WAIT but 
`workflowInstanceDao.updateById(workflowInstance)` is never called, unlike the 
PARALLEL case on line 66. This means the state change won't be persisted to the 
database.



##########
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql:
##########
@@ -342,6 +342,27 @@ CREATE TABLE t_ds_command
 -- Records of t_ds_command
 -- ----------------------------
 
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS `t_ds_serial_command`;
+CREATE TABLE `t_ds_serial_command` (
+   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+   `workflow_definition_code` int(11) NOT NULL COMMENT 'workflow definition 
code',
+   `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition 
code',

Review Comment:
   Incorrect comment: the COMMENT for `workflow_definition_version` incorrectly 
says 'workflow definition code' but should say 'workflow definition version'.
   ```suggestion
      `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow 
definition version',
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/WorkflowSerialCoordinator.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
+import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
+import 
org.apache.dolphinscheduler.server.master.engine.IWorkflowSerialCoordinator;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Used to coordinate the serial execution of workflows.
+ * <p> Once a workflow instance is submitted with serial wait strategy, it 
will be added to the serial queue.
+ * <p> A dedicated thread will poll the serial queue and check if the workflow 
instance can be executed.
+ */
+@Slf4j
+@Component
+public class WorkflowSerialCoordinator implements IWorkflowSerialCoordinator {
+
+    @Autowired
+    private SerialCommandDao serialCommandDao;
+
+    @Autowired
+    private WorkflowDefinitionLogDao workflowDefinitionLogDao;
+
+    @Autowired
+    private SerialCommandWaitHandler serialCommandWaitHandler;
+
+    @Autowired
+    private SerialCommandDiscardHandler serialCommandDiscardHandler;
+
+    @Autowired
+    private SerialCommandPriorityHandler serialCommandPriorityHandler;
+
+    private boolean flag = false;
+
+    private Thread internalThread;
+
+    private static final int DEFAULT_FETCH_SIZE = 1000;
+
+    private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
+
+    @Override
+    public synchronized void start() {
+        log.info("WorkflowSerialCoordinator starting...");
+        if (flag) {
+            throw new IllegalStateException("WorkflowSerialCoordinator is 
already started");
+        }
+        if (internalThread != null) {
+            throw new IllegalStateException("InternalThread is already 
started");
+        }
+        flag = true;
+        internalThread = new BaseDaemonThread(this::doStart) {
+        };
+        internalThread.setName("WorkflowSerialCoordinator-Thread");
+        internalThread.start();
+        log.info("WorkflowSerialCoordinator started...");
+    }

Review Comment:
   Missing lifecycle management method. The `WorkflowSerialCoordinator` only 
has a `start()` method but no `close()` or `stop()` method to properly shut 
down the background thread. This can lead to resource leaks or orphaned threads 
when the application shuts down. Consider adding a `close()` method similar to 
`TaskGroupCoordinator` that sets `flag = false` and interrupts the 
`internalThread`.



##########
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.0_schema/mysql/dolphinscheduler_ddl.sql:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_task_id` (`task_id`);
+ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_group_id` (`group_id`);
+ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_status` (`status`);
+ALTER TABLE `t_ds_task_group_queue` ADD KEY `idx_workflow_instance_id` 
(`workflow_instance_id`);
+
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `t_ds_serial_command` (
+   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+   `workflow_definition_code` int(11) NOT NULL COMMENT 'workflow definition 
code',
+   `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition 
code',

Review Comment:
   Incorrect comment: the COMMENT for `workflow_definition_version` incorrectly 
says 'workflow definition code' but should say 'workflow definition version'.
   ```suggestion
      `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow 
definition version',
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceConstructor.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.trigger;
+
+public interface WorkflowInstanceConstructor {
+
+}

Review Comment:
   Empty interface with no methods or documentation. This interface appears to 
be unused and serves no purpose. Consider either removing it or adding the 
intended methods and documentation if it's meant for future implementation.



##########
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql:
##########
@@ -342,6 +342,27 @@ CREATE TABLE t_ds_command
 -- Records of t_ds_command
 -- ----------------------------
 
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------
+DROP TABLE IF EXISTS `t_ds_serial_command`;
+CREATE TABLE `t_ds_serial_command` (
+   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+   `workflow_definition_code` int(11) NOT NULL COMMENT 'workflow definition 
code',
+   `workflow_definition_version` int(11) NOT NULL COMMENT 'workflow definition 
code',
+   `workflow_instance_id` bigint(20) NOT NULL COMMENT 'workflow instance id',
+   `state` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'state of the serial queue: 0 
waiting, 1 fired',
+   `command` text COMMENT 'command json',
+   `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create 
time',
+   `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'update time',
+   PRIMARY KEY (`id`),
+   KEY `idx_workflow_instance_id` (`workflow_instance_id`)
+);
+
+-- ----------------------------
+-- Table structure for t_ds_serial_command
+-- ----------------------------

Review Comment:
   Duplicate comment block. The lines 362-364 repeat the same comment "Table 
structure for t_ds_serial_command" that was already opened at line 345. This 
should be removed to avoid confusion.
   ```suggestion
   
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/WorkflowSerialCoordinator.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.serial;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
+import org.apache.dolphinscheduler.dao.model.SerialCommandDto;
+import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
+import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
+import 
org.apache.dolphinscheduler.server.master.engine.IWorkflowSerialCoordinator;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Used to coordinate the serial execution of workflows.
+ * <p> Once a workflow instance is submitted with serial wait strategy, it 
will be added to the serial queue.
+ * <p> A dedicated thread will poll the serial queue and check if the workflow 
instance can be executed.
+ */
+@Slf4j
+@Component
+public class WorkflowSerialCoordinator implements IWorkflowSerialCoordinator {
+
+    @Autowired
+    private SerialCommandDao serialCommandDao;
+
+    @Autowired
+    private WorkflowDefinitionLogDao workflowDefinitionLogDao;
+
+    @Autowired
+    private SerialCommandWaitHandler serialCommandWaitHandler;
+
+    @Autowired
+    private SerialCommandDiscardHandler serialCommandDiscardHandler;
+
+    @Autowired
+    private SerialCommandPriorityHandler serialCommandPriorityHandler;
+
+    private boolean flag = false;
+
+    private Thread internalThread;
+
+    private static final int DEFAULT_FETCH_SIZE = 1000;
+
+    private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
+
+    @Override
+    public synchronized void start() {
+        log.info("WorkflowSerialCoordinator starting...");
+        if (flag) {
+            throw new IllegalStateException("WorkflowSerialCoordinator is 
already started");
+        }
+        if (internalThread != null) {
+            throw new IllegalStateException("InternalThread is already 
started");
+        }
+        flag = true;
+        internalThread = new BaseDaemonThread(this::doStart) {
+        };
+        internalThread.setName("WorkflowSerialCoordinator-Thread");
+        internalThread.start();
+        log.info("WorkflowSerialCoordinator started...");
+    }
+
+    private void doStart() {
+        while (flag) {
+            try {
+                final StopWatch workflowSerialCoordinatorRoundCost = 
StopWatch.createStarted();
+                final List<SerialCommandsGroup> serialCommandsGroups = 
fetchSerialCommands();
+                serialCommandsGroups.forEach(this::handleSerialCommand);
+                log.debug("WorkflowSerialCoordinator handled 
SerialCommandsGroup size: {}, cost: {}/ms ",
+                        serialCommandsGroups.size(),
+                        
workflowSerialCoordinatorRoundCost.getDuration().toMillis());
+            } catch (Throwable e) {
+                log.error("WorkflowSerialCoordinator error", e);
+            } finally {
+                // sleep 5s

Review Comment:
   Incorrect comment: The comment says "sleep 5s" but the actual sleep time is 
`Constants.SLEEP_TIME_MILLIS * DEFAULT_FETCH_INTERVAL_SECONDS` which equals 
`1000 * 30 = 30000 milliseconds = 30 seconds`, not 5 seconds. Update the 
comment to accurately reflect the sleep duration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to