Copilot commented on code in PR #18003:
URL: 
https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2987147870


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -57,20 +65,32 @@ public class BackfillWorkflowExecutorDelegate implements 
IExecutorDelegate<Backf
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private WorkflowLineageService workflowLineageService;
+
+    @Autowired
+    private WorkflowDefinitionDao workflowDefinitionDao;
+
     @Autowired
     private RegistryClient registryClient;
 
     @Override
     public List<Integer> execute(final BackfillWorkflowDTO 
backfillWorkflowDTO) {
+        return executeWithVisitedCodes(backfillWorkflowDTO, null);
+    }
+
+    List<Integer> executeWithVisitedCodes(final BackfillWorkflowDTO 
backfillWorkflowDTO,
+                                          final Set<Long> visitedCodes) {
         // todo: directly call the master api to do backfill
         if (backfillWorkflowDTO.getBackfillParams().getRunMode() == 
RunMode.RUN_MODE_SERIAL) {
-            return doSerialBackfillWorkflow(backfillWorkflowDTO);
+            return doSerialBackfillWorkflow(backfillWorkflowDTO, visitedCodes);
         } else {
-            return doParallelBackfillWorkflow(backfillWorkflowDTO);
+            return doParallelBackfillWorkflow(backfillWorkflowDTO, 
visitedCodes);
         }
     }

Review Comment:
   The PR description currently states "This pull request is code cleanup 
without any test coverage", but this change introduces new 
dependency-triggering behavior and adds a new test class. Please update the PR 
description/verify section to reflect the actual behavior change and how it is 
tested (or why tests are sufficient).



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +184,120 @@ private Integer doBackfillWorkflow(final 
BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + 
backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = 
backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == 
ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new 
HashSet<>() : visitedCodes;
+            
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, 
effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final 
WorkflowBackfillTriggerRequest request,
+                                                                      final 
Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO 
backfillWorkflowDTO,
-                                             final List<String> 
backfillTimeList) {
-        // todo:
+                                             final List<String> 
backfillTimeList,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = 
backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow 
code {}", upstreamWorkflowCode);
+            return;
+        }
+        final Set<Long> downstreamCodes = downstreamDefinitions.stream()
+                .map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+        final List<WorkflowDefinition> downstreamWorkflowList = 
workflowDefinitionDao.queryByCodes(downstreamCodes);
+        final Map<Long, WorkflowDefinition> downstreamWorkflowMap = 
downstreamWorkflowList.stream()
+                .collect(Collectors.toMap(WorkflowDefinition::getCode, 
workflow -> workflow));
+
+        // 2) Convert upstream backfill time from string to ZonedDateTime as 
the base business dates for downstream
+        // backfill
+        final List<ZonedDateTime> upstreamBackfillDates = 
backfillTimeList.stream()
+                .map(DateUtils::stringToZoneDateTime)
+                .collect(Collectors.toList());

Review Comment:
   `doBackfillDependentWorkflow` converts `backfillTimeList` (strings) to 
`ZonedDateTime` via `DateUtils.stringToZoneDateTime`, then later downstream 
execution converts those `ZonedDateTime`s back to strings via 
`DateUtils.dateToString`. Because `stringToZoneDateTime` interprets the input 
using the thread-local/default timezone but returns a `ZonedDateTime` in 
`ZoneId.systemDefault()`, this round-trip can shift the local date/time when 
the request timezone differs from the JVM default, causing downstream workflows 
to be backfilled for the wrong business dates. Avoid the 
string->ZonedDateTime->string round-trip by propagating the original 
`ZonedDateTime` chunk list into downstream params, or ensure parsing/formatting 
uses the same timezone consistently (e.g., the request/thread-local timezone).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to