caishunfeng commented on code in PR #10376:
URL: https://github.com/apache/dolphinscheduler/pull/10376#discussion_r894647361
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -686,89 +705,117 @@ private int createCommand(CommandType commandType, long
processDefineCode,
* create complement command
* close left and close right
*
- * @param start
- * @param end
+ * @param scheduleTimeParam
* @param runMode
* @return
*/
- protected int createComplementCommandList(Date start, Date end, RunMode
runMode, Command command,
+ protected int createComplementCommandList(String scheduleTimeParam,
RunMode runMode, Command command,
Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode) {
int createCount = 0;
+ String startDate = null;
+ String endDate = null;
+ String dateList = null;
int dependentProcessDefinitionCreateCount = 0;
-
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
+ Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
+
if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ dateList =
scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+ dateList = removeDuplicates(dateList);
+ }
+ if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) &&
scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+ startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+ endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+ }
switch (runMode) {
case RUN_MODE_SERIAL: {
- if (start.after(end)) {
- logger.warn("The startDate {} is later than the endDate
{}", start, end);
- break;
+ if(StringUtils.isNotEmpty(dateList)){
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST,
dateList);
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ createCount = processService.createCommand(command);
}
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(start));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(end));
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- createCount = processService.createCommand(command);
-
- // dependent process definition
- List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
-
- if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
- logger.info("process code: {} complement dependent in off
mode or schedule's size is 0, skip "
- + "dependent complement data",
command.getProcessDefinitionCode());
- } else {
- dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ if(startDate != null && endDate != null){
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
startDate);
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ createCount = processService.createCommand(command);
+
+ // dependent process definition
+ List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+
+ if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
+ logger.info("process code: {} complement dependent in
off mode or schedule's size is 0, skip "
+ + "dependent complement data",
command.getProcessDefinitionCode());
+ } else {
+ dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ }
}
-
break;
}
case RUN_MODE_PARALLEL: {
- if (start.after(end)) {
- logger.warn("The startDate {} is later than the endDate
{}", start, end);
- break;
- }
-
- List<Date> listDate = new ArrayList<>();
- List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
- listDate.addAll(CronUtils.getSelfFireDateList(start, end,
schedules));
- int listDateSize = listDate.size();
- createCount = listDate.size();
- if (!CollectionUtils.isEmpty(listDate)) {
- if (expectedParallelismNumber != null &&
expectedParallelismNumber != 0) {
- createCount = Math.min(listDate.size(),
expectedParallelismNumber);
- if (listDateSize < createCount) {
- createCount = listDateSize;
+ if(startDate != null && endDate != null){
Review Comment:
It's better to separate these out into a method like:
createComplementCommandByCron and createComplementCommandByDateList, just a
code improvement suggestion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]