SbloodyS commented on code in PR #18003:
URL: 
https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2963816118


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -166,13 +179,100 @@ private Integer doBackfillWorkflow(final 
BackfillWorkflowDTO backfillWorkflowDTO
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = 
backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == 
ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> visitedCodes = new HashSet<>();
+            
visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, 
visitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO 
backfillWorkflowDTO,
-                                             final List<String> 
backfillTimeList) {
-        // todo:
+                                             final List<String> 
backfillTimeList,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = 
backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow 
code {}", upstreamWorkflowCode);
+            return;
+        }
+
+        // 2) Convert upstream backfill time from string to ZonedDateTime as 
the base business dates for downstream
+        // backfill
+        final List<ZonedDateTime> upstreamBackfillDates = 
backfillTimeList.stream()
+                .map(DateUtils::stringToZoneDateTime)
+                .collect(Collectors.toList());
+
+        // 3) Iterate downstream workflows and build/trigger corresponding 
BackfillWorkflowDTO
+        for (DependentWorkflowDefinition dependentWorkflowDefinition : 
downstreamDefinitions) {
+            long downstreamCode = 
dependentWorkflowDefinition.getWorkflowDefinitionCode();
+
+            // Prevent self-dependency and circular dependency chains
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", 
downstreamCode);
+                continue;
+            }
+
+            WorkflowDefinition downstreamWorkflow =
+                    
workflowDefinitionDao.queryByCode(downstreamCode).orElse(null);
+            if (downstreamWorkflow == null) {
+                log.warn("Skip dependent workflow {}, definition not found", 
downstreamCode);

Review Comment:
   ```suggestion
                   log.warn("Skip dependent workflow {}, workflow definition 
not found", downstreamCode);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to