This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 37325b4c34 [Bug](dependent) Dependent downstream trigger error when
schedule cycle not day. (#11734)
37325b4c34 is described below
commit 37325b4c3410c2fe2f025c16363ea0c3a157647e
Author: Stalary <[email protected]>
AuthorDate: Thu Sep 8 15:08:10 2022 +0800
[Bug](dependent) Dependent downstream trigger error when schedule cycle not
day. (#11734)
* FIX: dependent
* FIX: version
* MOD: for review
---
.../api/service/impl/ExecutorServiceImpl.java | 8 +++++---
.../dao/entity/DependentProcessDefinition.java | 17 +++++++++++++++--
.../dao/mapper/WorkFlowLineageMapper.xml | 1 +
3 files changed, 21 insertions(+), 5 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 957447dd68..b4fb3fb8aa 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -907,6 +907,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
for (DependentProcessDefinition dependentProcessDefinition :
dependentProcessDefinitionList) {
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+
dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
Map<String, String> cmdParam =
JSONUtils.toMap(dependentCommand.getCommandParam());
cmdParam.put(CMD_PARAM_START_NODES,
String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
@@ -927,7 +928,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
return
checkDependentProcessDefinitionValid(dependentProcessDefinitionList,
processDefinitionCycle,
- workerGroup);
+ workerGroup, processDefinitionCode);
}
/**
@@ -938,7 +939,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private List<DependentProcessDefinition>
checkDependentProcessDefinitionValid(
List<DependentProcessDefinition> dependentProcessDefinitionList,
CycleEnum processDefinitionCycle,
-
String workerGroup) {
+
String workerGroup,
+
long upstreamProcessDefinitionCode) {
List<DependentProcessDefinition> validDependentProcessDefinitionList =
new ArrayList<>();
List<Long> processDefinitionCodeList =
@@ -949,7 +951,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
for (DependentProcessDefinition dependentProcessDefinition :
dependentProcessDefinitionList) {
- if (dependentProcessDefinition.getDependentCycle() ==
processDefinitionCycle) {
+ if
(dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) ==
processDefinitionCycle) {
if (processDefinitionWorkerGroupMap
.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
dependentProcessDefinition.setWorkerGroup(workerGroup);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
index 9de57dff33..87bb3d4234 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
@@ -41,6 +41,11 @@ public class DependentProcessDefinition {
*/
private String processDefinitionName;
+ /**
+ * process definition version
+ **/
+ private int processDefinitionVersion;
+
/**
* task definition name
*/
@@ -60,14 +65,14 @@ public class DependentProcessDefinition {
* get dependent cycle
* @return CycleEnum
*/
- public CycleEnum getDependentCycle() {
+ public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) {
DependentParameters dependentParameters =
this.getDependentParameters();
List<DependentTaskModel> dependentTaskModelList =
dependentParameters.getDependTaskList();
for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
List<DependentItem> dependentItemList =
dependentTaskModel.getDependItemList();
for (DependentItem dependentItem : dependentItemList) {
- if (this.getProcessDefinitionCode() ==
dependentItem.getDefinitionCode()) {
+ if (upstreamProcessDefinitionCode ==
dependentItem.getDefinitionCode()) {
return cycle2CycleEnum(dependentItem.getCycle());
}
}
@@ -122,6 +127,14 @@ public class DependentProcessDefinition {
this.processDefinitionCode = code;
}
+ public int getProcessDefinitionVersion() {
+ return processDefinitionVersion;
+ }
+
+ public void setProcessDefinitionVersion(int processDefinitionVersion) {
+ this.processDefinitionVersion = processDefinitionVersion;
+ }
+
public long getTaskDefinitionCode() {
return this.taskDefinitionCode;
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index b17499bb60..2689b6d50f 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -149,6 +149,7 @@
SELECT
c.code AS process_definition_code
,c.name AS process_definition_name
+ ,c.version as process_definition_version
,a.code AS task_definition_code
,a.task_params
FROM