This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5f64badb45 [Fix-17239][Dependent] Dependent check get wrong result in manual running execution type (#17240)
5f64badb45 is described below

commit 5f64badb45770eb4895e64b88bc6b16c1f2d804f
Author: xiangzihao <[email protected]>
AuthorDate: Thu Jun 5 09:53:58 2025 +0800

    [Fix-17239][Dependent] Dependent check get wrong result in manual running execution type (#17240)
---
 .../dao/mapper/WorkflowInstanceMapper.java         | 52 ++++++++++++----------
 .../dao/repository/WorkflowInstanceDao.java        |  2 +
 .../repository/impl/WorkflowInstanceDaoImpl.java   | 10 +++++
 .../dao/mapper/WorkflowInstanceMapper.xml          | 18 ++++++++
 .../server/master/utils/DependentExecute.java      | 19 +++++---
 5 files changed, 72 insertions(+), 29 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
index a9f743f49a..30484635d2 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
@@ -65,6 +65,7 @@ public interface WorkflowInstanceMapper extends 
BaseMapper<WorkflowInstance> {
      */
     List<WorkflowInstance> queryMainWorkflowByHostAndStatus(@Param("host") 
String host,
                                                             @Param("states") 
int[] stateArray);
+
     /**
      * query workflow instance host by stateArray
      *
@@ -108,15 +109,15 @@ public interface WorkflowInstanceMapper extends 
BaseMapper<WorkflowInstance> {
     /**
      * workflow instance page
      *
-     * @param page                  page
-     * @param projectCode           projectCode
+     * @param page                   page
+     * @param projectCode            projectCode
      * @param workflowDefinitionCode workflowDefinitionCode
-     * @param searchVal             searchVal
-     * @param executorName          executorName
-     * @param statusArray           statusArray
-     * @param host                  host
-     * @param startTime             startTime
-     * @param endTime               endTime
+     * @param searchVal              searchVal
+     * @param executorName           executorName
+     * @param statusArray            statusArray
+     * @param host                   host
+     * @param startTime              startTime
+     * @param endTime                endTime
      * @return workflow instance page
      */
     IPage<WorkflowInstance> 
queryWorkflowInstanceListPaging(Page<WorkflowInstance> page,
@@ -186,7 +187,7 @@ public interface WorkflowInstanceMapper extends 
BaseMapper<WorkflowInstance> {
      * query workflow instance by workflowDefinitionCode
      *
      * @param workflowDefinitionCode workflowDefinitionCode
-     * @param size                  size
+     * @param size                   size
      * @return workflow instance list
      */
     List<WorkflowInstance> 
queryByWorkflowDefinitionCode(@Param("workflowDefinitionCode") Long 
workflowDefinitionCode,
@@ -196,10 +197,10 @@ public interface WorkflowInstanceMapper extends 
BaseMapper<WorkflowInstance> {
      * query last scheduler workflow instance
      *
      * @param workflowDefinitionCode definitionCode
-     * @param taskDefinitionCode    definitionCode
-     * @param startTime             startTime
-     * @param endTime               endTime
-     * @param testFlag              testFlag
+     * @param taskDefinitionCode     definitionCode
+     * @param startTime              startTime
+     * @param endTime                endTime
+     * @param testFlag               testFlag
      * @return workflow instance
      */
     WorkflowInstance 
queryLastSchedulerWorkflow(@Param("workflowDefinitionCode") Long 
workflowDefinitionCode,
@@ -212,10 +213,10 @@ public interface WorkflowInstanceMapper extends 
BaseMapper<WorkflowInstance> {
      * query last manual workflow instance
      *
      * @param workflowDefinitionCode workflowDefinitionCode
-     * @param taskCode       taskCode
-     * @param startTime      startTime
-     * @param endTime        endTime
-     * @param testFlag       testFlag
+     * @param taskCode               taskCode
+     * @param startTime              startTime
+     * @param endTime                endTime
+     * @param testFlag               testFlag
      * @return workflow instance
      */
     WorkflowInstance queryLastManualWorkflow(@Param("workflowDefinitionCode") 
Long workflowDefinitionCode,
@@ -224,6 +225,11 @@ public interface WorkflowInstanceMapper extends 
BaseMapper<WorkflowInstance> {
                                              @Param("endTime") Date endTime,
                                              @Param("testFlag") int testFlag);
 
+    WorkflowInstance queryLastRunningWorkflow(@Param("workflowDefinitionCode") 
Long workflowDefinitionCode,
+                                              @Param("startTime") Date 
startTime,
+                                              @Param("endTime") Date endTime,
+                                              @Param("states") int[] 
stateArray);
+
     /**
      * query first schedule workflow instance
      *
@@ -261,7 +267,7 @@ public interface WorkflowInstanceMapper extends 
BaseMapper<WorkflowInstance> {
      * query workflow instance by workflowDefinitionCode and stateArray
      *
      * @param workflowDefinitionCode workflowDefinitionCode
-     * @param states                states array
+     * @param states                 states array
      * @return workflow instance list
      */
 
@@ -275,12 +281,12 @@ public interface WorkflowInstanceMapper extends 
BaseMapper<WorkflowInstance> {
     /**
      * Filter workflow instance
      *
-     * @param page                  page
+     * @param page                   page
      * @param workflowDefinitionCode workflowDefinitionCode
-     * @param name                  name
-     * @param host                  host
-     * @param startTime             startTime
-     * @param endTime               endTime
+     * @param name                   name
+     * @param host                   host
+     * @param startTime              startTime
+     * @param endTime                endTime
      * @return workflow instance IPage
      */
     IPage<WorkflowInstance> 
queryWorkflowInstanceListV2Paging(Page<WorkflowInstance> page,
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
index 5a2fb4e147..94a6a85c6c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
@@ -68,6 +68,8 @@ public interface WorkflowInstanceDao extends 
IDao<WorkflowInstance> {
     WorkflowInstance queryLastManualWorkflowInterval(Long definitionCode, Long 
taskCode, DateInterval dateInterval,
                                                      int testFlag);
 
+    WorkflowInstance queryLastRunningWorkflowInterval(Long definitionCode, 
DateInterval dateInterval);
+
     /**
      * query first schedule workflow instance
      *
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
index 996678c581..1446047e2b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
@@ -120,6 +120,16 @@ public class WorkflowInstanceDaoImpl extends 
BaseDao<WorkflowInstance, WorkflowI
                 testFlag);
     }
 
+    @Override
+    public WorkflowInstance queryLastRunningWorkflowInterval(Long 
definitionCode, DateInterval dateInterval) {
+        int[] runningStateArray = new 
int[]{WorkflowExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+                WorkflowExecutionStatus.RUNNING_EXECUTION.ordinal(),
+                WorkflowExecutionStatus.READY_PAUSE.ordinal(),
+                WorkflowExecutionStatus.READY_STOP.ordinal()};
+        return mybatisMapper.queryLastRunningWorkflow(definitionCode, 
dateInterval.getStartTime(),
+                dateInterval.getEndTime(), runningStateArray);
+    }
+
     /**
      * query first schedule process instance
      *
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
index 83fba41769..a52e0dcd02 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
@@ -249,6 +249,24 @@
         order by t1.end_time desc limit 1
     </select>
 
+    <select id="queryLastRunningWorkflow" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_workflow_instance
+        where workflow_definition_code=#{workflowDefinitionCode}
+        <if test="states !=null and states.length != 0">
+            and state in
+            <foreach collection="states" item="i" index="index" open="(" 
separator="," close=")">
+                #{i}
+            </foreach>
+        </if>
+        <if test="startTime!=null and endTime != null ">
+            and ((schedule_time <![CDATA[ >= ]]> #{startTime} and 
schedule_time <![CDATA[ <= ]]> #{endTime})
+            or (start_time <![CDATA[ >= ]]> #{startTime} and start_time 
<![CDATA[ <= ]]> #{endTime}))
+        </if>
+        order by start_time desc limit 1
+    </select>
+
     <select id="queryFirstScheduleWorkflowInstance" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
         select
         <include refid="baseSql"/>
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index 300a989b42..9389accb35 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -155,7 +155,7 @@ public class DependentExecute {
         DependResult result = DependResult.FAILED;
         for (DateInterval dateInterval : dateIntervals) {
             WorkflowInstance workflowInstance =
-                    
findLastWorkflowInterval(dependentItem.getDefinitionCode(), 
dependentItem.getDepTaskCode(),
+                    
findDependentWorkflowCandidate(dependentItem.getDefinitionCode(), 
dependentItem.getDepTaskCode(),
                             dateInterval, testFlag);
             if (workflowInstance == null) {
                 return DependResult.WAITING;
@@ -314,17 +314,24 @@ public class DependentExecute {
     }
 
     /**
-     * find the last one workflow instance that :
-     * 1. manual run and finish between the interval
-     * 2. schedule run and schedule time between the interval
+     * find the last one workflow instance that:
+     * 1. running workflow instance in the date interval
+     * 2. manual run and finish between the interval
+     * 3. schedule run and schedule time between the interval
      *
      * @param definitionCode definition code
      * @param taskCode task code
      * @param dateInterval   date interval
      * @return workflowInstance
      */
-    private WorkflowInstance findLastWorkflowInterval(Long definitionCode, 
Long taskCode, DateInterval dateInterval,
-                                                      int testFlag) {
+    private WorkflowInstance findDependentWorkflowCandidate(Long 
definitionCode, Long taskCode,
+                                                            DateInterval 
dateInterval,
+                                                            int testFlag) {
+        WorkflowInstance runningWorkflow =
+                
workflowInstanceDao.queryLastRunningWorkflowInterval(definitionCode, 
dateInterval);
+        if (runningWorkflow != null) {
+            return runningWorkflow;
+        }
 
         WorkflowInstance lastSchedulerWorkflowInstance =
                 
workflowInstanceDao.queryLastSchedulerWorkflowInterval(definitionCode, 
taskCode, dateInterval,

Reply via email to