This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new fb0f96ed94 [Fix-9868] A task flow definition isolates the runs of
different execution strategies by version numbers. (#9869)
fb0f96ed94 is described below
commit fb0f96ed946c89323ba347eb9d538a22b9be6e73
Author: WangJPLeo <[email protected]>
AuthorDate: Tue May 3 22:52:42 2022 +0800
[Fix-9868] A task flow definition isolates the runs of different execution
strategies by version numbers. (#9869)
* The thread cache task flow definition should get the latest version.
* Coverage on New Code
* Coverage on New Code
* Coverage on New Code
* use an existing method.
* Increase unit test coverage.
* Task flow definitions enforce policy isolation.
---
.../dolphinscheduler/dao/mapper/ProcessInstanceMapper.java | 3 ++-
.../dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml | 3 ++-
.../server/master/runner/WorkflowExecuteThread.java | 2 +-
.../dolphinscheduler/service/process/ProcessServiceImpl.java | 12 ++++++------
.../dolphinscheduler/service/process/ProcessServiceTest.java | 6 +++---
5 files changed, 14 insertions(+), 12 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 1a639df8b4..8c0abf3dce 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -247,7 +247,8 @@ public interface ProcessInstanceMapper extends
BaseMapper<ProcessInstance> {
List<ProcessInstance>
queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long
processDefinitionCode,
@Param("states")
int[] states);
- List<ProcessInstance>
queryByProcessDefineCodeAndStatusAndNextId(@Param("processDefinitionCode") Long
processDefinitionCode,
+ List<ProcessInstance>
queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(@Param("processDefinitionCode")
Long processDefinitionCode,
+
@Param("processDefinitionVersion") int processDefinitionVersion,
@Param("states") int[] states, @Param("id") int id);
int updateGlobalParamsById(@Param("globalParams") String globalParams,
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 2e49ce9b5b..d77e3cc4d8 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -231,11 +231,12 @@
</foreach>
order by id asc
</select>
- <select id="queryByProcessDefineCodeAndStatusAndNextId"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+ <select
id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
+ and process_definition_version = #{processDefinitionVersion}
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 1db5d81b6d..b7af67e675 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -802,7 +802,7 @@ public class WorkflowExecuteThread {
/**
* process end handle
*/
- private void endProcess() {
+ public void endProcess() {
this.stateEvents.clear();
if (processDefinition.getExecutionType().typeIsSerialWait()) {
checkSerialProcess(processDefinition);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 58183f2c88..53c20a5ae8 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -313,8 +313,8 @@ public class ProcessServiceImpl implements ProcessService {
//when we get the running instance(or waiting instance) only get the
priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
while (true) {
- List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
@@ -326,15 +326,15 @@ public class ProcessServiceImpl implements ProcessService
{
}
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard())
{
- List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.STOP);
saveProcessInstance(processInstance);
}
} else if
(processDefinition.getExecutionType().typeIsSerialPriority()) {
- List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
for (ProcessInstance info : runningProcessInstances) {
info.setCommandType(CommandType.STOP);
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 22575681e1..47bdca17d1 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -398,7 +398,7 @@ public class ProcessServiceTest {
command6.setCommandParam("{\"ProcessInstanceId\":223}");
command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command6.setProcessDefinitionVersion(1);
-
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,
Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
+
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L,
1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223,
222)).thenReturn(true);
Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
ProcessInstance processInstance6 =
processService.handleCommand(logger, host, command6);
@@ -419,7 +419,7 @@ public class ProcessServiceTest {
command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command7.setProcessDefinitionVersion(1);
Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
-
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,
Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
+
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L,
1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
ProcessInstance processInstance8 =
processService.handleCommand(logger, host, command7);
Assert.assertTrue(processInstance8 == null);
@@ -441,7 +441,7 @@ public class ProcessServiceTest {
command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command9.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9);
-
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L,
Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
+
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L,
1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 =
processService.handleCommand(logger, host, command9);