Copilot commented on code in PR #18003:
URL: 
https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2992090911


##########
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
+import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
+import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
+import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerRequest;
+import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
+import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class BackfillWorkflowExecutorDelegateTest {
+
+    @Spy
+    @InjectMocks
+    private BackfillWorkflowExecutorDelegate backfillWorkflowExecutorDelegate;
+
+    @Mock
+    private WorkflowLineageService workflowLineageService;
+
+    @Mock
+    private WorkflowDefinitionDao workflowDefinitionDao;
+
+    @Mock
+    private RegistryClient registryClient;
+
+    @Test
+    public void 
testDoParallelBackfillWorkflow_ShouldIsolateVisitedCodesAcrossChunks() {
+        long upstreamCode = 500L;
+        long downstreamCode = 600L;
+        WorkflowDefinition upstreamWorkflow =
+                
WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build();
+        User loginUser = new User();

Review Comment:
   The PR description currently states this is "code cleanup without any test
coverage", but this PR adds a new unit test class with multiple test cases.
Please update the PR description/checklist to note that tests were added, or
to identify which existing tests cover the change.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +182,141 @@ private Integer doBackfillWorkflow(final 
BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + 
backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = 
backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == 
ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new 
HashSet<>() : visitedCodes;
+            
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, 
backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final 
WorkflowBackfillTriggerRequest request,
+                                                                      final 
Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO 
backfillWorkflowDTO,
-                                             final List<String> 
backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> 
backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = 
backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow 
code {}", upstreamWorkflowCode);
+            return;
+        }
+        final Set<Long> downstreamCodes = downstreamDefinitions.stream()
+                .map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+        final List<WorkflowDefinition> downstreamWorkflowList = 
workflowDefinitionDao.queryByCodes(downstreamCodes);
+        // queryByCodes returns multiple versions for the same workflow code, 
so we must select the correct one
+        // based on DependentWorkflowDefinition.getWorkflowDefinitionVersion().
+        final Map<Long, List<WorkflowDefinition>> downstreamWorkflowMapByCode 
= downstreamWorkflowList.stream()
+                .collect(Collectors.groupingBy(WorkflowDefinition::getCode));
+
+        // 2) Reuse upstream business dates for downstream backfill (same 
instants/zones as the chunk passed to
+        // doBackfillWorkflow; avoids List<String> -> system-default parse -> 
dateToString drift)
+        final List<ZonedDateTime> upstreamBackfillDates = new 
ArrayList<>(backfillDateTimes);
+
+        // 3) Iterate downstream workflows and build/trigger corresponding 
BackfillWorkflowDTO
+        for (DependentWorkflowDefinition dependentWorkflowDefinition : 
downstreamDefinitions) {
+            long downstreamCode = 
dependentWorkflowDefinition.getWorkflowDefinitionCode();
+
+            // Prevent self-dependency and circular dependency chains
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", 
downstreamCode);
+                continue;
+            }
+
+            WorkflowDefinition downstreamWorkflow = null;
+            List<WorkflowDefinition> workflowCandidates = 
downstreamWorkflowMapByCode.get(downstreamCode);
+            if (workflowCandidates != null) {
+                downstreamWorkflow =
+                        workflowCandidates.stream()
+                                .filter(workflow -> workflow.getVersion() == 
dependentWorkflowDefinition
+                                        .getWorkflowDefinitionVersion())
+                                .findFirst()
+                                .orElse(workflowCandidates.get(0));

Review Comment:
   When selecting the downstream `WorkflowDefinition`, the fallback to
`workflowCandidates.get(0)` when the requested `workflowDefinitionVersion` is
not found can trigger a backfill against an arbitrary version, since
`queryByCodes` does not guarantee any ordering. It would be safer to require
an exact version match and skip (with a log message) when it is absent, or to
query by (code, version) explicitly.
   ```suggestion
                                   .orElse(null);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +182,141 @@ private Integer doBackfillWorkflow(final 
BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
 
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + 
backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = 
backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == 
ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new 
HashSet<>() : visitedCodes;
+            
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, 
backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final 
WorkflowBackfillTriggerRequest request,
+                                                                      final 
Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + 
masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO 
backfillWorkflowDTO,
-                                             final List<String> 
backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> 
backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = 
backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow 
code {}", upstreamWorkflowCode);
+            return;
+        }
+        final Set<Long> downstreamCodes = downstreamDefinitions.stream()
+                .map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+        final List<WorkflowDefinition> downstreamWorkflowList = 
workflowDefinitionDao.queryByCodes(downstreamCodes);
+        // queryByCodes returns multiple versions for the same workflow code, 
so we must select the correct one
+        // based on DependentWorkflowDefinition.getWorkflowDefinitionVersion().
+        final Map<Long, List<WorkflowDefinition>> downstreamWorkflowMapByCode 
= downstreamWorkflowList.stream()
+                .collect(Collectors.groupingBy(WorkflowDefinition::getCode));
+
+        // 2) Reuse upstream business dates for downstream backfill (same 
instants/zones as the chunk passed to
+        // doBackfillWorkflow; avoids List<String> -> system-default parse -> 
dateToString drift)
+        final List<ZonedDateTime> upstreamBackfillDates = new 
ArrayList<>(backfillDateTimes);
+
+        // 3) Iterate downstream workflows and build/trigger corresponding 
BackfillWorkflowDTO
+        for (DependentWorkflowDefinition dependentWorkflowDefinition : 
downstreamDefinitions) {
+            long downstreamCode = 
dependentWorkflowDefinition.getWorkflowDefinitionCode();
+
+            // Prevent self-dependency and circular dependency chains
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", 
downstreamCode);
+                continue;
+            }

Review Comment:
   `downstreamDefinitions` can contain multiple entries for the same downstream
workflow code (one per dependent task lineage). With the current per-entry
loop, `visitedCodes` causes the second and later entries for the same
downstream workflow to be skipped, so only a single `taskDefinitionCode` ends
up being used as `startNodes`. As a result, other dependent nodes in that same
downstream workflow may never be triggered. Consider grouping by
`workflowDefinitionCode` first, aggregating the distinct non-zero
`taskDefinitionCode`s into a single `startNodes` list (or `null` if any entry
is workflow-level), and then applying the `visitedCodes` check once per
downstream workflow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to