SbloodyS commented on a change in pull request #8496:
URL: https://github.com/apache/dolphinscheduler/pull/8496#discussion_r812592610



##########
File path: 
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
##########
@@ -88,4 +88,23 @@
          where project_code = #{projectCode}
                 and process_definition_code = #{processDefinitionCode}
     </select>
+
+    <select id="queryDependentProcessDefinitionByProcessDefinitionCode" 
resultType="DependentProcessDefinition">
+        SELECT
+        c.code AS process_definition_code
+        ,c.name AS process_definition_name
+        ,a.code AS task_definition_code
+        ,a.task_params
+        ,d.worker_group
+        FROM
+        t_ds_task_definition a
+        JOIN t_ds_process_task_relation b ON a.code    = b.pre_task_code and 
a.version = b.pre_task_version
+        JOIN t_ds_process_definition c ON c.code = b.process_definition_code 
AND c.version = b.process_definition_version AND c.project_code = b.project_code

Review comment:
       This project_code is just to find the task definition’s process 
definition. It won't affect cross projects.

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -673,14 +691,80 @@ private int createComplementCommandList(Date start, Date 
end, RunMode runMode, C
                         cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, 
DateUtils.dateToString(listDate.get(endDateIndex)));
                         
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                         processService.createCommand(command);
+
+                        if (schedules.isEmpty() || complementDependentMode == 
ComplementDependentMode.OFF_MODE) {
+                            logger.info("process code: {} complement dependent 
in off mode or schedule's size is 0, skip "
+                                    + "dependent complement data", 
command.getProcessDefinitionCode());
+                        } else {
+                            dependentProcessDefinitionCreateCount += 
createComplementDependentCommand(schedules, command);
+                        }
                     }
                 }
                 break;
             }
             default:
                 break;
         }
-        logger.info("create complement command count: {}", createCount);
+        logger.info("create complement command count: {}, create dependent 
complement command count: {}", createCount
+                , dependentProcessDefinitionCreateCount);
         return createCount;
     }
+
+    /**
+     * create complement dependent command
+     */
+    @SuppressWarnings("checkstyle:OperatorWrap")
+    private int createComplementDependentCommand(List<Schedule> schedules, 
Command command) {
+        int dependentProcessDefinitionCreateCount = 0;
+        Command dependentCommand;
+
+        try {
+            dependentCommand = (Command) BeanUtils.cloneBean(command);

Review comment:
       I'll remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to