Copilot commented on code in PR #18003:
URL: https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2978763456


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -166,13 +188,100 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+            effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
 
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
-                                             final List<String> backfillTimeList) {
-        // todo:
+                                             final List<String> backfillTimeList,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
+            return;
+        }
+
+        // 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream
+        // backfill
+        final List<ZonedDateTime> upstreamBackfillDates = backfillTimeList.stream()
+                .map(DateUtils::stringToZoneDateTime)
+                .collect(Collectors.toList());
+
+        // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
+        for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
+            long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();
+
+            // Prevent self-dependency and circular dependency chains
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", downstreamCode);
+                continue;
+            }
+
+            WorkflowDefinition downstreamWorkflow =
+                    workflowDefinitionDao.queryByCode(downstreamCode).orElse(null);
+            if (downstreamWorkflow == null) {
+                log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
+                continue;

Review Comment:
   `doBackfillDependentWorkflow` queries each downstream workflow definition 
one-by-one via `workflowDefinitionDao.queryByCode(downstreamCode)`, which will 
result in an N+1 query pattern for workflows with many downstream dependencies. 
Since `WorkflowDefinitionDao#queryByCodes(Collection<Long>)` exists, consider 
collecting downstream codes first and fetching definitions in a single batch, 
then iterating in-memory (still applying the ONLINE filter and visited/cycle 
checks).
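
The suggested batch-fetch rewrite could be sketched roughly as follows. This is a minimal, self-contained illustration, not the PR's actual code: the stripped-down `WorkflowDefinition` type and the `fetchByCodes` function parameter are hypothetical stand-ins for the real entity and for `WorkflowDefinitionDao#queryByCodes(Collection<Long>)`.

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the real WorkflowDefinition entity.
class WorkflowDefinition {
    final long code;
    final boolean online;
    WorkflowDefinition(long code, boolean online) {
        this.code = code;
        this.online = online;
    }
}

public class BatchFetchSketch {

    /**
     * Replaces the per-code queryByCode loop (N+1 pattern) with one batch
     * lookup, then filters in memory. fetchByCodes stands in for
     * WorkflowDefinitionDao#queryByCodes(Collection<Long>).
     */
    static List<WorkflowDefinition> resolveDownstream(
            Collection<Long> downstreamCodes,
            Set<Long> visitedCodes,
            Function<Collection<Long>, List<WorkflowDefinition>> fetchByCodes) {
        // 1) Apply the visited/cycle check before hitting the database at all.
        List<Long> codesToFetch = downstreamCodes.stream()
                .filter(code -> !visitedCodes.contains(code))
                .collect(Collectors.toList());
        if (codesToFetch.isEmpty()) {
            return Collections.emptyList();
        }
        // 2) A single batch query instead of one query per downstream code.
        Map<Long, WorkflowDefinition> byCode = fetchByCodes.apply(codesToFetch).stream()
                .collect(Collectors.toMap(d -> d.code, d -> d));
        // 3) Apply the ONLINE filter in memory, preserving iteration order and
        //    skipping codes whose definition no longer exists.
        return codesToFetch.stream()
                .map(byCode::get)
                .filter(Objects::nonNull)
                .filter(d -> d.online)
                .collect(Collectors.toList());
    }
}
```

The point of the shape is that the database sees one `IN (...)` query regardless of how many downstream workflows exist, while the ONLINE and cycle checks behave exactly as in the per-code loop.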



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -101,7 +121,8 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf
         for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
             final Integer workflowInstanceId = doBackfillWorkflow(
                     backfillWorkflowDTO,
-                    stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+                    stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()),
+                    visitedCodes);
             workflowInstanceIdList.add(workflowInstanceId);

Review Comment:
   In parallel backfill mode, the same mutable `visitedCodes` instance is 
passed into every `doBackfillWorkflow(...)` call. Since `visitedCodes` is also 
used to prevent duplicate/cyclic dependent triggers, the first chunk will add 
downstream codes and later chunks will skip triggering downstream workflows, 
causing dependent backfill to only run for the first subset of dates. Consider 
scoping `visitedCodes` to a single dependency traversal per backfill chunk 
(e.g., pass `null` or a fresh copy like `new HashSet<>(visitedCodes)` for each 
split) so downstream workflows are triggered for each chunk’s 
`backfillTimeList`.
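
The shared-set pitfall and the per-chunk fix can be shown with a minimal, self-contained simulation. The method name `triggerPerChunk` and the downstream code `2L` are hypothetical; the sketch only models the interaction between the mutable `visitedCodes` set and the chunk loop described above.

```java
import java.util.*;

public class VisitedScopeSketch {

    /**
     * Simulates the chunk loop in doParallelBackfillWorkflow: each date chunk
     * tries to trigger a downstream workflow (hypothetical code 2L) unless that
     * code is already in the visited set. With a single shared set, only the
     * first chunk triggers it; with a fresh copy per chunk, every chunk does.
     */
    static int triggerPerChunk(List<List<String>> chunks,
                               Set<Long> visitedCodes,
                               boolean freshCopyPerChunk) {
        int downstreamTriggers = 0;
        for (List<String> chunk : chunks) {
            // Scope the cycle guard to one dependency traversal per chunk.
            Set<Long> scoped = freshCopyPerChunk ? new HashSet<>(visitedCodes) : visitedCodes;
            long downstreamCode = 2L; // hypothetical downstream workflow code
            if (scoped.add(downstreamCode)) { // add() returns false if already visited
                downstreamTriggers++;        // stands in for triggering the downstream backfill
            }
        }
        return downstreamTriggers;
    }
}
```

With three chunks and a shared set, the downstream workflow is triggered once; with a fresh copy per chunk, it is triggered three times, which is the behavior the review comment asks for.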



##########
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java:
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
+import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
+import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
+
+import java.lang.reflect.Method;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class BackfillWorkflowExecutorDelegateTest {
+
+    @Spy
+    @InjectMocks
+    private BackfillWorkflowExecutorDelegate backfillWorkflowExecutorDelegate;
+
+    @Mock
+    private WorkflowLineageService workflowLineageService;
+
+    @Mock
+    private WorkflowDefinitionDao workflowDefinitionDao;
+
+    @Test
+    public void testDoBackfillDependentWorkflow_NoDownstreamDefinitions() throws Exception {
+        long upstreamCode = 1L;
+        WorkflowDefinition upstreamWorkflow =
+                WorkflowDefinition.builder()
+                        .code(upstreamCode)
+                        .releaseState(ReleaseState.ONLINE)
+                        .build();
+
+        BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder()
+                .runMode(RunMode.RUN_MODE_SERIAL)
+                .backfillDateList(Collections.<ZonedDateTime>emptyList())
+                .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT)
+                .allLevelDependent(true)
+                .executionOrder(ExecutionOrder.ASC_ORDER)
+                .build();
+

Review Comment:
   The new dependent-triggering behavior isn’t covered for 
`RunMode.RUN_MODE_PARALLEL`. All tests build params with `RUN_MODE_SERIAL`, so 
regressions around splitting dates / triggering dependencies per chunk (and 
interaction with `visitedCodes`) won’t be caught. Adding a test that uses 
`RUN_MODE_PARALLEL` with multiple backfill dates and asserts downstream 
workflows are triggered for each split would improve coverage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
