det101 commented on code in PR #18003:
URL:
https://github.com/apache/dolphinscheduler/pull/18003#discussion_r2993025631
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -157,22 +176,165 @@ private Integer doBackfillWorkflow(final
BackfillWorkflowDTO backfillWorkflowDTO
.dryRun(backfillWorkflowDTO.getDryRun())
.build();
- final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
- .withService(IWorkflowControlClient.class)
- .withHost(masterServer.getHost() + ":" +
masterServer.getPort())
- .backfillTriggerWorkflow(backfillTriggerRequest);
+ final WorkflowBackfillTriggerResponse backfillTriggerResponse =
+ triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
if (!backfillTriggerResponse.isSuccess()) {
throw new ServiceException("Backfill workflow failed: " +
backfillTriggerResponse.getMessage());
}
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams =
backfillWorkflowDTO.getBackfillParams();
if (backfillParams.getBackfillDependentMode() ==
ComplementDependentMode.ALL_DEPENDENT) {
- doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
+ final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new
HashSet<>() : visitedCodes;
+
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
+ doBackfillDependentWorkflow(backfillWorkflowDTO,
backfillDateTimes, effectiveVisitedCodes);
}
return backfillTriggerResponse.getWorkflowInstanceId();
}
+ protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final
WorkflowBackfillTriggerRequest request,
+ final
Server masterServer) {
+ return Clients
+ .withService(IWorkflowControlClient.class)
+ .withHost(masterServer.getHost() + ":" +
masterServer.getPort())
+ .backfillTriggerWorkflow(request);
+ }
+
private void doBackfillDependentWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
- final List<String>
backfillTimeList) {
- // todo:
+ final List<ZonedDateTime>
backfillDateTimes,
+ final Set<Long> visitedCodes) {
+ // 1) Query downstream dependent workflows for the current workflow
+ final WorkflowDefinition upstreamWorkflow =
backfillWorkflowDTO.getWorkflowDefinition();
+ final long upstreamWorkflowCode = upstreamWorkflow.getCode();
+
+ List<DependentWorkflowDefinition> downstreamDefinitions =
+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
+
+ if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
+ log.info("No downstream dependent workflows found for workflow
code {}", upstreamWorkflowCode);
+ return;
+ }
+ // downstreamDefinitions may contain multiple entries for the same
downstream workflow code
+ // (different dependent task lineage). We should only traverse each
downstream workflow once
+ // (visitedCodes check), but trigger all dependent nodes within that
downstream workflow by
+ // aggregating distinct taskDefinitionCodes into startNodes.
+ final Map<Long, List<DependentWorkflowDefinition>>
downstreamDefinitionsByCode =
+ downstreamDefinitions.stream()
+
.collect(Collectors.groupingBy(DependentWorkflowDefinition::getWorkflowDefinitionCode));
+ final Set<Long> downstreamCodes = downstreamDefinitionsByCode.keySet();
+ final List<WorkflowDefinition> downstreamWorkflowList =
workflowDefinitionDao.queryByCodes(downstreamCodes);
+ // Each workflow code maps to a single WorkflowDefinition (code is
unique in t_ds_workflow_definition).
+ // We still group by code to simplify lookup and keep the code robust
if this ever changes.
+ final Map<Long, List<WorkflowDefinition>> downstreamWorkflowMapByCode
= downstreamWorkflowList.stream()
+ .collect(Collectors.groupingBy(WorkflowDefinition::getCode));
+
+ // 2) Reuse upstream business dates for downstream backfill (same
instants/zones as the chunk passed to
+ // doBackfillWorkflow; avoids List<String> -> system-default parse ->
dateToString drift)
+ final List<ZonedDateTime> upstreamBackfillDates = new
ArrayList<>(backfillDateTimes);
+
+ // 3) Iterate downstream workflows and build/trigger corresponding
BackfillWorkflowDTO
+ for (Map.Entry<Long, List<DependentWorkflowDefinition>> entry :
downstreamDefinitionsByCode.entrySet()) {
+ long downstreamCode = entry.getKey();
+ List<DependentWorkflowDefinition> dependentDefinitions =
entry.getValue();
+
+ // Prevent self-dependency and circular dependency chains.
+ // We only traverse each downstream workflow once.
+ if (visitedCodes.contains(downstreamCode)) {
+ log.warn("Skip already visited dependent workflow {}",
downstreamCode);
+ continue;
+ }
+
+ DependentWorkflowDefinition representativeDependent =
dependentDefinitions.get(0);
+
+ // Aggregate dependent nodes within the same downstream workflow.
+ // If any entry represents workflow-level dependency
(taskDefinitionCode==0),
+ // we should backfill the whole downstream workflow
(startNodes=null).
+ final boolean isWorkflowLevelDependency =
+ dependentDefinitions.stream().anyMatch(d ->
d.getTaskDefinitionCode() == 0);
+ final List<Long> aggregatedStartNodes;
+ if (isWorkflowLevelDependency) {
+ aggregatedStartNodes = null;
+ } else {
+ aggregatedStartNodes = dependentDefinitions.stream()
+
.map(DependentWorkflowDefinition::getTaskDefinitionCode)
+ .filter(code -> code != 0)
+ .distinct()
+ .sorted()
+ .collect(Collectors.toList());
+ }
+
+ WorkflowDefinition downstreamWorkflow = null;
+ List<WorkflowDefinition> workflowCandidates =
downstreamWorkflowMapByCode.get(downstreamCode);
+ if (workflowCandidates != null) {
+ downstreamWorkflow =
+ workflowCandidates.stream()
+ .filter(workflow -> workflow.getVersion() ==
representativeDependent
+ .getWorkflowDefinitionVersion())
+ .findFirst()
+ .orElse(null);
+ }
+ if (downstreamWorkflow == null) {
+ log.warn("Skip dependent workflow {}, workflow definition not
found", downstreamCode);
+ continue;
+ }
+
+ if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
+ log.warn("Skip dependent workflow {}, release state is not
ONLINE", downstreamCode);
+ continue;
+ }
+
+ // Currently, reuse the same business date list as upstream for
downstream backfill;
+ // later we can refine the dates based on dependency cycle
configuration in dependentWorkflowDefinition
+ // (taskParams).
+ BackfillWorkflowDTO.BackfillParamsDTO originalParams =
backfillWorkflowDTO.getBackfillParams();
+ boolean allLevelDependent = originalParams.isAllLevelDependent();
+ ComplementDependentMode downstreamDependentMode =
+ allLevelDependent ?
originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
+
+ BackfillWorkflowDTO.BackfillParamsDTO dependentParams =
BackfillWorkflowDTO.BackfillParamsDTO.builder()
+ // When the upstream is PARALLEL, dependent triggers
should not re-apply
+ // chunking on the already sliced date list; force SERIAL
to keep
+ // "traverse dependencies once per upstream date-chunk".
+ .runMode(originalParams.getRunMode() ==
RunMode.RUN_MODE_PARALLEL ? RunMode.RUN_MODE_SERIAL
+ : originalParams.getRunMode())
+ .backfillDateList(upstreamBackfillDates)
+
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
+ // Control whether downstream will continue triggering its
own dependencies based on
+ // allLevelDependent flag
+ .backfillDependentMode(downstreamDependentMode)
+ .allLevelDependent(allLevelDependent)
+ .executionOrder(originalParams.getExecutionOrder())
+ .build();
+
+ BackfillWorkflowDTO dependentBackfillDTO =
BackfillWorkflowDTO.builder()
+ .loginUser(backfillWorkflowDTO.getLoginUser())
+ .workflowDefinition(downstreamWorkflow)
+ // If taskDefinitionCode is 0, it means the dependency is
on the entire workflow.
+ // Otherwise, backfill should start from that dependent
node.
+ .startNodes(aggregatedStartNodes)
+ .failureStrategy(backfillWorkflowDTO.getFailureStrategy())
+ .taskDependType(backfillWorkflowDTO.getTaskDependType())
+ .execType(backfillWorkflowDTO.getExecType())
+ .warningType(backfillWorkflowDTO.getWarningType())
+ .warningGroupId(downstreamWorkflow.getWarningGroupId())
+ .runMode(dependentParams.getRunMode())
+
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
+ // Align workerGroup with DependentWorkflowDefinition
(fallback to upstream when it's null).
+ .workerGroup(representativeDependent.getWorkerGroup() !=
null
+ ? representativeDependent.getWorkerGroup()
+ : backfillWorkflowDTO.getWorkerGroup())
+ .tenantCode(backfillWorkflowDTO.getTenantCode())
+ .environmentCode(backfillWorkflowDTO.getEnvironmentCode())
+ .startParamList(backfillWorkflowDTO.getStartParamList())
+ .dryRun(backfillWorkflowDTO.getDryRun())
+ .backfillParams(dependentParams)
+ .build();
+
+ log.info("Trigger dependent workflow {} for upstream workflow {}
with backfill dates {}",
+ downstreamCode, upstreamWorkflowCode,
+
backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+
+ // 4) Mark as visiting before recursive trigger to detect cycles,
then trigger downstream backfill
+ visitedCodes.add(downstreamCode);
+ executeWithVisitedCodes(dependentBackfillDTO, visitedCodes);
+ }
Review Comment:
There are real-world scenarios where the actual number of dependency levels
(traversal depth) falls short of what is required, so this part needs
optimization.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]