This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0-prepare by this push:
     new cbc457b  [Improvement-6474] [MasterServer] schedule time for process 
instance optimization  (#6477) (#6496)
cbc457b is described below

commit cbc457b1a79866566eb145a8e7c61051b4f06b07
Author: Kirs <[email protected]>
AuthorDate: Mon Oct 11 16:49:53 2021 +0800

    [Improvement-6474] [MasterServer] schedule time for process instance 
optimization  (#6477) (#6496)
    
    * [DS-6474][MasterServer] change to handle schedule time for process 
instance in WorkflowExecuteThread
    
    * delete all the valid tasks when complement data if id is not null
    
    * checkstyle
    
    Co-authored-by: caishunfeng <[email protected]>
    
    Co-authored-by: wind <[email protected]>
    Co-authored-by: caishunfeng <[email protected]>
---
 .../server/master/runner/WorkflowExecuteThread.java  |  9 +++++++++
 .../service/process/ProcessService.java              | 20 +++++++++-----------
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 43fcbd7..42effe7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -573,6 +573,15 @@ public class WorkflowExecuteThread implements Runnable {
                     complementListDate = CronUtils.getSelfFireDateList(start, 
end, schedules);
                     logger.info(" process definition code:{} complement data: 
{}",
                             processInstance.getProcessDefinitionCode(), 
complementListDate.toString());
+
+                    if (complementListDate.size() > 0 && Flag.NO == 
processInstance.getIsSubProcess()) {
+                        
processInstance.setScheduleTime(complementListDate.get(0));
+                        
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
+                                processDefinition.getGlobalParamMap(),
+                                processDefinition.getGlobalParamList(),
+                                CommandType.COMPLEMENT_DATA, 
processInstance.getScheduleTime()));
+                        processService.updateProcessInstance(processInstance);
+                    }
                 }
             }
         }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 50aef45..3c3b41d 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -125,7 +125,6 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.facebook.presto.jdbc.internal.guava.collect.Lists;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -630,10 +629,8 @@ public class ProcessService {
         processInstance.setWarningGroupId(warningGroupId);
         processInstance.setDryRun(command.getDryRun());
 
-        // schedule time
-        Date scheduleTime = getScheduleTime(command, cmdParam);
-        if (scheduleTime != null) {
-            processInstance.setScheduleTime(scheduleTime);
+        if (command.getScheduleTime() != null) {
+            processInstance.setScheduleTime(command.getScheduleTime());
         }
         processInstance.setCommandStartTime(command.getStartTime());
         processInstance.setLocations(processDefinition.getLocations());
@@ -878,13 +875,14 @@ public class ProcessService {
                 runStatus = processInstance.getState();
                 break;
             case COMPLEMENT_DATA:
-                // delete all the valid tasks when complement data
-                List<TaskInstance> taskInstanceList = 
this.findValidTaskListByProcessId(processInstance.getId());
-                for (TaskInstance taskInstance : taskInstanceList) {
-                    taskInstance.setFlag(Flag.NO);
-                    this.updateTaskInstance(taskInstance);
+                // delete all the valid tasks when complement data if id is 
not null
+                if (processInstance.getId() != 0) {
+                    List<TaskInstance> taskInstanceList = 
this.findValidTaskListByProcessId(processInstance.getId());
+                    for (TaskInstance taskInstance : taskInstanceList) {
+                        taskInstance.setFlag(Flag.NO);
+                        this.updateTaskInstance(taskInstance);
+                    }
                 }
-                initComplementDataParam(processDefinition, processInstance, 
cmdParam);
                 break;
             case REPEAT_RUNNING:
                 // delete the recover task names from command parameter

Reply via email to