SbloodyS commented on code in PR #10376:
URL: https://github.com/apache/dolphinscheduler/pull/10376#discussion_r894384470
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -854,4 +923,42 @@ private List<DependentProcessDefinition>
checkDependentProcessDefinitionValid(Li
return validDependentProcessDefinitionList;
}
+
+ /**
+ *
+ * @param schedule
+ * @return check error return 0 otherwish 1
+ */
+ private int checkScheduleTime(String schedule){
+ Date start = null;
+ Date end = null;
+ Map<String,String> scheduleResult = JSONUtils.toMap(schedule);
+ if(scheduleResult == null){
+ return 0;
+ }
+
if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ if(scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)
== null){
+ return 0;
+ }
+ }
+ if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){
+ String startDate =
scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+ String endDate =
scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+ if (startDate == null || endDate == null) {
+ return 0;
+ }
+ start = DateUtils.getScheduleDate(startDate);
+ end = DateUtils.getScheduleDate(endDate);
+ if(start == null || end == null){
+ return 0;
+ }
+ if (start.after(end)) {
+ logger.info("complement data error, wrong date start:{} and
end date:{} ",
Review Comment:
This should be ```logger.error```
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -686,89 +728,116 @@ private int createCommand(CommandType commandType, long
processDefineCode,
* create complement command
* close left and close right
*
- * @param start
- * @param end
+ * @param scheduleTimeParam
* @param runMode
* @return
*/
- protected int createComplementCommandList(Date start, Date end, RunMode
runMode, Command command,
+ protected int createComplementCommandList(String scheduleTimeParam,
RunMode runMode, Command command,
Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode) {
int createCount = 0;
+ String startDate = null;
+ String endDate = null;
+ String dateList = null;
int dependentProcessDefinitionCreateCount = 0;
-
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
+ Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
+
if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ dateList =
scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+ }
+ if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) &&
scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+ startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+ endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+ }
switch (runMode) {
case RUN_MODE_SERIAL: {
- if (start.after(end)) {
- logger.warn("The startDate {} is later than the endDate
{}", start, end);
- break;
+ if(StringUtils.isNotEmpty(dateList) || dateList != null){
Review Comment:
I think ```StringUtils.isNotEmpty(null)``` would return false. So it's
duplicated.
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -75,13 +71,7 @@
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
Review Comment:
This has not been addressed.
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -686,89 +728,116 @@ private int createCommand(CommandType commandType, long
processDefineCode,
* create complement command
* close left and close right
*
- * @param start
- * @param end
+ * @param scheduleTimeParam
* @param runMode
* @return
*/
- protected int createComplementCommandList(Date start, Date end, RunMode
runMode, Command command,
+ protected int createComplementCommandList(String scheduleTimeParam,
RunMode runMode, Command command,
Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode) {
int createCount = 0;
+ String startDate = null;
+ String endDate = null;
+ String dateList = null;
int dependentProcessDefinitionCreateCount = 0;
-
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
+ Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
+
if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ dateList =
scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+ }
+ if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) &&
scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+ startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+ endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+ }
switch (runMode) {
case RUN_MODE_SERIAL: {
- if (start.after(end)) {
- logger.warn("The startDate {} is later than the endDate
{}", start, end);
- break;
+ if(StringUtils.isNotEmpty(dateList) || dateList != null){
+
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST,dateList);
Review Comment:
This line should be formated.
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -187,11 +183,20 @@ public Map<String, Object> execProcessInstance(User
loginUser, long projectCode,
return result;
}
+ if(!checkScheduleTimeNum(commandType,cronTime)){
+ putMsg(result, Status.SCHEDULE_TIME_NUMBER);
+ return result;
+ }
+
+ if(!checkScheduleTimeRepeat(commandType,cronTime)){
Review Comment:
I think we should delete duplicate data here instead of return error. This
will improve the user experience.
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -686,89 +728,116 @@ private int createCommand(CommandType commandType, long
processDefineCode,
* create complement command
* close left and close right
*
- * @param start
- * @param end
+ * @param scheduleTimeParam
* @param runMode
* @return
*/
- protected int createComplementCommandList(Date start, Date end, RunMode
runMode, Command command,
+ protected int createComplementCommandList(String scheduleTimeParam,
RunMode runMode, Command command,
Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode) {
int createCount = 0;
+ String startDate = null;
+ String endDate = null;
+ String dateList = null;
int dependentProcessDefinitionCreateCount = 0;
-
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
+ Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
+
if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ dateList =
scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+ }
+ if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) &&
scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+ startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+ endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+ }
switch (runMode) {
case RUN_MODE_SERIAL: {
- if (start.after(end)) {
- logger.warn("The startDate {} is later than the endDate
{}", start, end);
- break;
+ if(StringUtils.isNotEmpty(dateList) || dateList != null){
+
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST,dateList);
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ createCount = processService.createCommand(command);
}
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(start));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(end));
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- createCount = processService.createCommand(command);
-
- // dependent process definition
- List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
-
- if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
- logger.info("process code: {} complement dependent in off
mode or schedule's size is 0, skip "
- + "dependent complement data",
command.getProcessDefinitionCode());
- } else {
- dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ if(startDate != null && endDate != null){
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
startDate);
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ createCount = processService.createCommand(command);
+
+ // dependent process definition
+ List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+
+ if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
+ logger.info("process code: {} complement dependent in
off mode or schedule's size is 0, skip "
+ + "dependent complement data",
command.getProcessDefinitionCode());
+ } else {
+ dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ }
}
-
break;
}
case RUN_MODE_PARALLEL: {
- if (start.after(end)) {
- logger.warn("The startDate {} is later than the endDate
{}", start, end);
- break;
- }
-
- List<Date> listDate = new ArrayList<>();
- List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
- listDate.addAll(CronUtils.getSelfFireDateList(start, end,
schedules));
- int listDateSize = listDate.size();
- createCount = listDate.size();
- if (!CollectionUtils.isEmpty(listDate)) {
- if (expectedParallelismNumber != null &&
expectedParallelismNumber != 0) {
- createCount = Math.min(listDate.size(),
expectedParallelismNumber);
- if (listDateSize < createCount) {
- createCount = listDateSize;
+ if(startDate != null && endDate != null){
+ List<Date> listDate = new ArrayList<>();
+ List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+
listDate.addAll(CronUtils.getSelfFireDateList(DateUtils.getScheduleDate(startDate),
DateUtils.getScheduleDate(endDate), schedules));
+ int listDateSize = listDate.size();
+ createCount = listDate.size();
+ if (!CollectionUtils.isEmpty(listDate)) {
+ if (expectedParallelismNumber != null &&
expectedParallelismNumber != 0) {
+ createCount = Math.min(listDate.size(),
expectedParallelismNumber);
+ if (listDateSize < createCount) {
+ createCount = listDateSize;
+ }
+ }
+ logger.info("In parallel mode, current
expectedParallelismNumber:{}", createCount);
+
+ // Distribute the number of tasks equally to each
command.
+ // The last command with insufficient quantity will be
assigned to the remaining tasks.
+ int itemsPerCommand = (listDateSize / createCount);
+ int remainingItems = (listDateSize % createCount);
+ int startDateIndex = 0;
+ int endDateIndex = 0;
+
+ for (int i = 1; i <= createCount; i++) {
+ int extra = (i <= remainingItems) ? 1 : 0;
+ int singleCommandItems = (itemsPerCommand + extra);
+
+ if (i == 1) {
+ endDateIndex += singleCommandItems - 1;
+ } else {
+ startDateIndex = endDateIndex + 1;
+ endDateIndex += singleCommandItems;
+ }
+
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(listDate.get(startDateIndex)));
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(listDate.get(endDateIndex)));
+
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ processService.createCommand(command);
+
+ if (schedules.isEmpty() || complementDependentMode
== ComplementDependentMode.OFF_MODE) {
+ logger.info("process code: {} complement
dependent in off mode or schedule's size is 0, skip "
+ + "dependent complement data",
command.getProcessDefinitionCode());
+ } else {
+ dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ }
}
}
- logger.info("In parallel mode, current
expectedParallelismNumber:{}", createCount);
-
- // Distribute the number of tasks equally to each command.
- // The last command with insufficient quantity will be
assigned to the remaining tasks.
- int itemsPerCommand = (listDateSize / createCount);
- int remainingItems = (listDateSize % createCount);
- int startDateIndex = 0;
- int endDateIndex = 0;
-
- for (int i = 1; i <= createCount; i++) {
- int extra = (i <= remainingItems) ? 1 : 0;
- int singleCommandItems = (itemsPerCommand + extra);
-
- if (i == 1) {
- endDateIndex += singleCommandItems - 1;
- } else {
- startDateIndex = endDateIndex + 1;
- endDateIndex += singleCommandItems;
+ }
+ if(StringUtils.isNotEmpty(dateList) || dateList != null){
Review Comment:
Same as 754 line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]