This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new d60f31b [Feature][JsonSplit] update processDefinite from processInstance (#5325)
d60f31b is described below
commit d60f31bcf34810fc12a2ed0195d00b253d04a972
Author: JinyLeeChina <[email protected]>
AuthorDate: Tue Apr 20 15:33:37 2021 +0800
[Feature][JsonSplit] update processDefinite from processInstance (#5325)
* update SnowFlake
* update processDefinite from processInstance
* update processDefinite from processInstance
Co-authored-by: JinyLeeChina <[email protected]>
---
.../service/impl/ProcessDefinitionServiceImpl.java | 4 +-
.../service/impl/ProcessInstanceServiceImpl.java | 39 +++++++-------------
.../api/service/ProcessDefinitionServiceTest.java | 3 +-
.../api/service/ProcessInstanceServiceTest.java | 2 +-
.../service/process/ProcessService.java | 43 +++++-----------------
.../service/process/ProcessServiceTest.java | 2 +-
sql/dolphinscheduler_mysql.sql | 32 ++--------------
sql/dolphinscheduler_postgre.sql | 26 -------------
8 files changed, 31 insertions(+), 120 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 693a858..5f424e1 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -205,7 +205,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
int saveResult = processService.saveProcessDefinition(loginUser,
project, processDefinitionName, desc,
- locations, connects, processData, processDefinition);
+ locations, connects, processData, processDefinition, true);
if (saveResult > 0) {
putMsg(result, Status.SUCCESS);
@@ -414,7 +414,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
ProcessData newProcessData =
JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
int saveResult = processService.saveProcessDefinition(loginUser,
project, name, desc,
- locations, connects, newProcessData, processDefinition);
+ locations, connects, newProcessData, processDefinition, true);
if (saveResult > 0) {
putMsg(result, Status.SUCCESS);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 67d6183..1a28411 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -52,7 +52,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@@ -72,7 +71,6 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -425,12 +423,12 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
* @param locations locations
* @param connects connects
* @return update result code
- * @throws ParseException parse exception for json parse
*/
+ @Transactional
@Override
public Map<String, Object> updateProcessInstance(User loginUser, String
projectName, Integer processInstanceId,
String
processInstanceJson, String scheduleTime, Boolean syncDefine,
- Flag flag, String
locations, String connects) throws ParseException {
+ Flag flag, String
locations, String connects) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
//check project permission
@@ -461,10 +459,10 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
}
Tenant tenant =
processService.getTenantForProcess(processData.getTenantId(),
processDefinition.getUserId());
- setProcessInstance(processInstance, tenant, scheduleTime, locations,
- connects, processInstanceJson, processData);
+ setProcessInstance(processInstance, tenant, scheduleTime, processData);
int updateDefine = 1;
if (Boolean.TRUE.equals(syncDefine)) {
+
processDefinition.setId(processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()).getId());
updateDefine = syncDefinition(loginUser, project, locations,
connects,
processInstance, processDefinition, processData);
@@ -495,37 +493,29 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
processDefinition.setTimeout(processInstance.getTimeout());
processDefinition.setUpdateTime(new Date());
- int updateDefine = processService.saveProcessDefinition(loginUser,
project, processDefinition.getName(),
+ return processService.saveProcessDefinition(loginUser, project,
processDefinition.getName(),
processDefinition.getDescription(), locations, connects,
- processData, processDefinition);
- return updateDefine;
+ processData, processDefinition, false);
}
/**
* update process instance attributes
- *
- * @return false if check failed or
*/
- private void setProcessInstance(ProcessInstance processInstance, Tenant
tenant,
- String scheduleTime, String locations,
String connects, String processInstanceJson,
- ProcessData processData) {
+ private void setProcessInstance(ProcessInstance processInstance, Tenant
tenant, String scheduleTime, ProcessData processData) {
Date schedule = processInstance.getScheduleTime();
if (scheduleTime != null) {
schedule = DateUtils.getScheduleDate(scheduleTime);
}
processInstance.setScheduleTime(schedule);
- processInstance.setLocations(locations);
- processInstance.setConnects(connects);
- if (StringUtils.isNotEmpty(processInstanceJson)) {
- return;
- }
List<Property> globalParamList = processData.getGlobalParams();
- Map<String, String> globalParamMap =
Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp,
Property::getValue));
+ Map<String, String> globalParamMap =
Optional.ofNullable(globalParamList)
+ .orElse(Collections.emptyList())
+ .stream()
+ .collect(Collectors.toMap(Property::getProp,
Property::getValue));
String globalParams =
ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
processInstance.getCmdTypeIfComplement(), schedule);
- int timeout = processData.getTimeout();
- processInstance.setTimeout(timeout);
+ processInstance.setTimeout(processData.getTimeout());
if (tenant != null) {
processInstance.setTenantCode(tenant.getTenantCode());
}
@@ -706,13 +696,10 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
throw new RuntimeException("workflow instance is null");
}
- ProcessDefinitionLog processDefinitionLog =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+ ProcessDefinition processDefinition =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()
);
- ProcessDefinition processDefinition =
JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog),
- ProcessDefinition.class);
-
GanttDto ganttDto = new GanttDto();
DAG<String, TaskNode, TaskNodeRelation> dag =
processService.genDagGraph(processDefinition);
//topological sort
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 2a4443d..2470676 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -456,7 +456,8 @@ public class ProcessDefinitionServiceTest {
, Mockito.anyString()
, Mockito.anyString()
, Mockito.any(ProcessData.class)
- , Mockito.any(ProcessDefinition.class)))
+ , Mockito.any(ProcessDefinition.class)
+ ,true))
.thenReturn(1);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData());
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index e85449d..3656ab2 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -409,7 +409,7 @@ public class ProcessInstanceServiceTest {
when(processDefineMapper.updateById(processDefinition)).thenReturn(1);
when(processService.saveProcessDefinition(Mockito.any(), Mockito.any(),
Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
- Mockito.anyString(), Mockito.any(),
Mockito.any())).thenReturn(1);
+ Mockito.anyString(), Mockito.any(), Mockito.any(),
true)).thenReturn(1);
putMsg(result, Status.SUCCESS, projectName);
Map<String, Object> successRes =
processInstanceService.updateProcessInstance(loginUser, projectName, 1,
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 44fa494..89a89be 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1587,32 +1587,6 @@ public class ProcessService {
}
/**
- * update the process instance
- *
- * @param processInstanceId processInstanceId
- * @param processJson processJson
- * @param globalParams globalParams
- * @param scheduleTime scheduleTime
- * @param flag flag
- * @param locations locations
- * @param connects connects
- * @return update process instance result
- */
- public int updateProcessInstance(Integer processInstanceId, String
processJson,
- String globalParams, Date scheduleTime,
Flag flag,
- String locations, String connects) {
- ProcessInstance processInstance =
processInstanceMapper.queryDetailById(processInstanceId);
- if (processInstance != null) {
- processInstance.setGlobalParams(globalParams);
- processInstance.setScheduleTime(scheduleTime);
- processInstance.setLocations(locations);
- processInstance.setConnects(connects);
- return processInstanceMapper.updateById(processInstance);
- }
- return 0;
- }
-
- /**
* change task state
*
* @param state state
@@ -2163,13 +2137,13 @@ public class ProcessService {
/**
* switch process definition version to process definition log version
*/
- public int processDefinitionToDB(ProcessDefinition processDefinition,
ProcessDefinitionLog processDefinitionLog) {
+ public int processDefinitionToDB(ProcessDefinition processDefinition,
ProcessDefinitionLog processDefinitionLog, Boolean isFromProcessDefine) {
if (null == processDefinition || null == processDefinitionLog) {
return Constants.DEFINITION_FAILURE;
}
processDefinitionLog.setId(processDefinition.getId());
- processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
+ processDefinitionLog.setReleaseState(isFromProcessDefine ?
ReleaseState.OFFLINE : ReleaseState.ONLINE);
processDefinitionLog.setFlag(Flag.YES);
int result;
@@ -2185,7 +2159,7 @@ public class ProcessService {
* switch process definition version to process definition log version
*/
public int switchVersion(ProcessDefinition processDefinition,
ProcessDefinitionLog processDefinitionLog) {
- int switchResult = processDefinitionToDB(processDefinition,
processDefinitionLog);
+ int switchResult = processDefinitionToDB(processDefinition,
processDefinitionLog, true);
if (switchResult != Constants.DEFINITION_FAILURE) {
switchProcessTaskRelationVersion(processDefinition);
}
@@ -2266,14 +2240,15 @@ public class ProcessService {
* save processDefinition (including create or update processDefinition)
*/
public int saveProcessDefinition(User operator, Project project, String
name, String desc, String locations,
- String connects, ProcessData processData,
ProcessDefinition processDefinition) {
+ String connects, ProcessData processData,
ProcessDefinition processDefinition,
+ Boolean isFromProcessDefine) {
ProcessDefinitionLog processDefinitionLog =
insertProcessDefinitionLog(operator, processDefinition.getCode(),
name, processData, project, desc, locations, connects);
- Map<String, TaskDefinition> taskDefinitionMap =
handleTaskDefinition(operator, project.getCode(), processData.getTasks());
+ Map<String, TaskDefinition> taskDefinitionMap =
handleTaskDefinition(operator, project.getCode(), processData.getTasks(),
isFromProcessDefine);
if (Constants.DEFINITION_FAILURE == handleTaskRelation(operator,
project.getCode(), processDefinitionLog, processData.getTasks(),
taskDefinitionMap)) {
return Constants.DEFINITION_FAILURE;
}
- return processDefinitionToDB(processDefinition, processDefinitionLog);
+ return processDefinitionToDB(processDefinition, processDefinitionLog,
isFromProcessDefine);
}
/**
@@ -2319,7 +2294,7 @@ public class ProcessService {
/**
* handle task definition
*/
- public Map<String, TaskDefinition> handleTaskDefinition(User operator,
Long projectCode, List<TaskNode> taskNodes) {
+ public Map<String, TaskDefinition> handleTaskDefinition(User operator,
Long projectCode, List<TaskNode> taskNodes, Boolean isFromProcessDefine) {
if (taskNodes == null) {
return null;
}
@@ -2336,7 +2311,7 @@ public class ProcessService {
}
saveTaskDefinition(operator, projectCode, taskNode,
taskDefinition);
} else {
- if (isTaskOnline(taskDefinition.getCode())) {
+ if (isFromProcessDefine &&
isTaskOnline(taskDefinition.getCode())) {
throw new ServiceException(String.format("The task %s is
on line in process", taskNode.getName()));
}
updateTaskDefinition(operator, projectCode, taskNode,
taskDefinition);
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 3bee978..c8cb176 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -361,7 +361,7 @@ public class ProcessServiceTest {
Mockito.when(processDefineMapper.updateById(any())).thenReturn(1);
Mockito.when(processDefineLogMapper.insert(any())).thenReturn(1);
- int i = processService.saveProcessDefinition(user, project, "name",
"desc", "locations", "connects", processData, processDefinition);
+ int i = processService.saveProcessDefinition(user, project, "name",
"desc", "locations", "connects", processData, processDefinition, true);
Assert.assertEquals(1, i);
}
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 5997046..cc1d70a 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -455,7 +455,7 @@ CREATE TABLE `t_ds_task_definition` (
`description` text COMMENT 'description',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
- `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type',
+ `task_type` varchar(50) NOT NULL COMMENT 'task type',
`task_params` text COMMENT 'job custom parameters',
`flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
`task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
@@ -484,7 +484,7 @@ CREATE TABLE `t_ds_task_definition_log` (
`description` text COMMENT 'description',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
- `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type',
+ `task_type` varchar(50) NOT NULL COMMENT 'task type',
`task_params` text COMMENT 'job custom parameters',
`flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
`task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
@@ -547,32 +547,6 @@ CREATE TABLE `t_ds_process_task_relation_log` (
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
--- Table structure for t_ds_process_definition_version
--- ----------------------------
-DROP TABLE IF EXISTS `t_ds_process_definition_version`;
-CREATE TABLE `t_ds_process_definition_version` (
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
- `process_definition_id` int(11) NOT NULL COMMENT 'process definition id',
- `version` int(11) DEFAULT NULL COMMENT 'process definition version',
- `process_definition_json` longtext COMMENT 'process definition json content',
- `description` text,
- `global_params` text COMMENT 'global parameters',
- `locations` text COMMENT 'Node location information',
- `connects` text COMMENT 'Node connection information',
- `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
- `create_time` datetime DEFAULT NULL COMMENT 'create time',
- `timeout` int(11) DEFAULT '0' COMMENT 'time out',
- `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource ids',
- PRIMARY KEY (`id`),
- UNIQUE KEY `process_definition_id_and_version`
(`process_definition_id`,`version`) USING BTREE,
- KEY `process_definition_index` (`id`) USING BTREE
-) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-
--- ----------------------------
--- Records of t_ds_process_definition
--- ----------------------------
-
--- ----------------------------
-- Table structure for t_ds_process_instance
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_process_instance`;
@@ -814,7 +788,7 @@ DROP TABLE IF EXISTS `t_ds_task_instance`;
CREATE TABLE `t_ds_task_instance` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`name` varchar(255) DEFAULT NULL COMMENT 'task name',
- `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type',
+ `task_type` varchar(50) NOT NULL COMMENT 'task type',
`task_code` bigint(20) NOT NULL COMMENT 'task definition code',
`task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition
version',
`process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id',
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index ae12df5..37b07f3 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -429,29 +429,6 @@ CREATE TABLE t_ds_process_task_relation_log (
) ;
--
--- Table structure for table t_ds_process_definition_version
---
-
-DROP TABLE IF EXISTS t_ds_process_definition_version;
-CREATE TABLE t_ds_process_definition_version (
- id int NOT NULL ,
- process_definition_id int NOT NULL ,
- version int DEFAULT NULL ,
- process_definition_json text ,
- description text ,
- global_params text ,
- locations text ,
- connects text ,
- warning_group_id int4 DEFAULT NULL,
- create_time timestamp DEFAULT NULL ,
- timeout int DEFAULT '0' ,
- resource_ids varchar(64),
- PRIMARY KEY (id)
-) ;
-
-create index process_definition_id_and_version on
t_ds_process_definition_version (process_definition_id,version);
-
---
-- Table structure for table t_ds_process_instance
--
@@ -834,9 +811,6 @@ ALTER TABLE t_ds_process_task_relation ALTER COLUMN id SET
DEFAULT NEXTVAL('t_ds
DROP SEQUENCE IF EXISTS t_ds_process_task_relation_log_id_sequence;
CREATE SEQUENCE t_ds_process_task_relation_log_id_sequence;
ALTER TABLE t_ds_process_task_relation_log ALTER COLUMN id SET DEFAULT
NEXTVAL('t_ds_process_task_relation_log_id_sequence');
-DROP SEQUENCE IF EXISTS t_ds_process_definition_version_id_sequence;
-CREATE SEQUENCE t_ds_process_definition_version_id_sequence;
-ALTER TABLE t_ds_process_definition_version ALTER COLUMN id SET DEFAULT
NEXTVAL('t_ds_process_definition_version_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_process_instance_id_sequence;
CREATE SEQUENCE t_ds_process_instance_id_sequence;
ALTER TABLE t_ds_process_instance ALTER COLUMN id SET DEFAULT
NEXTVAL('t_ds_process_instance_id_sequence');