This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new a8f2894 [Feature][JsonSplit] Fix master/processInstance bug (#5206)
a8f2894 is described below
commit a8f2894b7916bde13bce5d2313f57ef968e9d24e
Author: JinyLeeChina <[email protected]>
AuthorDate: Sun Apr 4 11:26:42 2021 +0800
[Feature][JsonSplit] Fix master/processInstance bug (#5206)
* transform taskCode from long to string
* fix process bug
* code review
* code review
* code review
* Fix master/processInstance bug
Co-authored-by: JinyLeeChina <[email protected]>
---
.../api/service/impl/ExecutorServiceImpl.java | 3 ++-
.../api/service/impl/ProcessDefinitionServiceImpl.java | 1 -
.../api/service/impl/ProcessInstanceServiceImpl.java | 3 ++-
.../server/master/runner/MasterExecThread.java | 8 ++++++--
.../dolphinscheduler/service/process/ProcessService.java | 11 +++++++----
5 files changed, 17 insertions(+), 9 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 8e75eb9..be8b9ff 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -243,7 +243,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
return result;
}
- ProcessDefinition processDefinition =
processService.findProcessDefineById(processInstance.getProcessDefinitionId());
+ ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
if (executeType != ExecuteType.STOP && executeType !=
ExecuteType.PAUSE) {
result = checkProcessDefinitionValid(processDefinition,
processInstance.getProcessDefinitionId());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 13048c5..dedc230 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -204,7 +204,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- // TODO relationName have ?
int saveResult = processService.saveProcessDefinition(loginUser,
project, processDefinitionName, desc,
locations, connects, processData, processDefinition);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 266528f..c26ee4c 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -202,7 +202,8 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
}
ProcessInstance processInstance =
processService.findProcessInstanceDetailById(processId);
- ProcessDefinition processDefinition =
processService.findProcessDefineById(processInstance.getProcessDefinitionId());
+ ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
result.put(DATA_LIST, processInstance);
putMsg(result, Status.SUCCESS);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 4f83958..0e5aa35 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -383,7 +383,7 @@ public class MasterExecThread implements Runnable {
List<TaskNode> taskNodeList =
processService.genTaskNodeList(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), new HashMap<>());
forbiddenTaskList.clear();
- taskNodeList.stream().forEach(taskNode -> {
+ taskNodeList.forEach(taskNode -> {
if (taskNode.isForbidden()) {
forbiddenTaskList.put(taskNode.getName(), taskNode);
}
@@ -478,6 +478,8 @@ public class MasterExecThread implements Runnable {
TaskInstance taskInstance = findTaskIfExists(nodeName);
if (taskInstance == null) {
taskInstance = new TaskInstance();
+ taskInstance.setTaskCode(Long.parseLong(taskNode.getCode()));
+ taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
// task name
taskInstance.setName(nodeName);
// process instance define id
@@ -934,7 +936,9 @@ public class MasterExecThread implements Runnable {
// send warning email if process time out.
if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
alertManager.sendProcessTimeoutAlert(processInstance,
-
processService.findProcessDefineById(processInstance.getProcessDefinitionId()));
+
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+
processInstance.getProcessDefinitionVersion()));
+
sendTimeWarning = true;
}
for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry :
activeTaskNode.entrySet()) {
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 2c6349c..28751bb 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -390,7 +390,7 @@ public class ProcessService {
}
/**
- * find process define by id.
+ * find process define by code and version.
*
* @param processDefinitionCode processDefinitionCode
* @return process definition
@@ -593,6 +593,8 @@ public class ProcessService {
Command command,
Map<String, String>
cmdParam) {
ProcessInstance processInstance = new
ProcessInstance(processDefinition);
+ processInstance.setProcessDefinitionCode(processDefinition.getCode());
+
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
@@ -718,7 +720,7 @@ public class ProcessService {
*/
private ProcessInstance constructProcessInstance(Command command, String
host) {
- ProcessInstance processInstance = null;
+ ProcessInstance processInstance;
CommandType commandType = command.getCommandType();
Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
@@ -732,7 +734,7 @@ public class ProcessService {
}
if (cmdParam != null) {
- Integer processInstanceId = 0;
+ int processInstanceId = 0;
// recover from failure or pause tasks
if
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
String processId =
cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING);
@@ -1226,7 +1228,8 @@ public class ProcessService {
* @param childDefinitionId childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance
parentProcessInstance, int childDefinitionId) {
- ProcessDefinition fatherDefinition =
this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
+ ProcessDefinition fatherDefinition =
this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
+ parentProcessInstance.getProcessDefinitionVersion());
ProcessDefinition childDefinition =
this.findProcessDefineById(childDefinitionId);
if (childDefinition != null && fatherDefinition != null) {
childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());