Copilot commented on code in PR #18003:
URL: https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2992008802


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -98,10 +117,12 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf
 
         log.info("In parallel mode, current expectedParallelismNumber: {}", expectedParallelismNumber);
         final List<Integer> workflowInstanceIdList = Lists.newArrayList();
-        for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
-            final Integer workflowInstanceId = doBackfillWorkflow(
-                    backfillWorkflowDTO,
-                    stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+        final Set<Long> baseVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+        for (List<ZonedDateTime> dateChunk : splitDateTime(listDate, expectedParallelismNumber)) {
+            // Each parallel chunk should keep its own traversal context to avoid cross-chunk pollution.
+            final Set<Long> chunkVisitedCodes = new HashSet<>(baseVisitedCodes);
+            final Integer workflowInstanceId =

Review Comment:
   `splitDateTime(listDate, expectedParallelismNumber)` will throw 
`ArithmeticException` if `expectedParallelismNumber` is 0 (which is currently 
allowed by validation and can flow here). Please guard so 
`expectedParallelismNumber >= 1` before calling `splitDateTime` (e.g., treat 0 
as “no limit” and use `listDate.size()`).
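The suggested guard can be sketched as a small normalization step before splitting. This is a hypothetical, simplified stand-in (`effectiveParallelism`, `split` are not the real DolphinScheduler `splitDateTime`), only illustrating how treating a non-positive `expectedParallelismNumber` as "no limit" avoids the division by zero:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical guard sketching the reviewer's suggestion: normalize the
// parallelism number before splitting so the chunk count is always >= 1.
public class ParallelismGuard {

    static int effectiveParallelism(int expectedParallelismNumber, int dateCount) {
        // 0 (or negative) means "no limit": one chunk per date.
        if (expectedParallelismNumber <= 0) {
            return Math.max(dateCount, 1);
        }
        return Math.min(expectedParallelismNumber, Math.max(dateCount, 1));
    }

    // Simplified stand-in for splitDateTime: never divides by zero because
    // the chunk count has been normalized first.
    static <T> List<List<T>> split(List<T> items, int chunks) {
        int n = effectiveParallelism(chunks, items.size());
        List<List<T>> result = new ArrayList<>();
        int size = (int) Math.ceil((double) items.size() / n);
        for (int i = 0; i < items.size(); i += size) {
            result.add(items.subList(i, Math.min(i + size, items.size())));
        }
        return result;
    }
}
```

With this shape, `split(dates, 0)` degrades gracefully to one-date chunks instead of throwing `ArithmeticException`.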



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +182,119 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+            effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
+                                                                      final Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
-                                             final List<String> backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+

Review Comment:
   The new dependency backfill traversal in `doBackfillDependentWorkflow` 
introduces several behaviors (direct-only vs all-level recursion via 
`allLevelDependent`, cycle/duplicate skipping via `visitedCodes`, filtering 
OFFLINE workflows, missing-definition handling). The added test only asserts 
visited-code isolation across parallel chunks, but doesn’t cover these new 
branches/behaviors. Please add unit tests that assert: (1) 
`allLevelDependent=false` triggers only direct downstream, (2) cycles are 
skipped (no infinite recursion), and (3) OFFLINE/missing downstream definitions 
are not triggered.
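The requested tests mostly hinge on the traversal semantics, which can be exercised against a simplified model. The following sketch uses hypothetical names (`DependentTraversal`, a plain adjacency map), not the real DolphinScheduler classes, to show the three behaviors a unit test should pin down: direct-only vs. all-level recursion, and cycle/duplicate skipping via a visited set:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the visited-code traversal, useful for unit-testing
// the cycle skipping and direct-vs-all-level branches the review asks about.
public class DependentTraversal {

    // downstream: workflow code -> codes of its downstream dependent workflows
    static List<Long> trigger(long root, Map<Long, List<Long>> downstream,
                              boolean allLevelDependent) {
        Set<Long> visited = new LinkedHashSet<>();
        List<Long> triggered = new ArrayList<>();
        visited.add(root); // the root marks itself before recursing
        walk(root, downstream, allLevelDependent, visited, triggered);
        return triggered;
    }

    private static void walk(long code, Map<Long, List<Long>> downstream,
                             boolean allLevel, Set<Long> visited, List<Long> triggered) {
        for (long next : downstream.getOrDefault(code, List.of())) {
            if (!visited.add(next)) {
                continue; // already visited: skip cycles and duplicates
            }
            triggered.add(next);
            if (allLevel) { // direct-only mode stops after the first level
                walk(next, downstream, allLevel, visited, triggered);
            }
        }
    }
}
```

A cyclic graph `1 -> 2 -> 1` terminates and triggers `2` exactly once; with `allLevelDependent=false` only the direct downstream of the root is triggered.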



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +182,119 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+            effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
+                                                                      final Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
-                                             final List<String> backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
+            return;
+        }
+        final Set<Long> downstreamCodes = downstreamDefinitions.stream()
+                .map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+        final List<WorkflowDefinition> downstreamWorkflowList = workflowDefinitionDao.queryByCodes(downstreamCodes);
+        final Map<Long, WorkflowDefinition> downstreamWorkflowMap = downstreamWorkflowList.stream()
+                .collect(Collectors.toMap(WorkflowDefinition::getCode, workflow -> workflow));
+
+        // 2) Reuse upstream business dates for downstream backfill (same instants/zones as the chunk passed to
+        // doBackfillWorkflow; avoids List<String> -> system-default parse -> dateToString drift)
+        final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
+
+        // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
+        for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
+            long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();
+
+            // Prevent self-dependency and circular dependency chains
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", downstreamCode);
+                continue;
+            }
+
+            WorkflowDefinition downstreamWorkflow = downstreamWorkflowMap.get(downstreamCode);
+            if (downstreamWorkflow == null) {
+                log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
+                continue;
+            }
+
+            if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
+                log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
+                continue;
+            }
+
+            // Currently, reuse the same business date list as upstream for downstream backfill;
+            // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
+            // (taskParams).
+            BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
+            boolean allLevelDependent = originalParams.isAllLevelDependent();
+            ComplementDependentMode downstreamDependentMode =
+                    allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
+
+            BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
+                    .runMode(originalParams.getRunMode())
+                    .backfillDateList(upstreamBackfillDates)
+                    .expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
+                    // Control whether downstream will continue triggering its own dependencies based on
+                    // allLevelDependent flag
+                    .backfillDependentMode(downstreamDependentMode)
+                    .allLevelDependent(allLevelDependent)
+                    .executionOrder(originalParams.getExecutionOrder())
+                    .build();
+
+            BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
+                    .loginUser(backfillWorkflowDTO.getLoginUser())
+                    .workflowDefinition(downstreamWorkflow)
+                    .startNodes(null)
+                    .failureStrategy(backfillWorkflowDTO.getFailureStrategy())
+                    .taskDependType(backfillWorkflowDTO.getTaskDependType())
+                    .execType(backfillWorkflowDTO.getExecType())
+                    .warningType(backfillWorkflowDTO.getWarningType())
+                    .warningGroupId(downstreamWorkflow.getWarningGroupId())
+                    .runMode(dependentParams.getRunMode())
+                    .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
+                    .workerGroup(backfillWorkflowDTO.getWorkerGroup())
+                    .tenantCode(backfillWorkflowDTO.getTenantCode())
+                    .environmentCode(backfillWorkflowDTO.getEnvironmentCode())
+                    .startParamList(backfillWorkflowDTO.getStartParamList())
+                    .dryRun(backfillWorkflowDTO.getDryRun())
+                    .backfillParams(dependentParams)
+                    .build();
+
+            log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}",
+                    downstreamCode, upstreamWorkflowCode,
+                    backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+
+            // 4) Mark as visiting before recursive trigger to detect cycles, then trigger downstream backfill
+            visitedCodes.add(downstreamCode);
+            executeWithVisitedCodes(dependentBackfillDTO, visitedCodes);

Review Comment:
   `executeWithVisitedCodes(dependentBackfillDTO, visitedCodes)` will re-apply 
the downstream DTO’s `runMode` chunking to `upstreamBackfillDates`. When the 
upstream runMode is PARALLEL, `backfillDateTimes` here is already a chunk, so 
downstream workflows can get split again (and potentially triggered multiple 
times per upstream chunk). If the intent is “for each upstream date-chunk, 
traverse dependencies once”, consider directly calling 
`doBackfillWorkflow(dependentBackfillDTO, backfillDateTimes, visitedCodes)` (or 
forcing dependent runMode to SERIAL for dependent triggers) to avoid 
double-splitting and keep traversal semantics consistent.
   ```suggestion
               // Use doBackfillWorkflow to reuse the current upstream date chunk without re-applying runMode chunking
               doBackfillWorkflow(dependentBackfillDTO, upstreamBackfillDates, visitedCodes);
   ```
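The double-splitting the comment describes can be made concrete with a toy splitter (hypothetical `split`/`downstreamTriggers`, not the real `splitDateTime`): if the upstream PARALLEL run already chunked the dates, re-applying the same runMode chunking inside each chunk multiplies the downstream triggers.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the double-splitting problem: each upstream chunk is
// re-chunked when the downstream DTO re-applies its runMode.
public class DoubleSplitDemo {

    static <T> List<List<T>> split(List<T> items, int chunks) {
        List<List<T>> result = new ArrayList<>();
        int size = (int) Math.ceil((double) items.size() / chunks);
        for (int i = 0; i < items.size(); i += size) {
            result.add(items.subList(i, Math.min(i + size, items.size())));
        }
        return result;
    }

    // Count of downstream triggers if every upstream chunk is re-chunked.
    static int downstreamTriggers(List<String> dates, int parallelism) {
        int triggers = 0;
        for (List<String> upstreamChunk : split(dates, parallelism)) {
            // Re-applying the same chunking to an already-chunked list:
            triggers += split(upstreamChunk, parallelism).size();
        }
        return triggers;
    }
}
```

For four dates and parallelism 2 the upstream produces 2 chunks, but the re-chunked traversal fires 4 downstream triggers, which is why calling `doBackfillWorkflow` directly with the current chunk (or forcing SERIAL for dependent triggers) keeps the semantics "one traversal per upstream chunk".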



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +182,119 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+            effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
+                                                                      final Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
-                                             final List<String> backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
+            return;
+        }
+        final Set<Long> downstreamCodes = downstreamDefinitions.stream()
+                .map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+        final List<WorkflowDefinition> downstreamWorkflowList = workflowDefinitionDao.queryByCodes(downstreamCodes);
+        final Map<Long, WorkflowDefinition> downstreamWorkflowMap = downstreamWorkflowList.stream()
+                .collect(Collectors.toMap(WorkflowDefinition::getCode, workflow -> workflow));
+
+        // 2) Reuse upstream business dates for downstream backfill (same instants/zones as the chunk passed to
+        // doBackfillWorkflow; avoids List<String> -> system-default parse -> dateToString drift)
+        final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
+
+        // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
+        for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
+            long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();
+
+            // Prevent self-dependency and circular dependency chains
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", downstreamCode);
+                continue;
+            }
+
+            WorkflowDefinition downstreamWorkflow = downstreamWorkflowMap.get(downstreamCode);
+            if (downstreamWorkflow == null) {
+                log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
+                continue;
+            }
+
+            if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
+                log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
+                continue;
+            }
+
+            // Currently, reuse the same business date list as upstream for downstream backfill;
+            // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
+            // (taskParams).
+            BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
+            boolean allLevelDependent = originalParams.isAllLevelDependent();
+            ComplementDependentMode downstreamDependentMode =
+                    allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
+
+            BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
+                    .runMode(originalParams.getRunMode())
+                    .backfillDateList(upstreamBackfillDates)
+                    .expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
+                    // Control whether downstream will continue triggering its own dependencies based on
+                    // allLevelDependent flag
+                    .backfillDependentMode(downstreamDependentMode)
+                    .allLevelDependent(allLevelDependent)
+                    .executionOrder(originalParams.getExecutionOrder())
+                    .build();
+
+            BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
+                    .loginUser(backfillWorkflowDTO.getLoginUser())
+                    .workflowDefinition(downstreamWorkflow)
+                    .startNodes(null)
+                    .failureStrategy(backfillWorkflowDTO.getFailureStrategy())

Review Comment:
   When triggering dependent workflows, the `DependentWorkflowDefinition` 
provides routing details (`taskDefinitionCode`, `workflowDefinitionVersion`, 
`workerGroup`), but they are currently ignored (e.g., `startNodes(null)` and 
`workflowDefinition` comes from `queryByCodes`). This can trigger the wrong 
workflow version and/or run the entire downstream workflow instead of starting 
from the dependent node. Please populate `startNodes` from 
`dependentWorkflowDefinition.getTaskDefinitionCode()` and ensure the triggered 
workflow version/workerGroup align with the dependent definition (similar to 
how `ExecutorServiceImpl#createComplementDependentCommand` uses these fields).
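The alignment the comment asks for can be sketched with plain hypothetical POJOs (these are not the real DolphinScheduler types): carry the dependent definition's routing fields into the trigger so the run starts from the dependent node and pins the declared version and worker group.

```java
// Hypothetical POJOs sketching how the dependent definition's routing fields
// (taskDefinitionCode, workflowDefinitionVersion, workerGroup) could flow
// into the downstream trigger instead of being ignored.
public class DependentRouting {

    record DependentDef(long workflowCode, int workflowVersion,
                        long taskDefinitionCode, String workerGroup) {}

    record TriggerSpec(long workflowCode, int workflowVersion,
                       String startNodes, String workerGroup) {}

    static TriggerSpec toTriggerSpec(DependentDef def) {
        return new TriggerSpec(
                def.workflowCode(),
                def.workflowVersion(),                    // pin the version the dependency declared
                String.valueOf(def.taskDefinitionCode()), // start from the dependent node only
                def.workerGroup());
    }
}
```

The key point is that `startNodes` derives from the dependent task code rather than being `null`, so the downstream run does not execute the entire DAG.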



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
