Copilot commented on code in PR #18003:
URL: https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2992169734


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -98,10 +116,12 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf
 
         log.info("In parallel mode, current expectedParallelismNumber: {}", expectedParallelismNumber);
         final List<Integer> workflowInstanceIdList = Lists.newArrayList();
-        for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
-            final Integer workflowInstanceId = doBackfillWorkflow(
-                    backfillWorkflowDTO,
-                    stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+        final Set<Long> baseVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+        for (List<ZonedDateTime> dateChunk : splitDateTime(listDate, expectedParallelismNumber)) {
+            // Each parallel chunk should keep its own traversal context to avoid cross-chunk pollution.

Review Comment:
   `expectedParallelismNumber` can be `0` (BackfillWorkflowDTOValidator allows 
`>= 0`), which will flow into `splitDateTime(listDate, 
expectedParallelismNumber)` and cause an `ArithmeticException` (division by 
zero). Please guard so `numParts >= 1` before calling `splitDateTime` (e.g., 
treat 0 as 1 or as `listDate.size()`), and/or tighten validation to disallow 0 
when runMode is PARALLEL.
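
A minimal, self-contained sketch of the suggested guard. All names here (`normalizeNumParts`, `split`) are illustrative stand-ins, not code from the PR; `split` only mimics the chunking role of `splitDateTime`:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: normalize the requested parallelism before chunking,
// so a 0 from the validator can never reach a division.
public class BackfillChunking {

    /** Treat expectedParallelismNumber <= 0 as "one chunk per date", and never exceed the date count. */
    static int normalizeNumParts(int expectedParallelismNumber, int dateCount) {
        int parts = expectedParallelismNumber <= 0 ? dateCount : expectedParallelismNumber;
        return Math.max(1, Math.min(parts, Math.max(1, dateCount)));
    }

    /** Stand-in for splitDateTime: divide dates into roughly equal chunks. */
    static <T> List<List<T>> split(List<T> dates, int numParts) {
        int parts = normalizeNumParts(numParts, dates.size());
        int chunkSize = (int) Math.ceil((double) dates.size() / parts); // safe: parts >= 1
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < dates.size(); i += chunkSize) {
            chunks.add(dates.subList(i, Math.min(i + chunkSize, dates.size())));
        }
        return chunks;
    }
}
```

Whether 0 should mean "fully parallel" (`listDate.size()`) or "serial" (1) is a product decision; the point is only that the normalization happens before any division.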



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +181,165 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+            effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
+                                                                      final Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
-                                             final List<String> backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
+            return;
+        }
+        // downstreamDefinitions may contain multiple entries for the same downstream workflow code
+        // (different dependent task lineage). We should only traverse each downstream workflow once
+        // (visitedCodes check), but trigger all dependent nodes within that downstream workflow by
+        // aggregating distinct taskDefinitionCodes into startNodes.
+        final Map<Long, List<DependentWorkflowDefinition>> downstreamDefinitionsByCode =
+                downstreamDefinitions.stream()
+                        .collect(Collectors.groupingBy(DependentWorkflowDefinition::getWorkflowDefinitionCode));
+        final Set<Long> downstreamCodes = downstreamDefinitionsByCode.keySet();
+        final List<WorkflowDefinition> downstreamWorkflowList = workflowDefinitionDao.queryByCodes(downstreamCodes);
+        // queryByCodes returns multiple versions for the same workflow code, so we must select the correct one
+        // based on DependentWorkflowDefinition.getWorkflowDefinitionVersion().

Review Comment:
   The comment says `queryByCodes` returns multiple versions for the same 
workflow code, but `t_ds_workflow_definition` has a unique constraint on `code` 
(see `dolphinscheduler_mysql.sql`), so this is misleading. Please update/remove 
the comment (and consider simplifying the selection logic if multiple versions 
cannot exist).
   ```suggestion
        // Each workflow code maps to a single WorkflowDefinition (code is unique in t_ds_workflow_definition).
        // We still group by code to simplify lookup and keep the code robust if this ever changes.
   ```
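
If the unique constraint on `code` is relied on, the `groupingBy` plus version-filter lookup could arguably collapse to a plain `toMap`. A sketch with a hypothetical minimal stand-in entity (not the real `WorkflowDefinition` from DolphinScheduler):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch only: with `code` unique in t_ds_workflow_definition, one code maps to one definition.
public class WorkflowLookup {

    // Hypothetical stand-in for the WorkflowDefinition entity.
    record WorkflowDef(long code, int version) {}

    static Map<Long, WorkflowDef> indexByCode(List<WorkflowDef> definitions) {
        // toMap throws IllegalStateException on duplicate keys, which would
        // surface immediately if the uniqueness assumption ever broke.
        return definitions.stream()
                .collect(Collectors.toMap(WorkflowDef::code, Function.identity()));
    }
}
```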



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +181,165 @@ private Integer doBackfillWorkflow(final 
BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + 
backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = 
backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == 
ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new 
HashSet<>() : visitedCodes;
+            
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, 
backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final 
WorkflowBackfillTriggerRequest request,
+                                                                      final 
Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO 
backfillWorkflowDTO,
-                                             final List<String> 
backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> 
backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = 
backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow 
code {}", upstreamWorkflowCode);
+            return;
+        }
+        // downstreamDefinitions may contain multiple entries for the same 
downstream workflow code
+        // (different dependent task lineage). We should only traverse each 
downstream workflow once
+        // (visitedCodes check), but trigger all dependent nodes within that 
downstream workflow by
+        // aggregating distinct taskDefinitionCodes into startNodes.
+        final Map<Long, List<DependentWorkflowDefinition>> 
downstreamDefinitionsByCode =
+                downstreamDefinitions.stream()
+                        
.collect(Collectors.groupingBy(DependentWorkflowDefinition::getWorkflowDefinitionCode));
+        final Set<Long> downstreamCodes = downstreamDefinitionsByCode.keySet();
+        final List<WorkflowDefinition> downstreamWorkflowList = 
workflowDefinitionDao.queryByCodes(downstreamCodes);
+        // queryByCodes returns multiple versions for the same workflow code, 
so we must select the correct one
+        // based on DependentWorkflowDefinition.getWorkflowDefinitionVersion().
+        final Map<Long, List<WorkflowDefinition>> downstreamWorkflowMapByCode = downstreamWorkflowList.stream()
+                .collect(Collectors.groupingBy(WorkflowDefinition::getCode));
+
+        // 2) Reuse upstream business dates for downstream backfill (same instants/zones as the chunk passed to
+        // doBackfillWorkflow; avoids List<String> -> system-default parse -> dateToString drift)
+        final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
+
+        // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
+        for (Map.Entry<Long, List<DependentWorkflowDefinition>> entry : downstreamDefinitionsByCode.entrySet()) {
+            long downstreamCode = entry.getKey();
+            List<DependentWorkflowDefinition> dependentDefinitions = entry.getValue();
+
+            // Prevent self-dependency and circular dependency chains.
+            // We only traverse each downstream workflow once.
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", downstreamCode);
+                continue;
+            }
+
+            DependentWorkflowDefinition representativeDependent = dependentDefinitions.get(0);
+
+            // Aggregate dependent nodes within the same downstream workflow.
+            // If any entry represents workflow-level dependency (taskDefinitionCode==0),
+            // we should backfill the whole downstream workflow (startNodes=null).
+            final boolean isWorkflowLevelDependency =
+                    dependentDefinitions.stream().anyMatch(d -> d.getTaskDefinitionCode() == 0);
+            final List<Long> aggregatedStartNodes;
+            if (isWorkflowLevelDependency) {
+                aggregatedStartNodes = null;
+            } else {
+                aggregatedStartNodes = dependentDefinitions.stream()
+                        .map(DependentWorkflowDefinition::getTaskDefinitionCode)
+                        .filter(code -> code != 0)
+                        .distinct()
+                        .sorted()
+                        .collect(Collectors.toList());
+            }
+
+            WorkflowDefinition downstreamWorkflow = null;
+            List<WorkflowDefinition> workflowCandidates = downstreamWorkflowMapByCode.get(downstreamCode);
+            if (workflowCandidates != null) {
+                downstreamWorkflow =
+                        workflowCandidates.stream()
+                                .filter(workflow -> workflow.getVersion() == representativeDependent
+                                        .getWorkflowDefinitionVersion())
+                                .findFirst()
+                                .orElse(null);
+            }
+            if (downstreamWorkflow == null) {
+                log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
+                continue;
+            }
+
+            if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
+                log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
+                continue;
+            }
+
+            // Currently, reuse the same business date list as upstream for downstream backfill;
+            // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
+            // (taskParams).
+            BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
+            boolean allLevelDependent = originalParams.isAllLevelDependent();
+            ComplementDependentMode downstreamDependentMode =
+                    allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
+
+            BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
+                    // When the upstream is PARALLEL, dependent triggers should not re-apply
+                    // chunking on the already sliced date list; force SERIAL to keep
+                    // "traverse dependencies once per upstream date-chunk".
+                    .runMode(originalParams.getRunMode() == RunMode.RUN_MODE_PARALLEL ? RunMode.RUN_MODE_SERIAL
+                            : originalParams.getRunMode())
+                    .backfillDateList(upstreamBackfillDates)
+                    .expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
+                    // Control whether downstream will continue triggering its own dependencies based on
+                    // allLevelDependent flag
+                    .backfillDependentMode(downstreamDependentMode)
+                    .allLevelDependent(allLevelDependent)
+                    .executionOrder(originalParams.getExecutionOrder())
+                    .build();
+
+            BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
+                    .loginUser(backfillWorkflowDTO.getLoginUser())
+                    .workflowDefinition(downstreamWorkflow)
+                    // If taskDefinitionCode is 0, it means the dependency is on the entire workflow.
+                    // Otherwise, backfill should start from that dependent node.
+                    .startNodes(aggregatedStartNodes)
+                    .failureStrategy(backfillWorkflowDTO.getFailureStrategy())
+                    .taskDependType(backfillWorkflowDTO.getTaskDependType())
+                    .execType(backfillWorkflowDTO.getExecType())
+                    .warningType(backfillWorkflowDTO.getWarningType())
+                    .warningGroupId(downstreamWorkflow.getWarningGroupId())
+                    .runMode(dependentParams.getRunMode())
+                    .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
+                    // Align workerGroup with DependentWorkflowDefinition (fallback to upstream when it's null).
+                    .workerGroup(representativeDependent.getWorkerGroup() != null
+                            ? representativeDependent.getWorkerGroup()
+                            : backfillWorkflowDTO.getWorkerGroup())

Review Comment:
   When multiple `DependentWorkflowDefinition` entries for the same downstream 
workflow are aggregated into a single trigger (multiple `startNodes`), the 
downstream tasks may have different `workerGroup` values. The current logic 
uses only `representativeDependent.getWorkerGroup()` (first entry), which can 
route the workflow instance to an arbitrary worker group and potentially break 
execution for other start nodes. Please handle mixed worker groups explicitly 
(e.g., require they are the same, choose a deterministic fallback like the 
upstream-selected worker group, or trigger separately per worker group).
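
One explicit way to handle mixed worker groups, along the "trigger separately per worker group" option, is to partition the dependent nodes by worker group before building the trigger DTOs. A hedged sketch using hypothetical stand-in types (the real `DependentWorkflowDefinition` lives in DolphinScheduler and is not reproduced here):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: hypothetical types, illustrating the partition-by-workerGroup idea.
public class WorkerGroupSplit {

    record DependentDef(long taskCode, String workerGroup) {}

    /**
     * Partition dependent nodes of one downstream workflow by worker group so each
     * trigger carries a single, unambiguous workerGroup. Null worker groups fall
     * back to the upstream-selected group.
     */
    static Map<String, List<Long>> startNodesByWorkerGroup(List<DependentDef> deps,
                                                           String upstreamWorkerGroup) {
        return deps.stream()
                .collect(Collectors.groupingBy(
                        d -> d.workerGroup() != null ? d.workerGroup() : upstreamWorkerGroup,
                        Collectors.mapping(DependentDef::taskCode, Collectors.toList())));
    }
}
```

Each map entry would then produce one backfill trigger with its own `startNodes` and `workerGroup`, instead of one trigger routed by the first entry's group.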



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +181,165 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+            effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
+                                                                      final Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
-                                             final List<String> backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
+            return;
+        }
+        // downstreamDefinitions may contain multiple entries for the same downstream workflow code
+        // (different dependent task lineage). We should only traverse each downstream workflow once
+        // (visitedCodes check), but trigger all dependent nodes within that downstream workflow by
+        // aggregating distinct taskDefinitionCodes into startNodes.
+        final Map<Long, List<DependentWorkflowDefinition>> downstreamDefinitionsByCode =
+                downstreamDefinitions.stream()
+                        .collect(Collectors.groupingBy(DependentWorkflowDefinition::getWorkflowDefinitionCode));
+        final Set<Long> downstreamCodes = downstreamDefinitionsByCode.keySet();
+        final List<WorkflowDefinition> downstreamWorkflowList = workflowDefinitionDao.queryByCodes(downstreamCodes);
+        // queryByCodes returns multiple versions for the same workflow code, so we must select the correct one
+        // based on DependentWorkflowDefinition.getWorkflowDefinitionVersion().
+        final Map<Long, List<WorkflowDefinition>> downstreamWorkflowMapByCode = downstreamWorkflowList.stream()
+                .collect(Collectors.groupingBy(WorkflowDefinition::getCode));
+
+        // 2) Reuse upstream business dates for downstream backfill (same instants/zones as the chunk passed to
+        // doBackfillWorkflow; avoids List<String> -> system-default parse -> dateToString drift)
+        final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
+
+        // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
+        for (Map.Entry<Long, List<DependentWorkflowDefinition>> entry : downstreamDefinitionsByCode.entrySet()) {
+            long downstreamCode = entry.getKey();
+            List<DependentWorkflowDefinition> dependentDefinitions = entry.getValue();
+
+            // Prevent self-dependency and circular dependency chains.
+            // We only traverse each downstream workflow once.
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", downstreamCode);

Review Comment:
   `visitedCodes` is used to prevent both cycles and duplicate traversal, but 
the log message says "Skip circular dependent workflow". This is misleading for 
non-cyclic graphs (e.g., diamond dependencies) where a workflow is skipped 
because it was already visited. Consider adjusting the message to reflect 
"already visited" vs "circular".
   ```suggestion
                log.warn("Skip already visited dependent workflow {}", downstreamCode);
   ```
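
The distinction matters because a plain visited-set check also fires on acyclic diamond-shaped graphs. A small self-contained traversal illustrating this (hypothetical types; not the PR's traversal code):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: in a diamond 1 -> {2, 3} -> 4 there is no cycle, yet the
// second edge into 4 is skipped because 4 was already visited.
public class DependencyTraversal {

    /** Breadth-first traversal that visits each workflow code at most once. */
    static List<Long> traverse(long root, Map<Long, List<Long>> downstream) {
        List<Long> order = new ArrayList<>();
        Set<Long> visited = new HashSet<>();
        Deque<Long> queue = new ArrayDeque<>();
        visited.add(root);
        queue.add(root);
        while (!queue.isEmpty()) {
            long code = queue.poll();
            order.add(code);
            for (long next : downstream.getOrDefault(code, List.of())) {
                if (!visited.add(next)) {
                    // "Already visited", not necessarily "circular": this branch
                    // fires for the second edge into node 4 in the diamond.
                    continue;
                }
                queue.add(next);
            }
        }
        return order;
    }
}
```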



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
