This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new a8f2894  [Feature][JsonSplit] Fix master/processInstance bug (#5206)
a8f2894 is described below

commit a8f2894b7916bde13bce5d2313f57ef968e9d24e
Author: JinyLeeChina <[email protected]>
AuthorDate: Sun Apr 4 11:26:42 2021 +0800

    [Feature][JsonSplit] Fix master/processInstance bug (#5206)
    
    * transform taskCode from long to string
    
    * fix process bug
    
    * code review
    
    * code review
    
    * code review
    
    * Fix master/processInstance bug
    
    Co-authored-by: JinyLeeChina <[email protected]>
---
 .../api/service/impl/ExecutorServiceImpl.java                 |  3 ++-
 .../api/service/impl/ProcessDefinitionServiceImpl.java        |  1 -
 .../api/service/impl/ProcessInstanceServiceImpl.java          |  3 ++-
 .../server/master/runner/MasterExecThread.java                |  8 ++++++--
 .../dolphinscheduler/service/process/ProcessService.java      | 11 +++++++----
 5 files changed, 17 insertions(+), 9 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 8e75eb9..be8b9ff 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -243,7 +243,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
             return result;
         }
 
-        ProcessDefinition processDefinition = 
processService.findProcessDefineById(processInstance.getProcessDefinitionId());
+        ProcessDefinition processDefinition = 
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
         if (executeType != ExecuteType.STOP && executeType != 
ExecuteType.PAUSE) {
             result = checkProcessDefinitionValid(processDefinition, 
processInstance.getProcessDefinitionId());
             if (result.get(Constants.STATUS) != Status.SUCCESS) {
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 13048c5..dedc230 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -204,7 +204,6 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             return result;
         }
 
-        // TODO relationName have ?
         int saveResult = processService.saveProcessDefinition(loginUser, 
project, processDefinitionName, desc,
                 locations, connects, processData, processDefinition);
 
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 266528f..c26ee4c 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -202,7 +202,8 @@ public class ProcessInstanceServiceImpl extends 
BaseServiceImpl implements Proce
         }
         ProcessInstance processInstance = 
processService.findProcessInstanceDetailById(processId);
 
-        ProcessDefinition processDefinition = 
processService.findProcessDefineById(processInstance.getProcessDefinitionId());
+        ProcessDefinition processDefinition = 
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
         
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
         result.put(DATA_LIST, processInstance);
         putMsg(result, Status.SUCCESS);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 4f83958..0e5aa35 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -383,7 +383,7 @@ public class MasterExecThread implements Runnable {
         List<TaskNode> taskNodeList =
                 
processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), 
processInstance.getProcessDefinitionVersion(), new HashMap<>());
         forbiddenTaskList.clear();
-        taskNodeList.stream().forEach(taskNode -> {
+        taskNodeList.forEach(taskNode -> {
             if (taskNode.isForbidden()) {
                 forbiddenTaskList.put(taskNode.getName(), taskNode);
             }
@@ -478,6 +478,8 @@ public class MasterExecThread implements Runnable {
         TaskInstance taskInstance = findTaskIfExists(nodeName);
         if (taskInstance == null) {
             taskInstance = new TaskInstance();
+            taskInstance.setTaskCode(Long.parseLong(taskNode.getCode()));
+            taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
             // task name
             taskInstance.setName(nodeName);
             // process instance define id
@@ -934,7 +936,9 @@ public class MasterExecThread implements Runnable {
             // send warning email if process time out.
             if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
                 alertManager.sendProcessTimeoutAlert(processInstance,
-                        
processService.findProcessDefineById(processInstance.getProcessDefinitionId()));
+                        
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+                                
processInstance.getProcessDefinitionVersion()));
+
                 sendTimeWarning = true;
             }
             for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry : 
activeTaskNode.entrySet()) {
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 2c6349c..28751bb 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -390,7 +390,7 @@ public class ProcessService {
     }
 
     /**
-     * find process define by id.
+     * find process define by code and version.
      *
      * @param processDefinitionCode processDefinitionCode
      * @return process definition
@@ -593,6 +593,8 @@ public class ProcessService {
                                                        Command command,
                                                        Map<String, String> 
cmdParam) {
         ProcessInstance processInstance = new 
ProcessInstance(processDefinition);
+        processInstance.setProcessDefinitionCode(processDefinition.getCode());
+        
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
         processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         processInstance.setRecovery(Flag.NO);
         processInstance.setStartTime(new Date());
@@ -718,7 +720,7 @@ public class ProcessService {
      */
     private ProcessInstance constructProcessInstance(Command command, String 
host) {
 
-        ProcessInstance processInstance = null;
+        ProcessInstance processInstance;
         CommandType commandType = command.getCommandType();
         Map<String, String> cmdParam = 
JSONUtils.toMap(command.getCommandParam());
 
@@ -732,7 +734,7 @@ public class ProcessService {
         }
 
         if (cmdParam != null) {
-            Integer processInstanceId = 0;
+            int processInstanceId = 0;
             // recover from failure or pause tasks
             if 
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
                 String processId = 
cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING);
@@ -1226,7 +1228,8 @@ public class ProcessService {
      * @param childDefinitionId childDefinitionId
      */
     private void updateSubProcessDefinitionByParent(ProcessInstance 
parentProcessInstance, int childDefinitionId) {
-        ProcessDefinition fatherDefinition = 
this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
+        ProcessDefinition fatherDefinition = 
this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
+                parentProcessInstance.getProcessDefinitionVersion());
         ProcessDefinition childDefinition = 
this.findProcessDefineById(childDefinitionId);
         if (childDefinition != null && fatherDefinition != null) {
             
childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());

Reply via email to