This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0-prepare by this push:
new cbc457b [Improvement-6474] [MasterServer] schedule time for process
instance optimization (#6477) (#6496)
cbc457b is described below
commit cbc457b1a79866566eb145a8e7c61051b4f06b07
Author: Kirs <[email protected]>
AuthorDate: Mon Oct 11 16:49:53 2021 +0800
[Improvement-6474] [MasterServer] schedule time for process instance
optimization (#6477) (#6496)
* [DS-6474][MasterServer] change to handle schedule time for process
instance in WorkflowExecuteThread
* delete all the valid tasks when complement data if id is not null
* checkstyle
Co-authored-by: caishunfeng <[email protected]>
Co-authored-by: wind <[email protected]>
Co-authored-by: caishunfeng <[email protected]>
---
.../server/master/runner/WorkflowExecuteThread.java | 9 +++++++++
.../service/process/ProcessService.java | 20 +++++++++-----------
2 files changed, 18 insertions(+), 11 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 43fcbd7..42effe7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -573,6 +573,15 @@ public class WorkflowExecuteThread implements Runnable {
complementListDate = CronUtils.getSelfFireDateList(start,
end, schedules);
logger.info(" process definition code:{} complement data:
{}",
processInstance.getProcessDefinitionCode(),
complementListDate.toString());
+
+ if (complementListDate.size() > 0 && Flag.NO ==
processInstance.getIsSubProcess()) {
+
processInstance.setScheduleTime(complementListDate.get(0));
+
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.COMPLEMENT_DATA,
processInstance.getScheduleTime()));
+ processService.updateProcessInstance(processInstance);
+ }
}
}
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 50aef45..3c3b41d 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -125,7 +125,6 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.facebook.presto.jdbc.internal.guava.collect.Lists;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -630,10 +629,8 @@ public class ProcessService {
processInstance.setWarningGroupId(warningGroupId);
processInstance.setDryRun(command.getDryRun());
- // schedule time
- Date scheduleTime = getScheduleTime(command, cmdParam);
- if (scheduleTime != null) {
- processInstance.setScheduleTime(scheduleTime);
+ if (command.getScheduleTime() != null) {
+ processInstance.setScheduleTime(command.getScheduleTime());
}
processInstance.setCommandStartTime(command.getStartTime());
processInstance.setLocations(processDefinition.getLocations());
@@ -878,13 +875,14 @@ public class ProcessService {
runStatus = processInstance.getState();
break;
case COMPLEMENT_DATA:
- // delete all the valid tasks when complement data
- List<TaskInstance> taskInstanceList =
this.findValidTaskListByProcessId(processInstance.getId());
- for (TaskInstance taskInstance : taskInstanceList) {
- taskInstance.setFlag(Flag.NO);
- this.updateTaskInstance(taskInstance);
+ // delete all the valid tasks when complement data if id is
not null
+ if (processInstance.getId() != 0) {
+ List<TaskInstance> taskInstanceList =
this.findValidTaskListByProcessId(processInstance.getId());
+ for (TaskInstance taskInstance : taskInstanceList) {
+ taskInstance.setFlag(Flag.NO);
+ this.updateTaskInstance(taskInstance);
+ }
}
- initComplementDataParam(processDefinition, processInstance,
cmdParam);
break;
case REPEAT_RUNNING:
// delete the recover task names from command parameter