This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new c013b49 [Fix-6347][Master-API]fix bug #6347 the first schedule_time
is error in complement data (#6389)
c013b49 is described below
commit c013b49e728d942bf22574f1b59d4c5a9e69a9f3
Author: OS <[email protected]>
AuthorDate: Tue Sep 28 17:00:46 2021 +0800
[Fix-6347][Master-API]fix bug #6347 the first schedule_time is error in
complement data (#6389)
* fix bug #6347 complement data errors
---
.../api/service/impl/ExecutorServiceImpl.java | 114 ++++++++++++---------
.../apache/dolphinscheduler/common/Constants.java | 10 ++
.../master/runner/WorkflowExecuteThread.java | 25 +++--
.../service/process/ProcessService.java | 34 ++----
.../service/quartz/cron/CronUtils.java | 25 +++--
.../service/quartz/cron/CronUtilsTest.java | 11 +-
6 files changed, 124 insertions(+), 95 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index eb0f39e..a213573 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -551,66 +551,78 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
if (interval.length == 2) {
start = DateUtils.getScheduleDate(interval[0]);
end = DateUtils.getScheduleDate(interval[1]);
+ if (start.after(end)) {
+ logger.info("complement data error, wrong date start:{}
and end date:{} ",
+ start, end
+ );
+ return 0;
+ }
}
}
// determine whether to complement
if (commandType == CommandType.COMPLEMENT_DATA) {
- runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
- if (null != start && null != end && !start.after(end)) {
- if (runMode == RunMode.RUN_MODE_SERIAL) {
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(start));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(end));
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- return processService.createCommand(command);
- } else if (runMode == RunMode.RUN_MODE_PARALLEL) {
- List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineCode);
- LinkedList<Date> listDate = new LinkedList<>();
- if (!CollectionUtils.isEmpty(schedules)) {
- for (Schedule item : schedules) {
-
listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
- }
- }
- if (!CollectionUtils.isEmpty(listDate)) {
- int effectThreadsCount = expectedParallelismNumber ==
null ? listDate.size() : Math.min(listDate.size(), expectedParallelismNumber);
- logger.info("In parallel mode, current
expectedParallelismNumber:{}", effectThreadsCount);
-
- int chunkSize = listDate.size() / effectThreadsCount;
- listDate.addFirst(start);
- listDate.addLast(end);
-
- for (int i = 0; i < effectThreadsCount; i++) {
- int rangeStart = i == 0 ? i : (i * chunkSize);
- int rangeEnd = i == effectThreadsCount - 1 ?
listDate.size() - 1
- : rangeStart + chunkSize + 1;
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(listDate.get(rangeStart)));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(listDate.get(rangeEnd)));
-
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processService.createCommand(command);
- }
-
- return effectThreadsCount;
- } else {
- // loop by day
- int runCunt = 0;
- while (!start.after(end)) {
- runCunt += 1;
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(start));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(start));
-
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processService.createCommand(command);
- start = DateUtils.getSomeDay(start, 1);
- }
- return runCunt;
- }
- }
- } else {
- logger.error("there is not valid schedule date for the process
definition code:{}", processDefineCode);
+ if (start == null || end == null) {
+ return 0;
}
+ return createComplementCommandList(start, end, runMode, command,
expectedParallelismNumber);
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
}
+ }
- return 0;
+ /**
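+ * Creates the complement-data command(s) for the given date range.
+ * The date range is left-closed and right-open.
+ *
+ * @param start start date of the complement range
+ * @param end end date of the complement range
+ * @param runMode serial or parallel run mode; null is treated as serial
+ * @return the complement command create count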
+ */
+ private int createComplementCommandList(Date start, Date end, RunMode
runMode, Command command, Integer expectedParallelismNumber) {
+ int createCount = 0;
+ runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
+ Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
+ switch (runMode) {
+ case RUN_MODE_SERIAL: {
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(start));
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(end));
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ createCount = processService.createCommand(command);
+ break;
+ }
+ case RUN_MODE_PARALLEL: {
+ LinkedList<Date> listDate = new LinkedList<>();
+ List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+ listDate.addAll(CronUtils.getSelfFireDateList(start, end,
schedules));
+ createCount = listDate.size();
+ if (!CollectionUtils.isEmpty(listDate)) {
+ if (expectedParallelismNumber != null &&
expectedParallelismNumber != 0) {
+ createCount = Math.min(listDate.size(),
expectedParallelismNumber);
+ }
+ logger.info("In parallel mode, current
expectedParallelismNumber:{}", createCount);
+ int chunkSize = listDate.size() / createCount;
+
+ for (int i = 0; i < createCount; i++) {
+ int rangeStart = i == 0 ? i : (i * chunkSize);
+ int rangeEnd = i == createCount - 1 ? listDate.size()
- 1
+ : rangeStart + chunkSize;
+ if (rangeEnd == listDate.size()) {
+ rangeEnd = listDate.size() - 1;
+ }
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(listDate.get(rangeStart)));
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(listDate.get(rangeEnd)));
+
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ processService.createCommand(command);
+ }
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ logger.info("create complement command count: {}", createCount);
+ return createCount;
}
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 4f9aca5..417ce59 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -449,6 +449,11 @@ public final class Constants {
*/
public static final String CMDPARAM_COMPLEMENT_DATA_END_DATE =
"complementEndDate";
+ /**
+ * complement date default cron string
+ */
+ public static final String DEFAULT_CRON_STRING = "0 0 0 * * ? *";
+
/**
* data source config
@@ -504,6 +509,11 @@ public final class Constants {
public static final int SLEEP_TIME_MILLIS = 1000;
/**
+ * number of milliseconds in one second
+ */
+ public static final int SECOND_TIME_MILLIS = 1000;
+
+ /**
* master task instance cache-database refresh interval
*/
public static final int CACHE_REFRESH_TIME_MILLIS = 20 * 1000;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 2e151cb..9de1a0b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -436,11 +436,15 @@ public class WorkflowExecuteThread implements Runnable {
scheduleDate = complementListDate.get(0);
} else if (processInstance.getState().typeIsFinished()) {
endProcess();
+ if (complementListDate.size() <= 0) {
+ logger.info("process complement end. process id:{}",
processInstance.getId());
+ return true;
+ }
int index = complementListDate.indexOf(scheduleDate);
if (index >= complementListDate.size() - 1 ||
!processInstance.getState().typeIsSuccess()) {
logger.info("process complement end. process id:{}",
processInstance.getId());
// complement data ends || no success
- return false;
+ return true;
}
logger.info("process complement continue. process id:{}, schedule
time:{} complementListDate:{}",
processInstance.getId(),
@@ -559,14 +563,19 @@ public class WorkflowExecuteThread implements Runnable {
}
}
- if (complementListDate.size() == 0 && needComplementProcess()) {
- complementListDate = processService.getComplementDateList(
- JSONUtils.toMap(processInstance.getCommandParam()),
- processInstance.getProcessDefinitionCode());
- logger.info(" process definition code:{} complement data: {}",
- processInstance.getProcessDefinitionCode(),
complementListDate.toString());
+ if (processInstance.isComplementData() && complementListDate.size() ==
0) {
+ Map<String, String> cmdParam =
JSONUtils.toMap(processInstance.getCommandParam());
+ if (cmdParam != null &&
cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+ Date start =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+ Date end =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+ List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+ if (complementListDate.size() == 0 && needComplementProcess())
{
+ complementListDate = CronUtils.getSelfFireDateList(start,
end, schedules);
+ logger.info(" process definition code:{} complement data:
{}",
+ processInstance.getProcessDefinitionCode(),
complementListDate.toString());
+ }
+ }
}
-
}
/**
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c1e392b..c4fce3e 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -587,7 +587,12 @@ public class ProcessService {
if (scheduleTime == null
&& cmdParam != null
&& cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
- List<Date> complementDateList = getComplementDateList(cmdParam,
command.getProcessDefinitionCode());
+
+ Date start =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+ Date end =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+ List<Schedule> schedules =
queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+ List<Date> complementDateList =
CronUtils.getSelfFireDateList(start, end, schedules);
+
if (complementDateList.size() > 0) {
scheduleTime = complementDateList.get(0);
} else {
@@ -972,7 +977,10 @@ public class ProcessService {
return;
}
- List<Date> complementDate = getComplementDateList(cmdParam,
processInstance.getProcessDefinitionCode());
+ Date start =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+ Date end =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+ List<Schedule> listSchedules =
queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+ List<Date> complementDate = CronUtils.getSelfFireDateList(start, end,
listSchedules);
if (complementDate.size() > 0
&& Flag.NO == processInstance.getIsSubProcess()) {
@@ -982,28 +990,6 @@ public class ProcessService {
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
-
- }
-
- /**
- * return complement date list
- *
- * @param cmdParam
- * @param processDefinitionCode
- * @return
- */
- public List<Date> getComplementDateList(Map<String, String> cmdParam, Long
processDefinitionCode) {
- List<Date> result = new ArrayList<>();
- Date startDate =
DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
- Date endDate =
DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
- if (startDate.after(endDate)) {
- Date tmp = startDate;
- startDate = endDate;
- endDate = tmp;
- }
- List<Schedule> schedules =
queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
- result.addAll(CronUtils.getSelfFireDateList(startDate, endDate,
schedules));
- return result;
}
/**
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
index 418d063..ab9a97b 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
@@ -25,6 +25,7 @@ import static
org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.week;
import static com.cronutils.model.CronType.QUARTZ;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -182,18 +183,20 @@ public class CronUtils {
* Gets all scheduled fire times within a period of time, based on self dependency.
* If the schedule list is empty, a default schedule of once per day is used.
*/
- public static List<Date> getSelfFireDateList(Date startTime, Date endTime,
List<Schedule> schedules) {
+ public static List<Date> getSelfFireDateList(final Date startTime, final
Date endTime, final List<Schedule> schedules) {
List<Date> result = new ArrayList<>();
- if (!CollectionUtils.isEmpty(schedules)) {
- for (Schedule schedule : schedules) {
- result.addAll(CronUtils.getSelfFireDateList(startTime,
endTime, schedule.getCrontab()));
- }
- } else {
- Date start = startTime;
- for (int i = 0; start.before(endTime); i++) {
- start = DateUtils.getSomeDay(startTime, i);
- result.add(start);
- }
+ Date from = new Date(startTime.getTime() -
Constants.SECOND_TIME_MILLIS);
+ Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS);
+
+ List<Schedule> listSchedule = new ArrayList<>();
+ listSchedule.addAll(schedules);
+ if (CollectionUtils.isEmpty(listSchedule)) {
+ Schedule schedule = new Schedule();
+ schedule.setCrontab(Constants.DEFAULT_CRON_STRING);
+ listSchedule.add(schedule);
+ }
+ for (Schedule schedule : listSchedule) {
+ result.addAll(CronUtils.getSelfFireDateList(from, to,
schedule.getCrontab()));
}
return result;
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java
index b4f864c..55cc19d 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java
@@ -167,7 +167,7 @@ public class CronUtilsTest {
}
@Test
- public void getSelfFireDateList() throws ParseException{
+ public void getSelfFireDateList() throws ParseException {
Date from = DateUtils.stringToDate("2020-01-01 00:00:00");
Date to = DateUtils.stringToDate("2020-01-31 00:00:00");
// test date
@@ -179,6 +179,15 @@ public class CronUtilsTest {
// test other
Assert.assertEquals(30, CronUtils.getFireDateList(from, to,
CronUtils.parse2CronExpression("0 0 0 * * ? ")).size());
Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to,
CronUtils.parse2CronExpression("0 0 0 * * ? "), 5).size());
+ from = DateUtils.stringToDate("2020-01-01 00:02:00");
+ to = DateUtils.stringToDate("2020-01-01 00:02:00");
+ Assert.assertEquals(1, CronUtils.getFireDateList(new
Date(from.getTime() - 1000), to, CronUtils.parse2CronExpression("0 * * * * ?
")).size());
+
+ from = DateUtils.stringToDate("2020-01-01 00:02:00");
+ to = DateUtils.stringToDate("2020-01-01 00:04:00");
+ Assert.assertEquals(2, CronUtils.getFireDateList(new
Date(from.getTime() - 1000),
+ new Date(to.getTime() - 1000),
+ CronUtils.parse2CronExpression("0 * * * * ? ")).size());
}
@Test