This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0c470ffe66 [Fix #15129] [Dependent] The date rules of the dependent
node are ambiguous. (#15289)
0c470ffe66 is described below
commit 0c470ffe66312085d47d5f55ad5717647aa44360
Author: ll <[email protected]>
AuthorDate: Fri Dec 8 16:18:33 2023 +0800
[Fix #15129] [Dependent] The date rules of the dependent node are
ambiguous. (#15289)
* [Fix-15129][Dependent] Fix the ambiguity in date rules for dependent node.
* [fix #15129] Revert ddl
* restore findLastProcessInterval
* update: mvn spotless:apply
---------
Co-authored-by: 李乐 <[email protected]>
Co-authored-by: xiangzihao <[email protected]>
---
.../dao/mapper/TaskInstanceMapper.java | 12 ++++--------
.../dao/repository/TaskInstanceDao.java | 12 ++++++------
.../dao/repository/impl/TaskInstanceDaoImpl.java | 16 +++++++---------
.../dolphinscheduler/dao/mapper/TaskInstanceMapper.xml | 14 ++++++--------
.../server/master/utils/DependentExecute.java | 18 ++++++++----------
5 files changed, 31 insertions(+), 41 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index bc28c1d5e6..04e818f5c3 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -170,18 +170,14 @@ public interface TaskInstanceMapper extends
BaseMapper<TaskInstance> {
* find last task instance list in the date interval
*
* @param taskCodes taskCodes
- * @param startTime startTime
- * @param endTime endTime
* @param testFlag testFlag
* @return task instance list
*/
- List<TaskInstance> findLastTaskInstances(@Param("taskCodes") Set<Long>
taskCodes,
- @Param("startTime") Date
startTime,
- @Param("endTime") Date endTime,
+ List<TaskInstance> findLastTaskInstances(@Param("processInstanceId")
Integer processInstanceId,
+ @Param("taskCodes") Set<Long>
taskCodes,
@Param("testFlag") int testFlag);
- TaskInstance findLastTaskInstance(@Param("taskCode") long depTaskCode,
- @Param("startTime") Date startTime,
- @Param("endTime") Date endTime,
+ TaskInstance findLastTaskInstance(@Param("processInstanceId") Integer
processInstanceId,
+ @Param("taskCode") long depTaskCode,
@Param("testFlag") int testFlag);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index b5d41f8783..0156416fd3 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import java.util.List;
import java.util.Set;
@@ -91,20 +90,21 @@ public interface TaskInstanceDao extends IDao<TaskInstance>
{
/**
* find last task instance list corresponding to taskCodes in the date
interval
*
+ * @param processInstanceId Task's parent process instance id
* @param taskCodes taskCodes
- * @param dateInterval dateInterval
* @param testFlag test flag
* @return task instance list
*/
- List<TaskInstance> queryLastTaskInstanceListIntervalByTaskCodes(Set<Long>
taskCodes, DateInterval dateInterval,
- int
testFlag);
+ List<TaskInstance>
queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId,
+
Set<Long> taskCodes, int testFlag);
/**
* find last task instance corresponding to taskCode in the date interval
+ * @param processInstanceId Task's parent process instance id
* @param depTaskCode taskCode
- * @param dateInterval dateInterval
* @param testFlag test flag
* @return task instance
*/
- TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode,
DateInterval dateInterval, int testFlag);
+ TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer
processInstanceId,
+ long
depTaskCode, int testFlag);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 9cbb92286c..7e96bcd68b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -27,7 +27,6 @@ import
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.commons.lang3.StringUtils;
@@ -174,16 +173,15 @@ public class TaskInstanceDaoImpl extends
BaseDao<TaskInstance, TaskInstanceMappe
}
@Override
- public List<TaskInstance>
queryLastTaskInstanceListIntervalByTaskCodes(Set<Long> taskCodes,
-
DateInterval dateInterval, int testFlag) {
- return mybatisMapper.findLastTaskInstances(taskCodes,
dateInterval.getStartTime(), dateInterval.getEndTime(),
- testFlag);
+ public List<TaskInstance>
queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId,
+
Set<Long> taskCodes,
+
int testFlag) {
+ return mybatisMapper.findLastTaskInstances(processInstanceId,
taskCodes, testFlag);
}
@Override
- public TaskInstance queryLastTaskInstanceIntervalByTaskCode(long
depTaskCode, DateInterval dateInterval,
- int testFlag) {
- return mybatisMapper.findLastTaskInstance(depTaskCode,
dateInterval.getStartTime(), dateInterval.getEndTime(),
- testFlag);
+ public TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer
processInstanceId, long depTaskCode,
+ int
testFlag) {
+ return mybatisMapper.findLastTaskInstance(processInstanceId,
depTaskCode, testFlag);
}
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 8e3674e523..1544d0ed8f 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -350,27 +350,25 @@
select task_code, max(end_time) as max_end_time
from t_ds_task_instance
where 1=1 and test_flag = #{testFlag}
+ and instance.process_instance_id = #{processInstanceId}
<if test="taskCodes != null and taskCodes.size() != 0">
and task_code in
<foreach collection="taskCodes" index="index" item="i" open="("
separator="," close=")">
#{i}
</foreach>
</if>
- <if test="startTime!=null and endTime != null">
- and start_time <![CDATA[ >= ]]> #{startTime} and start_time
<![CDATA[ <= ]]> #{endTime}
- </if>
group by task_code
) t_max
- on instance.task_code = t_max.task_code and instance.end_time =
t_max.max_end_time
+ on instance.process_instance_id = t_max.process_instance_id
+ and instance.task_code = t_max.task_code
+ and instance.end_time = t_max.max_end_time
</select>
<select id="findLastTaskInstance"
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
- where task_code = #{taskCode}
- <if test="startTime!=null and endTime != null">
- and start_time <![CDATA[ >= ]]> #{startTime} and start_time
<![CDATA[ <= ]]> #{endTime}
- </if>
+ where process_instance_id = #{processInstanceId}
+ and task_code = #{taskCode}
order by end_time desc limit 1
</select>
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index 195212ef34..0d343a709f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -158,10 +158,9 @@ public class DependentExecute {
if (dependentItem.getDepTaskCode() ==
Constants.DEPENDENT_WORKFLOW_CODE) {
result = dependResultByProcessInstance(processInstance);
} else if (dependentItem.getDepTaskCode() ==
Constants.DEPENDENT_ALL_TASK_CODE) {
- result =
dependResultByAllTaskOfProcessInstance(processInstance, dateInterval, testFlag);
+ result =
dependResultByAllTaskOfProcessInstance(processInstance, testFlag);
} else {
- result = dependResultBySingleTaskInstance(processInstance,
dependentItem.getDepTaskCode(), dateInterval,
- testFlag);
+ result = dependResultBySingleTaskInstance(processInstance,
dependentItem.getDepTaskCode(), testFlag);
}
if (result != DependResult.SUCCESS) {
break;
@@ -194,8 +193,7 @@ public class DependentExecute {
*
* @return
*/
- private DependResult
dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance,
- DateInterval
dateInterval, int testFlag) {
+ private DependResult
dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance, int
testFlag) {
if (!processInstance.getState().isFinished()) {
log.info("Wait for the dependent workflow to complete,
processCode: {}, processInstanceId: {}.",
processInstance.getProcessDefinitionCode(),
processInstance.getId());
@@ -212,8 +210,8 @@ public class DependentExecute {
.collect(Collectors.toMap(TaskDefinitionLog::getCode,
TaskDefinitionLog::getName));
List<TaskInstance> taskInstanceList =
-
taskInstanceDao.queryLastTaskInstanceListIntervalByTaskCodes(taskDefinitionCodeMap.keySet(),
- dateInterval, testFlag);
+
taskInstanceDao.queryLastTaskInstanceListIntervalInProcessInstance(processInstance.getId(),
+ taskDefinitionCodeMap.keySet(), testFlag);
Map<Long, TaskExecutionStatus> taskExecutionStatusMap =
taskInstanceList.stream()
.filter(taskInstance ->
taskInstance.getTaskExecuteType() != TaskExecuteType.STREAM)
@@ -245,14 +243,14 @@ public class DependentExecute {
*
* @param processInstance last process instance in the date interval
* @param depTaskCode the dependent task code
- * @param dateInterval date interval
* @param testFlag test flag
* @return depend result
*/
private DependResult dependResultBySingleTaskInstance(ProcessInstance
processInstance, long depTaskCode,
- DateInterval
dateInterval, int testFlag) {
+ int testFlag) {
TaskInstance taskInstance =
-
taskInstanceDao.queryLastTaskInstanceIntervalByTaskCode(depTaskCode,
dateInterval, testFlag);
+
taskInstanceDao.queryLastTaskInstanceIntervalInProcessInstance(processInstance.getId(),
+ depTaskCode, testFlag);
if (taskInstance == null) {
TaskDefinition taskDefinition =
taskDefinitionDao.queryByCode(depTaskCode);