SbloodyS commented on code in PR #18003:
URL:
https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2963816118
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -166,13 +179,100 @@ private Integer doBackfillWorkflow(final
BackfillWorkflowDTO backfillWorkflowDTO
}
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams =
backfillWorkflowDTO.getBackfillParams();
if (backfillParams.getBackfillDependentMode() ==
ComplementDependentMode.ALL_DEPENDENT) {
- doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+ final Set<Long> visitedCodes = new HashSet<>();
+
visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+ doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList,
visitedCodes);
}
return backfillTriggerResponse.getWorkflowInstanceId();
}
private void doBackfillDependentWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
- final List<String>
backfillTimeList) {
- // todo:
+ final List<String>
backfillTimeList,
+ final Set<Long> visitedCodes) {
+ // 1) Query downstream dependent workflows for the current workflow
+ final WorkflowDefinition upstreamWorkflow =
backfillWorkflowDTO.getWorkflowDefinition();
+ final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+ List<DependentWorkflowDefinition> downstreamDefinitions =
+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+ if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+ log.info("No downstream dependent workflows found for workflow
code {}", upstreamWorkflowCode);
+ return;
+ }
+
+ // 2) Convert upstream backfill time from string to ZonedDateTime as
the base business dates for downstream
+ // backfill
+ final List<ZonedDateTime> upstreamBackfillDates =
backfillTimeList.stream()
+ .map(DateUtils::stringToZoneDateTime)
+ .collect(Collectors.toList());
+
+ // 3) Iterate downstream workflows and build/trigger corresponding
BackfillWorkflowDTO
+ for (DependentWorkflowDefinition dependentWorkflowDefinition :
downstreamDefinitions) {
+ long downstreamCode =
dependentWorkflowDefinition.getWorkflowDefinitionCode();
+
+ // Prevent self-dependency and circular dependency chains
+ if (visitedCodes.contains(downstreamCode)) {
+ log.warn("Skip circular dependent workflow {}",
downstreamCode);
+ continue;
+ }
+
+ WorkflowDefinition downstreamWorkflow =
+
workflowDefinitionDao.queryByCode(downstreamCode).orElse(null);
+ if (downstreamWorkflow == null) {
+ log.warn("Skip dependent workflow {}, definition not found",
downstreamCode);
Review Comment:
```suggestion
log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]