This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 37325b4c34 [Bug](dependent) Dependent downstream trigger error when 
schedule cycle not day. (#11734)
37325b4c34 is described below

commit 37325b4c3410c2fe2f025c16363ea0c3a157647e
Author: Stalary <[email protected]>
AuthorDate: Thu Sep 8 15:08:10 2022 +0800

    [Bug](dependent) Dependent downstream trigger error when schedule cycle not 
day. (#11734)
    
    * FIX: dependent
    
    * FIX: version
    
    * MOD: for review
---
 .../api/service/impl/ExecutorServiceImpl.java           |  8 +++++---
 .../dao/entity/DependentProcessDefinition.java          | 17 +++++++++++++++--
 .../dao/mapper/WorkFlowLineageMapper.xml                |  1 +
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 957447dd68..b4fb3fb8aa 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -907,6 +907,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
         for (DependentProcessDefinition dependentProcessDefinition : 
dependentProcessDefinitionList) {
             
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+            
dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
             
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
             Map<String, String> cmdParam = 
JSONUtils.toMap(dependentCommand.getCommandParam());
             cmdParam.put(CMD_PARAM_START_NODES, 
String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
@@ -927,7 +928,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                 
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
 
         return 
checkDependentProcessDefinitionValid(dependentProcessDefinitionList, 
processDefinitionCycle,
-                workerGroup);
+                workerGroup, processDefinitionCode);
     }
 
     /**
@@ -938,7 +939,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
     private List<DependentProcessDefinition> 
checkDependentProcessDefinitionValid(
                                                                                
   List<DependentProcessDefinition> dependentProcessDefinitionList,
                                                                                
   CycleEnum processDefinitionCycle,
-                                                                               
   String workerGroup) {
+                                                                               
   String workerGroup,
+                                                                               
   long upstreamProcessDefinitionCode) {
         List<DependentProcessDefinition> validDependentProcessDefinitionList = 
new ArrayList<>();
 
         List<Long> processDefinitionCodeList =
@@ -949,7 +951,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                 
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
 
         for (DependentProcessDefinition dependentProcessDefinition : 
dependentProcessDefinitionList) {
-            if (dependentProcessDefinition.getDependentCycle() == 
processDefinitionCycle) {
+            if 
(dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == 
processDefinitionCycle) {
                 if (processDefinitionWorkerGroupMap
                         
.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
                     dependentProcessDefinition.setWorkerGroup(workerGroup);
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
index 9de57dff33..87bb3d4234 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
@@ -41,6 +41,11 @@ public class DependentProcessDefinition {
      */
     private String processDefinitionName;
 
+    /**
+     * process definition version
+     **/
+    private int processDefinitionVersion;
+
     /**
      * task definition name
      */
@@ -60,14 +65,14 @@ public class DependentProcessDefinition {
      * get dependent cycle
      * @return CycleEnum
      */
-    public CycleEnum getDependentCycle() {
+    public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) {
         DependentParameters dependentParameters = 
this.getDependentParameters();
         List<DependentTaskModel> dependentTaskModelList = 
dependentParameters.getDependTaskList();
 
         for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
             List<DependentItem> dependentItemList = 
dependentTaskModel.getDependItemList();
             for (DependentItem dependentItem : dependentItemList) {
-                if (this.getProcessDefinitionCode() == 
dependentItem.getDefinitionCode()) {
+                if (upstreamProcessDefinitionCode == 
dependentItem.getDefinitionCode()) {
                     return cycle2CycleEnum(dependentItem.getCycle());
                 }
             }
@@ -122,6 +127,14 @@ public class DependentProcessDefinition {
         this.processDefinitionCode = code;
     }
 
+    public int getProcessDefinitionVersion() {
+        return processDefinitionVersion;
+    }
+
+    public void setProcessDefinitionVersion(int processDefinitionVersion) {
+        this.processDefinitionVersion = processDefinitionVersion;
+    }
+
     public long getTaskDefinitionCode() {
         return this.taskDefinitionCode;
     }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index b17499bb60..2689b6d50f 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -149,6 +149,7 @@
         SELECT
         c.code AS process_definition_code
         ,c.name AS process_definition_name
+        ,c.version as process_definition_version
         ,a.code AS task_definition_code
         ,a.task_params
         FROM

Reply via email to