This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new eab71f1071 [Improvement-15707][Master] Work out the issue that the
workflow with a task dependency couldn't work well. (#15712)
eab71f1071 is described below
commit eab71f1071774bd51fe5cfeb66597ea988c49e7c
Author: calvin <[email protected]>
AuthorDate: Thu Mar 14 17:57:13 2024 +0800
[Improvement-15707][Master] Work out the issue that the workflow with a
task dependency couldn't work well. (#15712)
---
.../dao/mapper/ProcessInstanceMapper.java | 2 ++
.../dao/repository/ProcessInstanceDao.java | 4 +++-
.../repository/impl/ProcessInstanceDaoImpl.java | 4 +++-
.../dao/mapper/ProcessInstanceMapper.xml | 28 +++++++++++++++-------
.../dao/mapper/ProcessInstanceMapperTest.java | 6 +++--
.../server/master/utils/DependentExecute.java | 11 +++++----
6 files changed, 38 insertions(+), 17 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index deef2a9d4c..509bf7ca4d 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -219,12 +219,14 @@ public interface ProcessInstanceMapper extends
BaseMapper<ProcessInstance> {
* query last manual process instance
*
* @param definitionCode definitionCode
+ * @param taskCode taskCode
* @param startTime startTime
* @param endTime endTime
* @param testFlag testFlag
* @return process instance
*/
ProcessInstance queryLastManualProcess(@Param("processDefinitionCode")
Long definitionCode,
+ @Param("taskCode") Long taskCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("testFlag") int testFlag);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index 6aa48ea12d..c1bceab120 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -51,10 +51,12 @@ public interface ProcessInstanceDao extends
IDao<ProcessInstance> {
* find last manual process instance interval
*
* @param definitionCode process definition code
+ * @param taskCode taskCode
* @param dateInterval dateInterval
* @return process instance
*/
- ProcessInstance queryLastManualProcessInterval(Long definitionCode,
DateInterval dateInterval, int testFlag);
+ ProcessInstance queryLastManualProcessInterval(Long definitionCode, Long
taskCode, DateInterval dateInterval,
+ int testFlag);
/**
* query first schedule process instance
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index fca93da29d..a5562f7a91 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -82,13 +82,15 @@ public class ProcessInstanceDaoImpl extends
BaseDao<ProcessInstance, ProcessInst
* find last manual process instance interval
*
* @param definitionCode process definition code
+ * @param taskCode taskCode
* @param dateInterval dateInterval
* @return process instance
*/
@Override
- public ProcessInstance queryLastManualProcessInterval(Long definitionCode,
DateInterval dateInterval,
+ public ProcessInstance queryLastManualProcessInterval(Long definitionCode,
Long taskCode, DateInterval dateInterval,
int testFlag) {
return mybatisMapper.queryLastManualProcess(definitionCode,
+ taskCode,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
testFlag);
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 813568994e..80c008d0fd 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -224,15 +224,25 @@
order by start_time desc limit 1
</select>
<select id="queryLastManualProcess"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
- select
- <include refid="baseSql"/>
- from t_ds_process_instance
- where process_definition_code=#{processDefinitionCode} and
test_flag=#{testFlag}
- and schedule_time is null
- <if test="startTime!=null and endTime != null ">
- and start_time <![CDATA[ >= ]]> #{startTime} and start_time
<![CDATA[ <= ]]> #{endTime}
- </if>
- order by end_time desc limit 1
+ select t1.*
+ from
+ (
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_instance
+ where
+ process_definition_code=#{processDefinitionCode} and
test_flag=#{testFlag}
+ and schedule_time is null
+ <if test="startTime!=null and endTime != null ">
+ and start_time <![CDATA[ >= ]]> #{startTime} and start_time
<![CDATA[ <= ]]> #{endTime}
+ </if>
+ ) as t1
+ <if test="taskCode != null and taskCode!=0 and taskCode!=-1">
+ inner join
+ t_ds_task_instance as t2
+ on t1.id = t2.process_instance_id and t2.task_code=#{taskCode}
+ </if>
+ order by t1.end_time desc limit 1
</select>
<select id="queryFirstScheduleProcessInstance"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
index 102110c18f..1a6049aeb1 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
@@ -301,13 +301,15 @@ public class ProcessInstanceMapperTest extends
BaseDaoTest {
Date start = new Date(2019 - 1900, 1 - 1, 01, 0, 0, 0);
Date end = new Date(2019 - 1900, 1 - 1, 01, 5, 0, 0);
ProcessInstance processInstance1 =
-
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(),
start, end,
+
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(),
null, start,
+ end,
processInstance.getTestFlag());
Assertions.assertEquals(processInstance1.getId(),
processInstance.getId());
start = new Date(2019 - 1900, 1 - 1, 01, 1, 0, 0);
processInstance1 =
-
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(),
start, end,
+
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(),
null, start,
+ end,
processInstance.getTestFlag());
Assertions.assertNull(processInstance1);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index 0d343a709f..15ab34de34 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -149,8 +149,9 @@ public class DependentExecute {
DependResult result = DependResult.FAILED;
for (DateInterval dateInterval : dateIntervals) {
- ProcessInstance processInstance =
findLastProcessInterval(dependentItem.getDefinitionCode(),
- dateInterval, testFlag);
+ ProcessInstance processInstance =
+ findLastProcessInterval(dependentItem.getDefinitionCode(),
dependentItem.getDepTaskCode(),
+ dateInterval, testFlag);
if (processInstance == null) {
return DependResult.WAITING;
}
@@ -311,16 +312,18 @@ public class DependentExecute {
* 2. schedule run and schedule time between the interval
*
* @param definitionCode definition code
+ * @param taskCode task code
* @param dateInterval date interval
* @return ProcessInstance
*/
- private ProcessInstance findLastProcessInterval(Long definitionCode,
DateInterval dateInterval, int testFlag) {
+ private ProcessInstance findLastProcessInterval(Long definitionCode, Long
taskCode, DateInterval dateInterval,
+ int testFlag) {
ProcessInstance lastSchedulerProcess =
processInstanceDao.queryLastSchedulerProcessInterval(definitionCode,
dateInterval, testFlag);
ProcessInstance lastManualProcess =
-
processInstanceDao.queryLastManualProcessInterval(definitionCode, dateInterval,
testFlag);
+
processInstanceDao.queryLastManualProcessInterval(definitionCode, taskCode,
dateInterval, testFlag);
if (lastManualProcess == null) {
return lastSchedulerProcess;