This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new eab71f1071 [Improvement-15707][Master] Work out the issue that the workflow with a task dependency couldn't work well.  (#15712)
eab71f1071 is described below

commit eab71f1071774bd51fe5cfeb66597ea988c49e7c
Author: calvin <[email protected]>
AuthorDate: Thu Mar 14 17:57:13 2024 +0800

    [Improvement-15707][Master] Work out the issue that the workflow with a task dependency couldn't work well.  (#15712)
---
 .../dao/mapper/ProcessInstanceMapper.java          |  2 ++
 .../dao/repository/ProcessInstanceDao.java         |  4 +++-
 .../repository/impl/ProcessInstanceDaoImpl.java    |  4 +++-
 .../dao/mapper/ProcessInstanceMapper.xml           | 28 +++++++++++++++-------
 .../dao/mapper/ProcessInstanceMapperTest.java      |  6 +++--
 .../server/master/utils/DependentExecute.java      | 11 +++++----
 6 files changed, 38 insertions(+), 17 deletions(-)

diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index deef2a9d4c..509bf7ca4d 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -219,12 +219,14 @@ public interface ProcessInstanceMapper extends 
BaseMapper<ProcessInstance> {
      * query last manual process instance
      *
      * @param definitionCode definitionCode
+     * @param taskCode taskCode
      * @param startTime      startTime
      * @param endTime        endTime
      * @param testFlag       testFlag
      * @return process instance
      */
     ProcessInstance queryLastManualProcess(@Param("processDefinitionCode") 
Long definitionCode,
+                                           @Param("taskCode") Long taskCode,
                                            @Param("startTime") Date startTime,
                                            @Param("endTime") Date endTime,
                                            @Param("testFlag") int testFlag);
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index 6aa48ea12d..c1bceab120 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -51,10 +51,12 @@ public interface ProcessInstanceDao extends 
IDao<ProcessInstance> {
      * find last manual process instance interval
      *
      * @param definitionCode process definition code
+     * @param taskCode taskCode
      * @param dateInterval   dateInterval
      * @return process instance
      */
-    ProcessInstance queryLastManualProcessInterval(Long definitionCode, 
DateInterval dateInterval, int testFlag);
+    ProcessInstance queryLastManualProcessInterval(Long definitionCode, Long 
taskCode, DateInterval dateInterval,
+                                                   int testFlag);
 
     /**
      * query first schedule process instance
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index fca93da29d..a5562f7a91 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -82,13 +82,15 @@ public class ProcessInstanceDaoImpl extends 
BaseDao<ProcessInstance, ProcessInst
      * find last manual process instance interval
      *
      * @param definitionCode process definition code
+     * @param taskCode taskCode
      * @param dateInterval   dateInterval
      * @return process instance
      */
     @Override
-    public ProcessInstance queryLastManualProcessInterval(Long definitionCode, 
DateInterval dateInterval,
+    public ProcessInstance queryLastManualProcessInterval(Long definitionCode, 
Long taskCode, DateInterval dateInterval,
                                                           int testFlag) {
         return mybatisMapper.queryLastManualProcess(definitionCode,
+                taskCode,
                 dateInterval.getStartTime(),
                 dateInterval.getEndTime(),
                 testFlag);
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 813568994e..80c008d0fd 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -224,15 +224,25 @@
         order by start_time desc limit 1
     </select>
     <select id="queryLastManualProcess" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
-        select
-        <include refid="baseSql"/>
-        from t_ds_process_instance
-        where process_definition_code=#{processDefinitionCode} and 
test_flag=#{testFlag}
-        and schedule_time is null
-        <if test="startTime!=null and endTime != null ">
-            and start_time <![CDATA[ >= ]]> #{startTime} and start_time 
<![CDATA[ <= ]]> #{endTime}
-        </if>
-        order by end_time desc limit 1
+        select t1.*
+        from
+        (
+            select
+            <include refid="baseSql"/>
+            from t_ds_process_instance
+            where
+            process_definition_code=#{processDefinitionCode} and 
test_flag=#{testFlag}
+            and schedule_time is null
+            <if test="startTime!=null and endTime != null ">
+                and start_time <![CDATA[ >= ]]> #{startTime} and start_time 
<![CDATA[ <= ]]> #{endTime}
+            </if>
+        ) as t1
+        <if test="taskCode != null and taskCode!=0 and taskCode!=-1">
+        inner join
+        t_ds_task_instance as t2
+        on t1.id = t2.process_instance_id and t2.task_code=#{taskCode}
+        </if>
+        order by t1.end_time desc limit 1
     </select>
 
     <select id="queryFirstScheduleProcessInstance" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
index 102110c18f..1a6049aeb1 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
@@ -301,13 +301,15 @@ public class ProcessInstanceMapperTest extends 
BaseDaoTest {
         Date start = new Date(2019 - 1900, 1 - 1, 01, 0, 0, 0);
         Date end = new Date(2019 - 1900, 1 - 1, 01, 5, 0, 0);
         ProcessInstance processInstance1 =
-                
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(),
 start, end,
+                
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(),
 null, start,
+                        end,
                         processInstance.getTestFlag());
         Assertions.assertEquals(processInstance1.getId(), 
processInstance.getId());
 
         start = new Date(2019 - 1900, 1 - 1, 01, 1, 0, 0);
         processInstance1 =
-                
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(),
 start, end,
+                
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(),
 null, start,
+                        end,
                         processInstance.getTestFlag());
         Assertions.assertNull(processInstance1);
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index 0d343a709f..15ab34de34 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -149,8 +149,9 @@ public class DependentExecute {
 
         DependResult result = DependResult.FAILED;
         for (DateInterval dateInterval : dateIntervals) {
-            ProcessInstance processInstance = 
findLastProcessInterval(dependentItem.getDefinitionCode(),
-                    dateInterval, testFlag);
+            ProcessInstance processInstance =
+                    findLastProcessInterval(dependentItem.getDefinitionCode(), 
dependentItem.getDepTaskCode(),
+                            dateInterval, testFlag);
             if (processInstance == null) {
                 return DependResult.WAITING;
             }
@@ -311,16 +312,18 @@ public class DependentExecute {
      * 2. schedule run and schedule time between the interval
      *
      * @param definitionCode definition code
+     * @param taskCode task code
      * @param dateInterval   date interval
      * @return ProcessInstance
      */
-    private ProcessInstance findLastProcessInterval(Long definitionCode, 
DateInterval dateInterval, int testFlag) {
+    private ProcessInstance findLastProcessInterval(Long definitionCode, Long 
taskCode, DateInterval dateInterval,
+                                                    int testFlag) {
 
         ProcessInstance lastSchedulerProcess =
                 
processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, 
dateInterval, testFlag);
 
         ProcessInstance lastManualProcess =
-                
processInstanceDao.queryLastManualProcessInterval(definitionCode, dateInterval, 
testFlag);
+                
processInstanceDao.queryLastManualProcessInterval(definitionCode, taskCode, 
dateInterval, testFlag);
 
         if (lastManualProcess == null) {
             return lastSchedulerProcess;

Reply via email to