Copilot commented on code in PR #18003:
URL: https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2992879911
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -79,29 +97,26 @@ private List<Integer> doSerialBackfillWorkflow(final
BackfillWorkflowDTO backfil
Collections.sort(backfillTimeList);
}
- final Integer workflowInstanceId = doBackfillWorkflow(
- backfillWorkflowDTO,
-
backfillTimeList.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+ final Integer workflowInstanceId =
doBackfillWorkflow(backfillWorkflowDTO, backfillTimeList, visitedCodes);
return Lists.newArrayList(workflowInstanceId);
}
- private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO) {
+ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
+ final Set<Long>
visitedCodes) {
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams =
backfillWorkflowDTO.getBackfillParams();
- Integer expectedParallelismNumber =
backfillParams.getExpectedParallelismNumber();
-
- List<ZonedDateTime> listDate = backfillParams.getBackfillDateList();
- if (expectedParallelismNumber != null) {
- expectedParallelismNumber = Math.min(listDate.size(),
expectedParallelismNumber);
- } else {
- expectedParallelismNumber = listDate.size();
- }
+ final List<ZonedDateTime> listDate =
backfillParams.getBackfillDateList();
+ final int parallelism = backfillParams.getExpectedParallelismNumber()
!= null
+ ? backfillParams.getExpectedParallelismNumber() : 0;
+ final int expectedParallelismNumber = Math.min(listDate.size(),
Math.max(parallelism, 1));
Review Comment:
In parallel mode, the default chunk count semantics changed: when
expectedParallelismNumber is null (or 0), this now forces
expectedParallelismNumber to 1, resulting in a single backfill request instead
of the previous behavior of splitting into listDate.size() chunks. This is
inconsistent with the prior implementation (and with
ExecutorServiceImpl#createComplementCommandList, where 0 means “use
listDate.size()”). Please restore the old behavior (treat null/0 as
listDate.size(), otherwise min(listDate.size(), expectedParallelismNumber))
while still guarding against divide-by-zero.
```suggestion
// If there is no date to backfill, nothing to do; avoid calling
splitDateTime with zero parts.
if (listDate == null || listDate.isEmpty()) {
log.info("In parallel mode, backfill date list is empty, no
workflow instances to trigger.");
return Collections.emptyList();
}
final Integer expectedParallelism =
backfillParams.getExpectedParallelismNumber();
final int expectedParallelismNumber;
if (expectedParallelism == null || expectedParallelism == 0) {
// Preserve old semantics: null/0 means use listDate.size()
expectedParallelismNumber = listDate.size();
} else {
// Use the smaller of list size and user-specified parallelism
expectedParallelismNumber = Math.min(listDate.size(),
expectedParallelism);
}
```
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +176,165 @@ private Integer doBackfillWorkflow(final
BackfillWorkflowDTO backfillWorkflowDTO
.dryRun(backfillWorkflowDTO.getDryRun())
.build();
- final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
- .withService(IWorkflowControlClient.class)
- .withHost(masterServer.getHost() + ":" +
masterServer.getPort())
- .backfillTriggerWorkflow(backfillTriggerRequest);
+ final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+ triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
if (!backfillTriggerResponse.isSuccess()) {
throw new ServiceException("Backfill workflow failed: " +
backfillTriggerResponse.getMessage());
}
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams =
backfillWorkflowDTO.getBackfillParams();
if (backfillParams.getBackfillDependentMode() ==
ComplementDependentMode.ALL_DEPENDENT) {
- doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+ final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new
HashSet<>() : visitedCodes;
+
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+ doBackfillDependentWorkflow(backfillWorkflowDTO,
backfillDateTimes, effectiveVisitedCodes);
}
return backfillTriggerResponse.getWorkflowInstanceId();
}
+ protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final
WorkflowBackfillTriggerRequest request,
+ final
Server masterServer) {
+ return Clients
+ .withService(IWorkflowControlClient.class)
+ .withHost(masterServer.getHost() + ":" +
masterServer.getPort())
+ .backfillTriggerWorkflow(request);
+ }
+
private void doBackfillDependentWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
- final List<String>
backfillTimeList) {
- // todo:
+ final List<ZonedDateTime>
backfillDateTimes,
+ final Set<Long> visitedCodes) {
+ // 1) Query downstream dependent workflows for the current workflow
+ final WorkflowDefinition upstreamWorkflow =
backfillWorkflowDTO.getWorkflowDefinition();
+ final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+ List<DependentWorkflowDefinition> downstreamDefinitions =
+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+ if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+ log.info("No downstream dependent workflows found for workflow
code {}", upstreamWorkflowCode);
+ return;
+ }
+ // downstreamDefinitions may contain multiple entries for the same
downstream workflow code
+ // (different dependent task lineage). We should only traverse each
downstream workflow once
+ // (visitedCodes check), but trigger all dependent nodes within that
downstream workflow by
+ // aggregating distinct taskDefinitionCodes into startNodes.
+ final Map<Long, List<DependentWorkflowDefinition>>
downstreamDefinitionsByCode =
+ downstreamDefinitions.stream()
+
.collect(Collectors.groupingBy(DependentWorkflowDefinition::getWorkflowDefinitionCode));
+ final Set<Long> downstreamCodes = downstreamDefinitionsByCode.keySet();
+ final List<WorkflowDefinition> downstreamWorkflowList =
workflowDefinitionDao.queryByCodes(downstreamCodes);
+ // Each workflow code maps to a single WorkflowDefinition (code is
unique in t_ds_workflow_definition).
+ // We still group by code to simplify lookup and keep the code robust
if this ever changes.
+ final Map<Long, List<WorkflowDefinition>> downstreamWorkflowMapByCode
= downstreamWorkflowList.stream()
+ .collect(Collectors.groupingBy(WorkflowDefinition::getCode));
+
+ // 2) Reuse upstream business dates for downstream backfill (same
instants/zones as the chunk passed to
+ // doBackfillWorkflow; avoids List<String> -> system-default parse ->
dateToString drift)
+ final List<ZonedDateTime> upstreamBackfillDates = new
ArrayList<>(backfillDateTimes);
+
+ // 3) Iterate downstream workflows and build/trigger corresponding
BackfillWorkflowDTO
+ for (Map.Entry<Long, List<DependentWorkflowDefinition>> entry :
downstreamDefinitionsByCode.entrySet()) {
+ long downstreamCode = entry.getKey();
+ List<DependentWorkflowDefinition> dependentDefinitions =
entry.getValue();
+
+ // Prevent self-dependency and circular dependency chains.
+ // We only traverse each downstream workflow once.
+ if (visitedCodes.contains(downstreamCode)) {
+ log.warn("Skip already visited dependent workflow {}",
downstreamCode);
+ continue;
+ }
+
+ DependentWorkflowDefinition representativeDependent =
dependentDefinitions.get(0);
+
+ // Aggregate dependent nodes within the same downstream workflow.
+ // If any entry represents workflow-level dependency
(taskDefinitionCode==0),
+ // we should backfill the whole downstream workflow
(startNodes=null).
+ final boolean isWorkflowLevelDependency =
+ dependentDefinitions.stream().anyMatch(d ->
d.getTaskDefinitionCode() == 0);
+ final List<Long> aggregatedStartNodes;
+ if (isWorkflowLevelDependency) {
+ aggregatedStartNodes = null;
+ } else {
+ aggregatedStartNodes = dependentDefinitions.stream()
+
.map(DependentWorkflowDefinition::getTaskDefinitionCode)
+ .filter(code -> code != 0)
+ .distinct()
+ .sorted()
+ .collect(Collectors.toList());
+ }
+
+ WorkflowDefinition downstreamWorkflow = null;
+ List<WorkflowDefinition> workflowCandidates =
downstreamWorkflowMapByCode.get(downstreamCode);
+ if (workflowCandidates != null) {
+ downstreamWorkflow =
+ workflowCandidates.stream()
+ .filter(workflow -> workflow.getVersion() ==
representativeDependent
+ .getWorkflowDefinitionVersion())
+ .findFirst()
+ .orElse(null);
+ }
+ if (downstreamWorkflow == null) {
+ log.warn("Skip dependent workflow {}, workflow definition not
found", downstreamCode);
+ continue;
+ }
+
+ if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
+ log.warn("Skip dependent workflow {}, release state is not
ONLINE", downstreamCode);
+ continue;
+ }
+
+ // Currently, reuse the same business date list as upstream for
downstream backfill;
+ // later we can refine the dates based on dependency cycle
configuration in dependentWorkflowDefinition
+ // (taskParams).
+ BackfillWorkflowDTO.BackfillParamsDTO originalParams =
backfillWorkflowDTO.getBackfillParams();
+ boolean allLevelDependent = originalParams.isAllLevelDependent();
+ ComplementDependentMode downstreamDependentMode =
+ allLevelDependent ?
originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
+
+ BackfillWorkflowDTO.BackfillParamsDTO dependentParams =
BackfillWorkflowDTO.BackfillParamsDTO.builder()
+ // When the upstream is PARALLEL, dependent triggers
should not re-apply
+ // chunking on the already sliced date list; force SERIAL
to keep
+ // "traverse dependencies once per upstream date-chunk".
+ .runMode(originalParams.getRunMode() ==
RunMode.RUN_MODE_PARALLEL ? RunMode.RUN_MODE_SERIAL
+ : originalParams.getRunMode())
+ .backfillDateList(upstreamBackfillDates)
+
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
+ // Control whether downstream will continue triggering its
own dependencies based on
+ // allLevelDependent flag
+ .backfillDependentMode(downstreamDependentMode)
+ .allLevelDependent(allLevelDependent)
+ .executionOrder(originalParams.getExecutionOrder())
+ .build();
+
+ BackfillWorkflowDTO dependentBackfillDTO =
BackfillWorkflowDTO.builder()
+ .loginUser(backfillWorkflowDTO.getLoginUser())
+ .workflowDefinition(downstreamWorkflow)
+ // If taskDefinitionCode is 0, it means the dependency is
on the entire workflow.
+ // Otherwise, backfill should start from that dependent
node.
+ .startNodes(aggregatedStartNodes)
+ .failureStrategy(backfillWorkflowDTO.getFailureStrategy())
+ .taskDependType(backfillWorkflowDTO.getTaskDependType())
+ .execType(backfillWorkflowDTO.getExecType())
+ .warningType(backfillWorkflowDTO.getWarningType())
+ .warningGroupId(downstreamWorkflow.getWarningGroupId())
+ .runMode(dependentParams.getRunMode())
+
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
+ // Align workerGroup with DependentWorkflowDefinition
(fallback to upstream when it's null).
+ .workerGroup(representativeDependent.getWorkerGroup() !=
null
+ ? representativeDependent.getWorkerGroup()
+ : backfillWorkflowDTO.getWorkerGroup())
Review Comment:
For a downstream workflow with multiple dependentDefinitions, workerGroup
can differ between entries (it’s derived from the dependent task definitions).
Using dependentDefinitions.get(0).getWorkerGroup() makes the chosen workerGroup
order-dependent and potentially non-deterministic. Please compute workerGroup
deterministically (e.g., if all non-null workerGroups match, use it; otherwise
fall back to the upstream DTO workerGroup and log a warning).
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +176,165 @@ private Integer doBackfillWorkflow(final
BackfillWorkflowDTO backfillWorkflowDTO
.dryRun(backfillWorkflowDTO.getDryRun())
.build();
- final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
- .withService(IWorkflowControlClient.class)
- .withHost(masterServer.getHost() + ":" +
masterServer.getPort())
- .backfillTriggerWorkflow(backfillTriggerRequest);
+ final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+ triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
if (!backfillTriggerResponse.isSuccess()) {
throw new ServiceException("Backfill workflow failed: " +
backfillTriggerResponse.getMessage());
}
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams =
backfillWorkflowDTO.getBackfillParams();
if (backfillParams.getBackfillDependentMode() ==
ComplementDependentMode.ALL_DEPENDENT) {
- doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+ final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new
HashSet<>() : visitedCodes;
+
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+ doBackfillDependentWorkflow(backfillWorkflowDTO,
backfillDateTimes, effectiveVisitedCodes);
}
return backfillTriggerResponse.getWorkflowInstanceId();
}
+ protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final
WorkflowBackfillTriggerRequest request,
+ final
Server masterServer) {
+ return Clients
+ .withService(IWorkflowControlClient.class)
+ .withHost(masterServer.getHost() + ":" +
masterServer.getPort())
+ .backfillTriggerWorkflow(request);
+ }
+
private void doBackfillDependentWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
- final List<String>
backfillTimeList) {
- // todo:
+ final List<ZonedDateTime>
backfillDateTimes,
+ final Set<Long> visitedCodes) {
+ // 1) Query downstream dependent workflows for the current workflow
+ final WorkflowDefinition upstreamWorkflow =
backfillWorkflowDTO.getWorkflowDefinition();
+ final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+ List<DependentWorkflowDefinition> downstreamDefinitions =
+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+ if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+ log.info("No downstream dependent workflows found for workflow
code {}", upstreamWorkflowCode);
+ return;
+ }
+ // downstreamDefinitions may contain multiple entries for the same
downstream workflow code
+ // (different dependent task lineage). We should only traverse each
downstream workflow once
+ // (visitedCodes check), but trigger all dependent nodes within that
downstream workflow by
+ // aggregating distinct taskDefinitionCodes into startNodes.
+ final Map<Long, List<DependentWorkflowDefinition>>
downstreamDefinitionsByCode =
+ downstreamDefinitions.stream()
+
.collect(Collectors.groupingBy(DependentWorkflowDefinition::getWorkflowDefinitionCode));
+ final Set<Long> downstreamCodes = downstreamDefinitionsByCode.keySet();
+ final List<WorkflowDefinition> downstreamWorkflowList =
workflowDefinitionDao.queryByCodes(downstreamCodes);
+ // Each workflow code maps to a single WorkflowDefinition (code is
unique in t_ds_workflow_definition).
+ // We still group by code to simplify lookup and keep the code robust
if this ever changes.
+ final Map<Long, List<WorkflowDefinition>> downstreamWorkflowMapByCode
= downstreamWorkflowList.stream()
+ .collect(Collectors.groupingBy(WorkflowDefinition::getCode));
+
+ // 2) Reuse upstream business dates for downstream backfill (same
instants/zones as the chunk passed to
+ // doBackfillWorkflow; avoids List<String> -> system-default parse ->
dateToString drift)
+ final List<ZonedDateTime> upstreamBackfillDates = new
ArrayList<>(backfillDateTimes);
+
+ // 3) Iterate downstream workflows and build/trigger corresponding
BackfillWorkflowDTO
+ for (Map.Entry<Long, List<DependentWorkflowDefinition>> entry :
downstreamDefinitionsByCode.entrySet()) {
+ long downstreamCode = entry.getKey();
+ List<DependentWorkflowDefinition> dependentDefinitions =
entry.getValue();
+
+ // Prevent self-dependency and circular dependency chains.
+ // We only traverse each downstream workflow once.
+ if (visitedCodes.contains(downstreamCode)) {
+ log.warn("Skip already visited dependent workflow {}",
downstreamCode);
+ continue;
+ }
+
+ DependentWorkflowDefinition representativeDependent =
dependentDefinitions.get(0);
+
+ // Aggregate dependent nodes within the same downstream workflow.
+ // If any entry represents workflow-level dependency
(taskDefinitionCode==0),
+ // we should backfill the whole downstream workflow
(startNodes=null).
+ final boolean isWorkflowLevelDependency =
+ dependentDefinitions.stream().anyMatch(d ->
d.getTaskDefinitionCode() == 0);
+ final List<Long> aggregatedStartNodes;
+ if (isWorkflowLevelDependency) {
+ aggregatedStartNodes = null;
+ } else {
+ aggregatedStartNodes = dependentDefinitions.stream()
+
.map(DependentWorkflowDefinition::getTaskDefinitionCode)
+ .filter(code -> code != 0)
+ .distinct()
+ .sorted()
+ .collect(Collectors.toList());
+ }
+
+ WorkflowDefinition downstreamWorkflow = null;
+ List<WorkflowDefinition> workflowCandidates =
downstreamWorkflowMapByCode.get(downstreamCode);
+ if (workflowCandidates != null) {
+ downstreamWorkflow =
+ workflowCandidates.stream()
+ .filter(workflow -> workflow.getVersion() ==
representativeDependent
+ .getWorkflowDefinitionVersion())
+ .findFirst()
+ .orElse(null);
+ }
+ if (downstreamWorkflow == null) {
+ log.warn("Skip dependent workflow {}, workflow definition not
found", downstreamCode);
+ continue;
+ }
+
+ if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
+ log.warn("Skip dependent workflow {}, release state is not
ONLINE", downstreamCode);
+ continue;
+ }
+
+ // Currently, reuse the same business date list as upstream for
downstream backfill;
+ // later we can refine the dates based on dependency cycle
configuration in dependentWorkflowDefinition
+ // (taskParams).
+ BackfillWorkflowDTO.BackfillParamsDTO originalParams =
backfillWorkflowDTO.getBackfillParams();
+ boolean allLevelDependent = originalParams.isAllLevelDependent();
+ ComplementDependentMode downstreamDependentMode =
+ allLevelDependent ?
originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
+
+ BackfillWorkflowDTO.BackfillParamsDTO dependentParams =
BackfillWorkflowDTO.BackfillParamsDTO.builder()
+ // When the upstream is PARALLEL, dependent triggers
should not re-apply
+ // chunking on the already sliced date list; force SERIAL
to keep
+ // "traverse dependencies once per upstream date-chunk".
+ .runMode(originalParams.getRunMode() ==
RunMode.RUN_MODE_PARALLEL ? RunMode.RUN_MODE_SERIAL
+ : originalParams.getRunMode())
+ .backfillDateList(upstreamBackfillDates)
+
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
+ // Control whether downstream will continue triggering its
own dependencies based on
+ // allLevelDependent flag
+ .backfillDependentMode(downstreamDependentMode)
+ .allLevelDependent(allLevelDependent)
+ .executionOrder(originalParams.getExecutionOrder())
+ .build();
+
+ BackfillWorkflowDTO dependentBackfillDTO =
BackfillWorkflowDTO.builder()
+ .loginUser(backfillWorkflowDTO.getLoginUser())
+ .workflowDefinition(downstreamWorkflow)
+ // If taskDefinitionCode is 0, it means the dependency is
on the entire workflow.
+ // Otherwise, backfill should start from that dependent
node.
+ .startNodes(aggregatedStartNodes)
+ .failureStrategy(backfillWorkflowDTO.getFailureStrategy())
+ .taskDependType(backfillWorkflowDTO.getTaskDependType())
+ .execType(backfillWorkflowDTO.getExecType())
+ .warningType(backfillWorkflowDTO.getWarningType())
+ .warningGroupId(downstreamWorkflow.getWarningGroupId())
+ .runMode(dependentParams.getRunMode())
+
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
+ // Align workerGroup with DependentWorkflowDefinition
(fallback to upstream when it's null).
+ .workerGroup(representativeDependent.getWorkerGroup() !=
null
+ ? representativeDependent.getWorkerGroup()
+ : backfillWorkflowDTO.getWorkerGroup())
+ .tenantCode(backfillWorkflowDTO.getTenantCode())
+ .environmentCode(backfillWorkflowDTO.getEnvironmentCode())
+ .startParamList(backfillWorkflowDTO.getStartParamList())
+ .dryRun(backfillWorkflowDTO.getDryRun())
+ .backfillParams(dependentParams)
+ .build();
+
+ log.info("Trigger dependent workflow {} for upstream workflow {}
with backfill dates {}",
+ downstreamCode, upstreamWorkflowCode,
+
backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+
+ // 4) Mark as visiting before recursive trigger to detect cycles,
then trigger downstream backfill
+ visitedCodes.add(downstreamCode);
+ executeWithVisitedCodes(dependentBackfillDTO, visitedCodes);
+ }
Review Comment:
Dependency traversal uses recursion via executeWithVisitedCodes(...). For
long dependency chains, this can overflow the call stack and abort the
backfill. Consider switching to an explicit stack/queue traversal (iterative
DFS/BFS) so recursion depth is bounded by heap rather than the thread stack.
##########
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
+import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
+import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
+import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerRequest;
+import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
+import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class BackfillWorkflowExecutorDelegateTest {
+
+ @Spy
+ @InjectMocks
+ private BackfillWorkflowExecutorDelegate backfillWorkflowExecutorDelegate;
+
+ @Mock
+ private WorkflowLineageService workflowLineageService;
+
+ @Mock
+ private WorkflowDefinitionDao workflowDefinitionDao;
+
+ @Mock
+ private RegistryClient registryClient;
+
+ @Test
+ public void
testDoParallelBackfillWorkflow_ShouldIsolateVisitedCodesAcrossChunks() {
+ long upstreamCode = 500L;
+ long downstreamCode = 600L;
+ WorkflowDefinition upstreamWorkflow =
+
WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build();
+ User loginUser = new User();
+ loginUser.setId(1);
+ WorkflowDefinition downstreamWorkflow =
+
WorkflowDefinition.builder().code(downstreamCode).releaseState(ReleaseState.ONLINE).build();
+ DependentWorkflowDefinition dep = new DependentWorkflowDefinition();
+ dep.setWorkflowDefinitionCode(downstreamCode);
+
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode))
+ .thenReturn(Collections.singletonList(dep));
+
when(workflowDefinitionDao.queryByCodes(Collections.singleton(downstreamCode)))
+ .thenReturn(Collections.singletonList(downstreamWorkflow));
+
+ List<ZonedDateTime> dates = Arrays.asList(
+ ZonedDateTime.parse("2026-02-01T00:00:00Z"),
+ ZonedDateTime.parse("2026-02-02T00:00:00Z"),
+ ZonedDateTime.parse("2026-02-03T00:00:00Z"));
+ BackfillWorkflowDTO.BackfillParamsDTO params =
BackfillWorkflowDTO.BackfillParamsDTO.builder()
+ .runMode(RunMode.RUN_MODE_PARALLEL)
+ .backfillDateList(dates)
+ .expectedParallelismNumber(2)
+ .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT)
+ .allLevelDependent(true)
+ .executionOrder(ExecutionOrder.ASC_ORDER)
+ .build();
+ BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder()
Review Comment:
Parallel-mode chunking semantics for expectedParallelismNumber null/0 are
not covered by tests here. Since the production API allows
expectedParallelismNumber to be omitted (null) and legacy behavior treats 0 as
“use listDate.size()”; please add a unit test that asserts the number of
chunks/trigger calls when expectedParallelismNumber is null (and/or 0).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]