github-advanced-security[bot] commented on code in PR #16523:
URL:
https://github.com/apache/dolphinscheduler/pull/16523#discussion_r1732311026
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -93,47 +96,60 @@
}
log.info("In parallel mode, current expectedParallelismNumber:{}",
expectedParallelismNumber);
+ final List<Integer> workflowInstanceIdList = Lists.newArrayList();
for (List<ZonedDateTime> stringDate : Lists.partition(listDate,
expectedParallelismNumber)) {
- final BackfillWorkflowCommandParam backfillWorkflowCommandParam =
BackfillWorkflowCommandParam.builder()
- .commandParams(backfillWorkflowDTO.getStartParamList())
- .startNodes(backfillWorkflowDTO.getStartNodes())
-
.backfillTimeList(stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()))
- .timeZone(DateUtils.getTimezone())
- .build();
- doCreateCommand(backfillWorkflowDTO, backfillWorkflowCommandParam);
+ final Integer workflowInstanceId = doBackfillWorkflow(
+ backfillWorkflowDTO,
+
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+ workflowInstanceIdList.add(workflowInstanceId);
}
+ return workflowInstanceIdList;
}
- private void doCreateCommand(final BackfillWorkflowDTO backfillWorkflowDTO,
- final BackfillWorkflowCommandParam
backfillWorkflowCommandParam) {
- List<String> backfillTimeList =
backfillWorkflowCommandParam.getBackfillTimeList();
- final Command command = Command.builder()
- .commandType(backfillWorkflowDTO.getExecType())
-
.processDefinitionCode(backfillWorkflowDTO.getWorkflowDefinition().getCode())
-
.processDefinitionVersion(backfillWorkflowDTO.getWorkflowDefinition().getVersion())
- .executorId(backfillWorkflowDTO.getLoginUser().getId())
- .scheduleTime(DateUtils.stringToDate(backfillTimeList.get(0)))
-
.commandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam))
- .taskDependType(backfillWorkflowDTO.getTaskDependType())
+ private Integer doBackfillWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
+ final List<String> backfillTimeList) {
+ final Server masterServer =
registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);
+ if (masterServer == null) {
+ throw new ServiceException("no master server available");
+ }
+
+ final ProcessDefinition workflowDefinition =
backfillWorkflowDTO.getWorkflowDefinition();
+ final WorkflowBackfillTriggerRequest backfillTriggerRequest =
WorkflowBackfillTriggerRequest.builder()
+ .userId(backfillWorkflowDTO.getLoginUser().getId())
+ .backfillTimeList(backfillTimeList)
+ .workflowCode(workflowDefinition.getCode())
+ .workflowVersion(workflowDefinition.getVersion())
+ .startNodes(backfillWorkflowDTO.getStartNodes())
.failureStrategy(backfillWorkflowDTO.getFailureStrategy())
+ .taskDependType(backfillWorkflowDTO.getTaskDependType())
+ .execType(backfillWorkflowDTO.getExecType())
.warningType(backfillWorkflowDTO.getWarningType())
.warningGroupId(backfillWorkflowDTO.getWarningGroupId())
- .startTime(new Date())
-
.processInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
- .updateTime(new Date())
+
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
.workerGroup(backfillWorkflowDTO.getWorkerGroup())
.tenantCode(backfillWorkflowDTO.getTenantCode())
- .dryRun(backfillWorkflowDTO.getDryRun().getCode())
- .testFlag(backfillWorkflowDTO.getTestFlag().getCode())
+ .environmentCode(backfillWorkflowDTO.getEnvironmentCode())
+ .startParamList(backfillWorkflowDTO.getStartParamList())
+ .dryRun(backfillWorkflowDTO.getDryRun())
+ .testFlag(backfillWorkflowDTO.getTestFlag())
.build();
- commandDao.insert(command);
+
+ final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
+ .withService(IWorkflowInstanceController.class)
+ .withHost(masterServer.getHost() + ":" +
masterServer.getPort())
+ .backfillTriggerWorkflow(backfillTriggerRequest);
+ if (!backfillTriggerResponse.isSuccess()) {
+ throw new ServiceException("Backfill workflow failed: " +
backfillTriggerResponse.getMessage());
+ }
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams =
backfillWorkflowDTO.getBackfillParams();
if (backfillParams.getBackfillDependentMode() ==
ComplementDependentMode.ALL_DEPENDENT) {
- doBackfillDependentWorkflow(backfillWorkflowCommandParam, command);
+ doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
}
+ return backfillTriggerResponse.getWorkflowInstanceId();
}
- private void doBackfillDependentWorkflow(final
BackfillWorkflowCommandParam backfillWorkflowCommandParam,
- final Command backfillCommand) {
+ private void doBackfillDependentWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
Review Comment:
## Useless parameter
The parameter 'backfillWorkflowDTO' is never used.
[Show more
details](https://github.com/apache/dolphinscheduler/security/code-scanning/4568)
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:
##########
@@ -93,47 +96,60 @@
}
log.info("In parallel mode, current expectedParallelismNumber:{}",
expectedParallelismNumber);
+ final List<Integer> workflowInstanceIdList = Lists.newArrayList();
for (List<ZonedDateTime> stringDate : Lists.partition(listDate,
expectedParallelismNumber)) {
- final BackfillWorkflowCommandParam backfillWorkflowCommandParam =
BackfillWorkflowCommandParam.builder()
- .commandParams(backfillWorkflowDTO.getStartParamList())
- .startNodes(backfillWorkflowDTO.getStartNodes())
-
.backfillTimeList(stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()))
- .timeZone(DateUtils.getTimezone())
- .build();
- doCreateCommand(backfillWorkflowDTO, backfillWorkflowCommandParam);
+ final Integer workflowInstanceId = doBackfillWorkflow(
+ backfillWorkflowDTO,
+
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+ workflowInstanceIdList.add(workflowInstanceId);
}
+ return workflowInstanceIdList;
}
- private void doCreateCommand(final BackfillWorkflowDTO backfillWorkflowDTO,
- final BackfillWorkflowCommandParam
backfillWorkflowCommandParam) {
- List<String> backfillTimeList =
backfillWorkflowCommandParam.getBackfillTimeList();
- final Command command = Command.builder()
- .commandType(backfillWorkflowDTO.getExecType())
-
.processDefinitionCode(backfillWorkflowDTO.getWorkflowDefinition().getCode())
-
.processDefinitionVersion(backfillWorkflowDTO.getWorkflowDefinition().getVersion())
- .executorId(backfillWorkflowDTO.getLoginUser().getId())
- .scheduleTime(DateUtils.stringToDate(backfillTimeList.get(0)))
-
.commandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam))
- .taskDependType(backfillWorkflowDTO.getTaskDependType())
+ private Integer doBackfillWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
+ final List<String> backfillTimeList) {
+ final Server masterServer =
registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);
+ if (masterServer == null) {
+ throw new ServiceException("no master server available");
+ }
+
+ final ProcessDefinition workflowDefinition =
backfillWorkflowDTO.getWorkflowDefinition();
+ final WorkflowBackfillTriggerRequest backfillTriggerRequest =
WorkflowBackfillTriggerRequest.builder()
+ .userId(backfillWorkflowDTO.getLoginUser().getId())
+ .backfillTimeList(backfillTimeList)
+ .workflowCode(workflowDefinition.getCode())
+ .workflowVersion(workflowDefinition.getVersion())
+ .startNodes(backfillWorkflowDTO.getStartNodes())
.failureStrategy(backfillWorkflowDTO.getFailureStrategy())
+ .taskDependType(backfillWorkflowDTO.getTaskDependType())
+ .execType(backfillWorkflowDTO.getExecType())
.warningType(backfillWorkflowDTO.getWarningType())
.warningGroupId(backfillWorkflowDTO.getWarningGroupId())
- .startTime(new Date())
-
.processInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
- .updateTime(new Date())
+
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
.workerGroup(backfillWorkflowDTO.getWorkerGroup())
.tenantCode(backfillWorkflowDTO.getTenantCode())
- .dryRun(backfillWorkflowDTO.getDryRun().getCode())
- .testFlag(backfillWorkflowDTO.getTestFlag().getCode())
+ .environmentCode(backfillWorkflowDTO.getEnvironmentCode())
+ .startParamList(backfillWorkflowDTO.getStartParamList())
+ .dryRun(backfillWorkflowDTO.getDryRun())
+ .testFlag(backfillWorkflowDTO.getTestFlag())
.build();
- commandDao.insert(command);
+
+ final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
+ .withService(IWorkflowInstanceController.class)
+ .withHost(masterServer.getHost() + ":" +
masterServer.getPort())
+ .backfillTriggerWorkflow(backfillTriggerRequest);
+ if (!backfillTriggerResponse.isSuccess()) {
+ throw new ServiceException("Backfill workflow failed: " +
backfillTriggerResponse.getMessage());
+ }
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams =
backfillWorkflowDTO.getBackfillParams();
if (backfillParams.getBackfillDependentMode() ==
ComplementDependentMode.ALL_DEPENDENT) {
- doBackfillDependentWorkflow(backfillWorkflowCommandParam, command);
+ doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
}
+ return backfillTriggerResponse.getWorkflowInstanceId();
}
- private void doBackfillDependentWorkflow(final
BackfillWorkflowCommandParam backfillWorkflowCommandParam,
- final Command backfillCommand) {
+ private void doBackfillDependentWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
+ final List<String>
backfillTimeList) {
Review Comment:
## Useless parameter
The parameter 'backfillTimeList' is never used.
[Show more
details](https://github.com/apache/dolphinscheduler/security/code-scanning/4569)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]