Copilot commented on code in PR #18003:
URL: https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2992008802
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -98,10 +117,12 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf
         log.info("In parallel mode, current expectedParallelismNumber: {}", expectedParallelismNumber);
         final List<Integer> workflowInstanceIdList = Lists.newArrayList();
-        for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
-            final Integer workflowInstanceId = doBackfillWorkflow(
-                    backfillWorkflowDTO,
-                    stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+        final Set<Long> baseVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+        for (List<ZonedDateTime> dateChunk : splitDateTime(listDate, expectedParallelismNumber)) {
+            // Each parallel chunk should keep its own traversal context to avoid cross-chunk pollution.
+            final Set<Long> chunkVisitedCodes = new HashSet<>(baseVisitedCodes);
+            final Integer workflowInstanceId =
Review Comment:
`splitDateTime(listDate, expectedParallelismNumber)` will throw an `ArithmeticException` if `expectedParallelismNumber` is 0 (which is currently allowed by validation and can flow here). Please guard so that `expectedParallelismNumber >= 1` holds before calling `splitDateTime` (e.g., treat 0 as “no limit” and fall back to `listDate.size()`).
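A minimal, self-contained sketch of the suggested guard. The `splitDateTime` below is a simplified stand-in for the PR's helper, not the real implementation, but it fails the same way: the integer division by `parts` is where 0 blows up.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;

public class BackfillChunkGuard {

    // Simplified stand-in for splitDateTime: partitions dates into at most `parts` chunks.
    // The integer division below throws ArithmeticException when parts == 0.
    static List<List<ZonedDateTime>> splitDateTime(List<ZonedDateTime> dates, int parts) {
        int chunkSize = (dates.size() + parts - 1) / parts;
        List<List<ZonedDateTime>> chunks = new ArrayList<>();
        for (int i = 0; i < dates.size(); i += chunkSize) {
            chunks.add(new ArrayList<>(dates.subList(i, Math.min(i + chunkSize, dates.size()))));
        }
        return chunks;
    }

    // Suggested guard: treat a non-positive expectedParallelismNumber as "no limit",
    // falling back to one chunk per date (and never passing 0 to the divider).
    static List<List<ZonedDateTime>> safeSplit(List<ZonedDateTime> dates, int expectedParallelismNumber) {
        int effectiveParallelism =
                expectedParallelismNumber >= 1 ? expectedParallelismNumber : Math.max(1, dates.size());
        return splitDateTime(dates, effectiveParallelism);
    }

    public static void main(String[] args) {
        List<ZonedDateTime> dates = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            dates.add(ZonedDateTime.now(ZoneId.of("UTC")).plusDays(i));
        }
        // 0 is treated as "no limit": five single-date chunks instead of a divide-by-zero.
        System.out.println(safeSplit(dates, 0).size()); // 5
        System.out.println(safeSplit(dates, 2).size()); // 2
    }
}
```

The `Math.max(1, dates.size())` also covers the empty-date-list corner so the fallback itself can never reintroduce the division by zero.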
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +182,119 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+            effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
+                                                                      final Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
-                                             final List<String> backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
Review Comment:
The new dependency backfill traversal in `doBackfillDependentWorkflow`
introduces several behaviors (direct-only vs all-level recursion via
`allLevelDependent`, cycle/duplicate skipping via `visitedCodes`, filtering
OFFLINE workflows, missing-definition handling). The added test only asserts
visited-code isolation across parallel chunks, but doesn’t cover these new
branches/behaviors. Please add unit tests that assert: (1)
`allLevelDependent=false` triggers only direct downstream, (2) cycles are
skipped (no infinite recursion), and (3) OFFLINE/missing downstream definitions
are not triggered.
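For the cycle and direct-only branches, a toy model of the traversal can pin down the expected behaviors before wiring up the real mocks. Here the hypothetical `downstream` map and `triggered` list stand in for the lineage service and the actual trigger call; none of this is the PR's API, just the recursion shape the tests should assert on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DependentTraversalSketch {

    // Toy lineage graph: workflow code -> direct downstream workflow codes.
    final Map<Long, List<Long>> downstream = new HashMap<>();
    // Records every workflow "triggered", in order, in place of a real backfill call.
    final List<Long> triggered = new ArrayList<>();

    // Mirrors the visited-code guard in doBackfillDependentWorkflow:
    // a code already in visitedCodes is skipped, so cycles terminate.
    void backfill(long code, boolean allLevelDependent, Set<Long> visitedCodes) {
        triggered.add(code);
        visitedCodes.add(code);
        for (long next : downstream.getOrDefault(code, List.of())) {
            if (visitedCodes.contains(next)) {
                continue; // cycle or duplicate: skip instead of recursing forever
            }
            if (allLevelDependent) {
                backfill(next, true, visitedCodes);
            } else {
                // direct-only: trigger the downstream workflow but do not recurse further
                triggered.add(next);
                visitedCodes.add(next);
            }
        }
    }
}
```

With a graph `1 -> 2 -> {3, 1}`, all-level traversal should trigger `1, 2, 3` exactly once despite the back-edge to 1, and direct-only traversal should stop at `1, 2` — the two invariants worth asserting in the real unit tests (alongside the OFFLINE/missing-definition skips, which the toy model omits).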
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +182,119 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+            effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
+                                                                      final Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
-                                             final List<String> backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
+            return;
+        }
+        final Set<Long> downstreamCodes = downstreamDefinitions.stream()
+                .map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+        final List<WorkflowDefinition> downstreamWorkflowList = workflowDefinitionDao.queryByCodes(downstreamCodes);
+        final Map<Long, WorkflowDefinition> downstreamWorkflowMap = downstreamWorkflowList.stream()
+                .collect(Collectors.toMap(WorkflowDefinition::getCode, workflow -> workflow));
+
+        // 2) Reuse upstream business dates for downstream backfill (same instants/zones as the chunk passed to
+        // doBackfillWorkflow; avoids List<String> -> system-default parse -> dateToString drift)
+        final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
+
+        // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
+        for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
+            long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();
+
+            // Prevent self-dependency and circular dependency chains
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", downstreamCode);
+                continue;
+            }
+
+            WorkflowDefinition downstreamWorkflow = downstreamWorkflowMap.get(downstreamCode);
+            if (downstreamWorkflow == null) {
+                log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
+                continue;
+            }
+
+            if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
+                log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
+                continue;
+            }
+
+            // Currently, reuse the same business date list as upstream for downstream backfill;
+            // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
+            // (taskParams).
+            BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
+            boolean allLevelDependent = originalParams.isAllLevelDependent();
+            ComplementDependentMode downstreamDependentMode =
+                    allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
+
+            BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
+                    .runMode(originalParams.getRunMode())
+                    .backfillDateList(upstreamBackfillDates)
+                    .expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
+                    // Control whether downstream will continue triggering its own dependencies based on
+                    // allLevelDependent flag
+                    .backfillDependentMode(downstreamDependentMode)
+                    .allLevelDependent(allLevelDependent)
+                    .executionOrder(originalParams.getExecutionOrder())
+                    .build();
+
+            BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
+                    .loginUser(backfillWorkflowDTO.getLoginUser())
+                    .workflowDefinition(downstreamWorkflow)
+                    .startNodes(null)
+                    .failureStrategy(backfillWorkflowDTO.getFailureStrategy())
+                    .taskDependType(backfillWorkflowDTO.getTaskDependType())
+                    .execType(backfillWorkflowDTO.getExecType())
+                    .warningType(backfillWorkflowDTO.getWarningType())
+                    .warningGroupId(downstreamWorkflow.getWarningGroupId())
+                    .runMode(dependentParams.getRunMode())
+                    .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
+                    .workerGroup(backfillWorkflowDTO.getWorkerGroup())
+                    .tenantCode(backfillWorkflowDTO.getTenantCode())
+                    .environmentCode(backfillWorkflowDTO.getEnvironmentCode())
+                    .startParamList(backfillWorkflowDTO.getStartParamList())
+                    .dryRun(backfillWorkflowDTO.getDryRun())
+                    .backfillParams(dependentParams)
+                    .build();
+
+            log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}",
+                    downstreamCode, upstreamWorkflowCode,
+                    backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+
+            // 4) Mark as visiting before recursive trigger to detect cycles, then trigger downstream backfill
+            visitedCodes.add(downstreamCode);
+            executeWithVisitedCodes(dependentBackfillDTO, visitedCodes);
Review Comment:
`executeWithVisitedCodes(dependentBackfillDTO, visitedCodes)` will re-apply
the downstream DTO’s `runMode` chunking to `upstreamBackfillDates`. When the
upstream runMode is PARALLEL, `backfillDateTimes` here is already a chunk, so
downstream workflows can get split again (and potentially triggered multiple
times per upstream chunk). If the intent is “for each upstream date-chunk,
traverse dependencies once”, consider directly calling
`doBackfillWorkflow(dependentBackfillDTO, backfillDateTimes, visitedCodes)` (or
forcing dependent runMode to SERIAL for dependent triggers) to avoid
double-splitting and keep traversal semantics consistent.
```suggestion
            // Use doBackfillWorkflow to reuse the current upstream date chunk without re-applying runMode chunking
            doBackfillWorkflow(dependentBackfillDTO, upstreamBackfillDates, visitedCodes);
```
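To see the double-splitting concretely, here is a toy model: `split` stands in for the PARALLEL runMode chunking, integers stand in for dates, and the trigger count per upstream chunk is just the number of sub-chunks produced by re-entering the chunking path.

```java
import java.util.ArrayList;
import java.util.List;

public class DoubleSplitDemo {

    // Stand-in for the PARALLEL runMode chunking applied when re-entering via executeWithVisitedCodes.
    static List<List<Integer>> split(List<Integer> dates, int parallelism) {
        int chunkSize = (dates.size() + parallelism - 1) / parallelism;
        List<List<Integer>> chunks = new ArrayList<>();
        for (int i = 0; i < dates.size(); i += chunkSize) {
            chunks.add(new ArrayList<>(dates.subList(i, Math.min(i + chunkSize, dates.size()))));
        }
        return chunks;
    }

    // An already-split upstream chunk gets split again, so the downstream workflow
    // is triggered once per sub-chunk instead of once per upstream chunk.
    static int downstreamTriggersPerUpstreamChunk(List<Integer> upstreamChunk, int parallelism) {
        return split(upstreamChunk, parallelism).size();
    }
}
```

For example, six dates at parallelism 3 yield upstream chunks of two dates each; re-applying the same chunking to one of those two-date chunks yields two sub-chunks, i.e. two downstream triggers per upstream chunk rather than one.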
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +182,119 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
                 .dryRun(backfillWorkflowDTO.getDryRun())
                 .build();
-        final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
-                .withService(IWorkflowControlClient.class)
-                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
-                .backfillTriggerWorkflow(backfillTriggerRequest);
+        final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+                triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
         if (!backfillTriggerResponse.isSuccess()) {
             throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
         }
         final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
         if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
-            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+            final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
+            effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+            doBackfillDependentWorkflow(backfillWorkflowDTO, backfillDateTimes, effectiveVisitedCodes);
         }
         return backfillTriggerResponse.getWorkflowInstanceId();
     }
+    protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
+                                                                      final Server masterServer) {
+        return Clients
+                .withService(IWorkflowControlClient.class)
+                .withHost(masterServer.getHost() + ":" + masterServer.getPort())
+                .backfillTriggerWorkflow(request);
+    }
+
     private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
-                                             final List<String> backfillTimeList) {
-        // todo:
+                                             final List<ZonedDateTime> backfillDateTimes,
+                                             final Set<Long> visitedCodes) {
+        // 1) Query downstream dependent workflows for the current workflow
+        final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
+        final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+        List<DependentWorkflowDefinition> downstreamDefinitions =
+                workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+        if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+            log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
+            return;
+        }
+        final Set<Long> downstreamCodes = downstreamDefinitions.stream()
+                .map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+        final List<WorkflowDefinition> downstreamWorkflowList = workflowDefinitionDao.queryByCodes(downstreamCodes);
+        final Map<Long, WorkflowDefinition> downstreamWorkflowMap = downstreamWorkflowList.stream()
+                .collect(Collectors.toMap(WorkflowDefinition::getCode, workflow -> workflow));
+
+        // 2) Reuse upstream business dates for downstream backfill (same instants/zones as the chunk passed to
+        // doBackfillWorkflow; avoids List<String> -> system-default parse -> dateToString drift)
+        final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
+
+        // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
+        for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
+            long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();
+
+            // Prevent self-dependency and circular dependency chains
+            if (visitedCodes.contains(downstreamCode)) {
+                log.warn("Skip circular dependent workflow {}", downstreamCode);
+                continue;
+            }
+
+            WorkflowDefinition downstreamWorkflow = downstreamWorkflowMap.get(downstreamCode);
+            if (downstreamWorkflow == null) {
+                log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
+                continue;
+            }
+
+            if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
+                log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
+                continue;
+            }
+
+            // Currently, reuse the same business date list as upstream for downstream backfill;
+            // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
+            // (taskParams).
+            BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
+            boolean allLevelDependent = originalParams.isAllLevelDependent();
+            ComplementDependentMode downstreamDependentMode =
+                    allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
+
+            BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
+                    .runMode(originalParams.getRunMode())
+                    .backfillDateList(upstreamBackfillDates)
+                    .expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
+                    // Control whether downstream will continue triggering its own dependencies based on
+                    // allLevelDependent flag
+                    .backfillDependentMode(downstreamDependentMode)
+                    .allLevelDependent(allLevelDependent)
+                    .executionOrder(originalParams.getExecutionOrder())
+                    .build();
+
+            BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
+                    .loginUser(backfillWorkflowDTO.getLoginUser())
+                    .workflowDefinition(downstreamWorkflow)
+                    .startNodes(null)
+                    .failureStrategy(backfillWorkflowDTO.getFailureStrategy())
Review Comment:
When triggering dependent workflows, the `DependentWorkflowDefinition`
provides routing details (`taskDefinitionCode`, `workflowDefinitionVersion`,
`workerGroup`), but they are currently ignored (e.g., `startNodes(null)` and
`workflowDefinition` comes from `queryByCodes`). This can trigger the wrong
workflow version and/or run the entire downstream workflow instead of starting
from the dependent node. Please populate `startNodes` from
`dependentWorkflowDefinition.getTaskDefinitionCode()` and ensure the triggered
workflow version/workerGroup align with the dependent definition (similar to
how `ExecutorServiceImpl#createComplementDependentCommand` uses these fields).
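A hedged sketch of the suggested wiring. The `DependentDef` record below is a hypothetical slice of `DependentWorkflowDefinition` (only the routing fields this comment mentions), and the comma-separated `startNodes` format is an assumption to be checked against how `ExecutorServiceImpl#createComplementDependentCommand` consumes these fields.

```java
import java.util.List;
import java.util.stream.Collectors;

public class DependentStartNodes {

    // Hypothetical slice of DependentWorkflowDefinition: just the routing fields the review mentions.
    record DependentDef(long workflowDefinitionCode, long taskDefinitionCode) {
    }

    // Sketch of the suggested fix: derive startNodes from the dependent task codes of one
    // downstream workflow instead of passing null, so the backfill starts from the dependent
    // node rather than running the entire downstream workflow.
    static String startNodesFor(long downstreamCode, List<DependentDef> defs) {
        return defs.stream()
                .filter(d -> d.workflowDefinitionCode() == downstreamCode)
                .map(d -> String.valueOf(d.taskDefinitionCode()))
                .collect(Collectors.joining(","));
    }
}
```

The same grouping pass would be the natural place to pick up `workflowDefinitionVersion` and `workerGroup` from the dependent definition, so the triggered DTO is built from the version the dependency actually points at rather than whatever `queryByCodes` returns.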
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]