This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0c470ffe66 [Fix #15129] [Dependent] The date rules of the dependent node are ambiguous. (#15289)
0c470ffe66 is described below

commit 0c470ffe66312085d47d5f55ad5717647aa44360
Author: ll <[email protected]>
AuthorDate: Fri Dec 8 16:18:33 2023 +0800

    [Fix #15129] [Dependent] The date rules of the dependent node are ambiguous. (#15289)
    
    * [Fix-15129][Dependent] Fix the ambiguity in date rules for dependent node.
    
    * [fix #15129] Revert ddl
    
    * restore findLastProcessInterval
    
    * update: mvn spotless:apply
    
    ---------
    
    Co-authored-by: 李乐 <[email protected]>
    Co-authored-by: xiangzihao <[email protected]>
---
 .../dao/mapper/TaskInstanceMapper.java                 | 12 ++++--------
 .../dao/repository/TaskInstanceDao.java                | 12 ++++++------
 .../dao/repository/impl/TaskInstanceDaoImpl.java       | 16 +++++++---------
 .../dolphinscheduler/dao/mapper/TaskInstanceMapper.xml | 14 ++++++--------
 .../server/master/utils/DependentExecute.java          | 18 ++++++++----------
 5 files changed, 31 insertions(+), 41 deletions(-)

diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index bc28c1d5e6..04e818f5c3 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -170,18 +170,14 @@ public interface TaskInstanceMapper extends 
BaseMapper<TaskInstance> {
      * find last task instance list in the date interval
      *
      * @param taskCodes taskCodes
-     * @param startTime startTime
-     * @param endTime endTime
      * @param testFlag testFlag
      * @return task instance list
      */
-    List<TaskInstance> findLastTaskInstances(@Param("taskCodes") Set<Long> 
taskCodes,
-                                             @Param("startTime") Date 
startTime,
-                                             @Param("endTime") Date endTime,
+    List<TaskInstance> findLastTaskInstances(@Param("processInstanceId") 
Integer processInstanceId,
+                                             @Param("taskCodes") Set<Long> 
taskCodes,
                                              @Param("testFlag") int testFlag);
 
-    TaskInstance findLastTaskInstance(@Param("taskCode") long depTaskCode,
-                                      @Param("startTime") Date startTime,
-                                      @Param("endTime") Date endTime,
+    TaskInstance findLastTaskInstance(@Param("processInstanceId") Integer 
processInstanceId,
+                                      @Param("taskCode") long depTaskCode,
                                       @Param("testFlag") int testFlag);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index b5d41f8783..0156416fd3 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.repository;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 
 import java.util.List;
 import java.util.Set;
@@ -91,20 +90,21 @@ public interface TaskInstanceDao extends IDao<TaskInstance> 
{
     /**
      * find last task instance list corresponding to taskCodes in the date 
interval
      *
+     * @param processInstanceId Task's parent process instance id
      * @param taskCodes taskCodes
-     * @param dateInterval dateInterval
      * @param testFlag test flag
      * @return task instance list
      */
-    List<TaskInstance> queryLastTaskInstanceListIntervalByTaskCodes(Set<Long> 
taskCodes, DateInterval dateInterval,
-                                                                    int 
testFlag);
+    List<TaskInstance> 
queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId,
+                                                                          
Set<Long> taskCodes, int testFlag);
 
     /**
      * find last task instance corresponding to taskCode in the date interval
+     * @param processInstanceId Task's parent process instance id
      * @param depTaskCode taskCode
-     * @param dateInterval dateInterval
      * @param testFlag test flag
      * @return task instance
      */
-    TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode, 
DateInterval dateInterval, int testFlag);
+    TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer 
processInstanceId,
+                                                                long 
depTaskCode, int testFlag);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 9cbb92286c..7e96bcd68b 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -27,7 +27,6 @@ import 
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.repository.BaseDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -174,16 +173,15 @@ public class TaskInstanceDaoImpl extends 
BaseDao<TaskInstance, TaskInstanceMappe
     }
 
     @Override
-    public List<TaskInstance> 
queryLastTaskInstanceListIntervalByTaskCodes(Set<Long> taskCodes,
-                                                                           
DateInterval dateInterval, int testFlag) {
-        return mybatisMapper.findLastTaskInstances(taskCodes, 
dateInterval.getStartTime(), dateInterval.getEndTime(),
-                testFlag);
+    public List<TaskInstance> 
queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId,
+                                                                               
  Set<Long> taskCodes,
+                                                                               
  int testFlag) {
+        return mybatisMapper.findLastTaskInstances(processInstanceId, 
taskCodes, testFlag);
     }
 
     @Override
-    public TaskInstance queryLastTaskInstanceIntervalByTaskCode(long 
depTaskCode, DateInterval dateInterval,
-                                                                int testFlag) {
-        return mybatisMapper.findLastTaskInstance(depTaskCode, 
dateInterval.getStartTime(), dateInterval.getEndTime(),
-                testFlag);
+    public TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer 
processInstanceId, long depTaskCode,
+                                                                       int 
testFlag) {
+        return mybatisMapper.findLastTaskInstance(processInstanceId, 
depTaskCode, testFlag);
     }
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 8e3674e523..1544d0ed8f 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -350,27 +350,25 @@
         select task_code, max(end_time) as max_end_time
         from t_ds_task_instance
         where 1=1 and test_flag = #{testFlag}
+        and instance.process_instance_id = #{processInstanceId}
         <if test="taskCodes != null and taskCodes.size() != 0">
             and task_code in
             <foreach collection="taskCodes" index="index" item="i" open="(" 
separator="," close=")">
                 #{i}
             </foreach>
         </if>
-        <if test="startTime!=null and endTime != null">
-            and start_time <![CDATA[ >= ]]> #{startTime} and start_time 
<![CDATA[ <= ]]> #{endTime}
-        </if>
         group by task_code
         ) t_max
-        on instance.task_code = t_max.task_code and instance.end_time = 
t_max.max_end_time
+        on instance.process_instance_id = t_max.process_instance_id
+        and instance.task_code = t_max.task_code
+        and instance.end_time = t_max.max_end_time
     </select>
     <select id="findLastTaskInstance" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
         <include refid="baseSql"/>
         from t_ds_task_instance
-        where task_code = #{taskCode}
-        <if test="startTime!=null and endTime != null">
-            and start_time <![CDATA[ >= ]]> #{startTime} and start_time 
<![CDATA[ <= ]]> #{endTime}
-        </if>
+        where process_instance_id = #{processInstanceId}
+        and task_code = #{taskCode}
         order by end_time desc limit 1
     </select>
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index 195212ef34..0d343a709f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -158,10 +158,9 @@ public class DependentExecute {
             if (dependentItem.getDepTaskCode() == 
Constants.DEPENDENT_WORKFLOW_CODE) {
                 result = dependResultByProcessInstance(processInstance);
             } else if (dependentItem.getDepTaskCode() == 
Constants.DEPENDENT_ALL_TASK_CODE) {
-                result = 
dependResultByAllTaskOfProcessInstance(processInstance, dateInterval, testFlag);
+                result = 
dependResultByAllTaskOfProcessInstance(processInstance, testFlag);
             } else {
-                result = dependResultBySingleTaskInstance(processInstance, 
dependentItem.getDepTaskCode(), dateInterval,
-                        testFlag);
+                result = dependResultBySingleTaskInstance(processInstance, 
dependentItem.getDepTaskCode(), testFlag);
             }
             if (result != DependResult.SUCCESS) {
                 break;
@@ -194,8 +193,7 @@ public class DependentExecute {
      *
      * @return
      */
-    private DependResult 
dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance,
-                                                                DateInterval 
dateInterval, int testFlag) {
+    private DependResult 
dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance, int 
testFlag) {
         if (!processInstance.getState().isFinished()) {
             log.info("Wait for the dependent workflow to complete, 
processCode: {}, processInstanceId: {}.",
                     processInstance.getProcessDefinitionCode(), 
processInstance.getId());
@@ -212,8 +210,8 @@ public class DependentExecute {
                             
.collect(Collectors.toMap(TaskDefinitionLog::getCode, 
TaskDefinitionLog::getName));
 
             List<TaskInstance> taskInstanceList =
-                    
taskInstanceDao.queryLastTaskInstanceListIntervalByTaskCodes(taskDefinitionCodeMap.keySet(),
-                            dateInterval, testFlag);
+                    
taskInstanceDao.queryLastTaskInstanceListIntervalInProcessInstance(processInstance.getId(),
+                            taskDefinitionCodeMap.keySet(), testFlag);
             Map<Long, TaskExecutionStatus> taskExecutionStatusMap =
                     taskInstanceList.stream()
                             .filter(taskInstance -> 
taskInstance.getTaskExecuteType() != TaskExecuteType.STREAM)
@@ -245,14 +243,14 @@ public class DependentExecute {
      *
      * @param processInstance last process instance in the date interval
      * @param depTaskCode the dependent task code
-     * @param dateInterval date interval
      * @param testFlag test flag
      * @return depend result
      */
     private DependResult dependResultBySingleTaskInstance(ProcessInstance 
processInstance, long depTaskCode,
-                                                          DateInterval 
dateInterval, int testFlag) {
+                                                          int testFlag) {
         TaskInstance taskInstance =
-                
taskInstanceDao.queryLastTaskInstanceIntervalByTaskCode(depTaskCode, 
dateInterval, testFlag);
+                
taskInstanceDao.queryLastTaskInstanceIntervalInProcessInstance(processInstance.getId(),
+                        depTaskCode, testFlag);
 
         if (taskInstance == null) {
             TaskDefinition taskDefinition = 
taskDefinitionDao.queryByCode(depTaskCode);

Reply via email to