det101 commented on code in PR #18003:
URL: 
https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2993025631


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +176,165 @@ private Integer doBackfillWorkflow(final 
BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + 
backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = 
backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == 
ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new 
HashSet<>() : visitedCodes;
+            
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, 
backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final 
WorkflowBackfillTriggerRequest request,
+                                                                      final 
Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO 
backfillWorkflowDTO,
-                                             final List<String> 
backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> 
backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = 
backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow 
code {}", upstreamWorkflowCode);
+            return;
+        }
+        // downstreamDefinitions may contain multiple entries for the same 
downstream workflow code
+        // (different dependent task lineage). We should only traverse each 
downstream workflow once
+        // (visitedCodes check), but trigger all dependent nodes within that 
downstream workflow by
+        // aggregating distinct taskDefinitionCodes into startNodes.
+        final Map<Long, List<DependentWorkflowDefinition>> 
downstreamDefinitionsByCode =
+                downstreamDefinitions.stream()
+                        
.collect(Collectors.groupingBy(DependentWorkflowDefinition::getWorkflowDefinitionCode));
+        final Set<Long> downstreamCodes = downstreamDefinitionsByCode.keySet();
+        final List<WorkflowDefinition> downstreamWorkflowList = 
workflowDefinitionDao.queryByCodes(downstreamCodes);
+        // Each workflow code maps to a single WorkflowDefinition (code is 
unique in t_ds_workflow_definition).
+        // We still group by code to simplify lookup and keep the code robust 
if this ever changes.
+        final Map<Long, List<WorkflowDefinition>> downstreamWorkflowMapByCode 
= downstreamWorkflowList.stream()
+                .collect(Collectors.groupingBy(WorkflowDefinition::getCode));
+
+        // 2) Reuse upstream business dates for downstream backfill (same 
instants/zones as the chunk passed to
+        // doBackfillWorkflow; avoids List<String> -> system-default parse -> 
dateToString drift)
+        final List<ZonedDateTime> upstreamBackfillDates = new 
ArrayList<>(backfillDateTimes);
+
+        // 3) Iterate downstream workflows and build/trigger corresponding 
BackfillWorkflowDTO
+        for (Map.Entry<Long, List<DependentWorkflowDefinition>> entry : 
downstreamDefinitionsByCode.entrySet()) {
+            long downstreamCode = entry.getKey();
+            List<DependentWorkflowDefinition> dependentDefinitions = 
entry.getValue();
+
+            // Prevent self-dependency and circular dependency chains.
+            // We only traverse each downstream workflow once.
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip already visited dependent workflow {}", 
downstreamCode);
+                continue;
+            }
+
+            DependentWorkflowDefinition representativeDependent = 
dependentDefinitions.get(0);
+
+            // Aggregate dependent nodes within the same downstream workflow.
+            // If any entry represents workflow-level dependency 
(taskDefinitionCode==0),
+            // we should backfill the whole downstream workflow 
(startNodes=null).
+            final boolean isWorkflowLevelDependency =
+                    dependentDefinitions.stream().anyMatch(d -> 
d.getTaskDefinitionCode() == 0);
+            final List<Long> aggregatedStartNodes;
+            if (isWorkflowLevelDependency) {
+                aggregatedStartNodes = null;
+            } else {
+                aggregatedStartNodes = dependentDefinitions.stream()
+                        
.map(DependentWorkflowDefinition::getTaskDefinitionCode)
+                        .filter(code -> code != 0)
+                        .distinct()
+                        .sorted()
+                        .collect(Collectors.toList());
+            }
+
+            WorkflowDefinition downstreamWorkflow = null;
+            List<WorkflowDefinition> workflowCandidates = 
downstreamWorkflowMapByCode.get(downstreamCode);
+            if (workflowCandidates != null) {
+                downstreamWorkflow =
+                        workflowCandidates.stream()
+                                .filter(workflow -> workflow.getVersion() == 
representativeDependent
+                                        .getWorkflowDefinitionVersion())
+                                .findFirst()
+                                .orElse(null);
+            }
+            if (downstreamWorkflow == null) {
+                log.warn("Skip dependent workflow {}, workflow definition not 
found", downstreamCode);
+                continue;
+            }
+
+            if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
+                log.warn("Skip dependent workflow {}, release state is not 
ONLINE", downstreamCode);
+                continue;
+            }
+
+            // Currently, reuse the same business date list as upstream for 
downstream backfill;
+            // later we can refine the dates based on dependency cycle 
configuration in dependentWorkflowDefinition
+            // (taskParams).
+            BackfillWorkflowDTO.BackfillParamsDTO originalParams = 
backfillWorkflowDTO.getBackfillParams();
+            boolean allLevelDependent = originalParams.isAllLevelDependent();
+            ComplementDependentMode downstreamDependentMode =
+                    allLevelDependent ? 
originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
+
+            BackfillWorkflowDTO.BackfillParamsDTO dependentParams = 
BackfillWorkflowDTO.BackfillParamsDTO.builder()
+                    // When the upstream is PARALLEL, dependent triggers 
should not re-apply
+                    // chunking on the already sliced date list; force SERIAL 
to keep
+                    // "traverse dependencies once per upstream date-chunk".
+                    .runMode(originalParams.getRunMode() == 
RunMode.RUN_MODE_PARALLEL ? RunMode.RUN_MODE_SERIAL
+                            : originalParams.getRunMode())
+                    .backfillDateList(upstreamBackfillDates)
+                    
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
+                    // Control whether downstream will continue triggering its 
own dependencies based on
+                    // allLevelDependent flag
+                    .backfillDependentMode(downstreamDependentMode)
+                    .allLevelDependent(allLevelDependent)
+                    .executionOrder(originalParams.getExecutionOrder())
+                    .build();
+
+            BackfillWorkflowDTO dependentBackfillDTO = 
BackfillWorkflowDTO.builder()
+                    .loginUser(backfillWorkflowDTO.getLoginUser())
+                    .workflowDefinition(downstreamWorkflow)
+                    // If taskDefinitionCode is 0, it means the dependency is 
on the entire workflow.
+                    // Otherwise, backfill should start from that dependent 
node.
+                    .startNodes(aggregatedStartNodes)
+                    .failureStrategy(backfillWorkflowDTO.getFailureStrategy())
+                    .taskDependType(backfillWorkflowDTO.getTaskDependType())
+                    .execType(backfillWorkflowDTO.getExecType())
+                    .warningType(backfillWorkflowDTO.getWarningType())
+                    .warningGroupId(downstreamWorkflow.getWarningGroupId())
+                    .runMode(dependentParams.getRunMode())
+                    
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
+                    // Align workerGroup with DependentWorkflowDefinition 
(fallback to upstream when it's null).
+                    .workerGroup(representativeDependent.getWorkerGroup() != 
null
+                            ? representativeDependent.getWorkerGroup()
+                            : backfillWorkflowDTO.getWorkerGroup())
+                    .tenantCode(backfillWorkflowDTO.getTenantCode())
+                    .environmentCode(backfillWorkflowDTO.getEnvironmentCode())
+                    .startParamList(backfillWorkflowDTO.getStartParamList())
+                    .dryRun(backfillWorkflowDTO.getDryRun())
+                    .backfillParams(dependentParams)
+                    .build();
+
+            log.info("Trigger dependent workflow {} for upstream workflow {} 
with backfill dates {}",
+                    downstreamCode, upstreamWorkflowCode,
+                    
backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+
+            // 4) Mark as visiting before recursive trigger to detect cycles, 
then trigger downstream backfill
+            visitedCodes.add(downstreamCode);
+            executeWithVisitedCodes(dependentBackfillDTO, visitedCodes);
+        }

Review Comment:
   In real-world scenarios, the number of dependency levels actually traversed 
can fall short of what is required; this needs to be optimized.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to