This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 66fbcae [Upgrade][Install] add upgrade 2.0 code (#6672)
66fbcae is described below
commit 66fbcae9ddd6c0ca925c24ef01345fc793d85586
Author: JinYong Li <[email protected]>
AuthorDate: Thu Nov 4 17:20:23 2021 +0800
[Upgrade][Install] add upgrade 2.0 code (#6672)
* Optimizing SQL scripts
* add upgrade 2.0 ddl
* add upgrade 2.0.0 code
* fix valid license header
* fix valid license header
* fix valid license header
* fix ut
* fix code style
* fix code style
---
.../dao/upgrade/DolphinSchedulerManager.java | 2 +-
.../dolphinscheduler/dao/upgrade/JsonSplitDao.java | 248 +++++++++++++++++++++
.../dao/upgrade/ProcessDefinitionDao.java | 79 ++++++-
.../{ProcessDefinitionDao.java => ProjectDao.java} | 45 ++--
...{ProcessDefinitionDao.java => ScheduleDao.java} | 55 ++---
.../dolphinscheduler/dao/upgrade/UpgradeDao.java | 228 +++++++++++++++++--
.../2.0.0_schema/mysql/dolphinscheduler_ddl.sql | 2 +-
.../mysql/dolphinscheduler_ddl_post.sql | 1 -
8 files changed, 588 insertions(+), 72 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
index 7b8c4b3..a6ad6ca 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
@@ -119,7 +119,7 @@ public class DolphinSchedulerManager {
} else if ("1.3.2".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceList();
} else if ("2.0.0".equals(schemaVersion)) {
- upgradeDao.upgradeDolphinSchedulerJsonSplit();
+ upgradeDao.upgradeDolphinSchedulerTo200(schemaDir);
}
version = schemaVersion;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
new file mode 100644
index 0000000..790d33a
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.upgrade;
+
+import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JsonSplitDao {
+
+ public static final Logger logger =
LoggerFactory.getLogger(JsonSplitDao.class);
+
+ /**
+ * executeJsonSplitProcessDefinition
+ *
+ * @param conn jdbc connection
+ * @param processDefinitionLogs processDefinitionLogs
+ */
+ public void executeJsonSplitProcessDefinition(Connection conn,
List<ProcessDefinitionLog> processDefinitionLogs) {
+ String updateSql = "UPDATE t_ds_process_definition SET
global_params=?,timeout=?,tenant_id=?,locations=?,update_time=? where id=?";
+ String insertLogSql = "insert into t_ds_process_definition_log
(code,name,version,description,project_code,release_state,user_id,"
+ +
"global_params,flag,locations,timeout,tenant_id,operator,operate_time,create_time,update_time)
values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ try {
+ PreparedStatement processUpdate = conn.prepareStatement(updateSql);
+ PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
+ int i = 0;
+ for (ProcessDefinitionLog processDefinitionLog :
processDefinitionLogs) {
+ processUpdate.setString(1,
processDefinitionLog.getGlobalParams());
+ processUpdate.setInt(2, processDefinitionLog.getTimeout());
+ processUpdate.setInt(3, processDefinitionLog.getTenantId());
+ processUpdate.setString(4,
processDefinitionLog.getLocations());
+ processUpdate.setDate(5, (Date)
processDefinitionLog.getUpdateTime());
+ processUpdate.setInt(6, processDefinitionLog.getId());
+ processUpdate.addBatch();
+
+ insertLog.setLong(1, processDefinitionLog.getCode());
+ insertLog.setString(2, processDefinitionLog.getName());
+ insertLog.setInt(3, processDefinitionLog.getVersion());
+ insertLog.setString(4, processDefinitionLog.getDescription());
+ insertLog.setLong(5, processDefinitionLog.getProjectCode());
+ insertLog.setInt(6,
processDefinitionLog.getReleaseState().getCode());
+ insertLog.setInt(7, processDefinitionLog.getUserId());
+ insertLog.setString(8, processDefinitionLog.getGlobalParams());
+ insertLog.setInt(9, processDefinitionLog.getFlag().getCode());
+ insertLog.setString(10, processDefinitionLog.getLocations());
+ insertLog.setInt(11, processDefinitionLog.getTimeout());
+ insertLog.setInt(12, processDefinitionLog.getTenantId());
+ insertLog.setInt(13, processDefinitionLog.getOperator());
+ insertLog.setDate(14, (Date)
processDefinitionLog.getOperateTime());
+ insertLog.setDate(15, (Date)
processDefinitionLog.getCreateTime());
+ insertLog.setDate(16, (Date)
processDefinitionLog.getUpdateTime());
+ insertLog.addBatch();
+
+ i++;
+ if (i % 1000 == 0) {
+ processUpdate.executeBatch();
+ processUpdate.clearBatch();
+ insertLog.executeBatch();
+ insertLog.clearBatch();
+ }
+ }
+ processUpdate.executeBatch();
+ insertLog.executeBatch();
+ processUpdate.close();
+ insertLog.close();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException(e);
+ } finally {
+ ConnectionUtils.releaseResource(conn);
+ }
+ }
+
+ /**
+ * executeJsonSplitProcessDefinition
+ *
+ * @param conn jdbc connection
+ * @param processTaskRelationLogs processTaskRelationLogs
+ */
+ public void executeJsonSplitProcessTaskRelation(Connection conn,
List<ProcessTaskRelationLog> processTaskRelationLogs) {
+ String insertSql = "insert into t_ds_process_task_relation
(project_code,process_definition_code,process_definition_version,pre_task_code,pre_task_version,"
+ +
"post_task_code,post_task_version,condition_type,condition_params,create_time,update_time)
values (?,?,?,?,?,?,?,?,?,?,?)";
+ String insertLogSql = "insert into t_ds_process_task_relation_log
(project_code,process_definition_code,process_definition_version,pre_task_code,"
+ +
"pre_task_version,post_task_code,post_task_version,condition_type,condition_params,operator,operate_time,create_time,update_time)
"
+ + "values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ try {
+ PreparedStatement insert = conn.prepareStatement(insertSql);
+ PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
+ int i = 0;
+ for (ProcessTaskRelationLog processTaskRelationLog :
processTaskRelationLogs) {
+ insert.setLong(1, processTaskRelationLog.getProjectCode());
+ insert.setLong(2,
processTaskRelationLog.getProcessDefinitionCode());
+ insert.setInt(3,
processTaskRelationLog.getProcessDefinitionVersion());
+ insert.setLong(4, processTaskRelationLog.getPreTaskCode());
+ insert.setInt(5, processTaskRelationLog.getPreTaskVersion());
+ insert.setLong(6, processTaskRelationLog.getPostTaskCode());
+ insert.setInt(7, processTaskRelationLog.getPostTaskVersion());
+ insert.setInt(8,
processTaskRelationLog.getConditionType().getCode());
+ insert.setString(9,
processTaskRelationLog.getConditionParams());
+ insert.setDate(10, (Date)
processTaskRelationLog.getCreateTime());
+ insert.setDate(11, (Date)
processTaskRelationLog.getUpdateTime());
+ insert.addBatch();
+
+ insertLog.setLong(1, processTaskRelationLog.getProjectCode());
+ insertLog.setLong(2,
processTaskRelationLog.getProcessDefinitionCode());
+ insertLog.setInt(3,
processTaskRelationLog.getProcessDefinitionVersion());
+ insertLog.setLong(4, processTaskRelationLog.getPreTaskCode());
+ insertLog.setInt(5,
processTaskRelationLog.getPreTaskVersion());
+ insertLog.setLong(6, processTaskRelationLog.getPostTaskCode());
+ insertLog.setInt(7,
processTaskRelationLog.getPostTaskVersion());
+ insertLog.setInt(8,
processTaskRelationLog.getConditionType().getCode());
+ insertLog.setString(9,
processTaskRelationLog.getConditionParams());
+ insertLog.setInt(10, processTaskRelationLog.getOperator());
+ insertLog.setDate(11, (Date)
processTaskRelationLog.getOperateTime());
+ insertLog.setDate(12, (Date)
processTaskRelationLog.getCreateTime());
+ insertLog.setDate(13, (Date)
processTaskRelationLog.getUpdateTime());
+ insertLog.addBatch();
+
+ i++;
+ if (i % 1000 == 0) {
+ insert.executeBatch();
+ insert.clearBatch();
+ insertLog.executeBatch();
+ insertLog.clearBatch();
+ }
+ }
+ insert.executeBatch();
+ insertLog.executeBatch();
+ insert.close();
+ insertLog.close();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException(e);
+ } finally {
+ ConnectionUtils.releaseResource(conn);
+ }
+ }
+
+ /**
+ * executeJsonSplitTaskDefinition
+ *
+ * @param conn jdbc connection
+ * @param taskDefinitionLogs taskDefinitionLogs
+ */
+ public void executeJsonSplitTaskDefinition(Connection conn,
List<TaskDefinitionLog> taskDefinitionLogs) {
+ String insertSql = "insert into t_ds_task_definition
(code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
+ +
"worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,"
+ + "create_time,update_time) values values
(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ String insertLogSql = "insert into t_ds_task_definition_log
(code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
+ +
"worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,operator,"
+ + "operate_time,create_time,update_time) values values
(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ try {
+ PreparedStatement insert = conn.prepareStatement(insertSql);
+ PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
+ int i = 0;
+ for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+ insert.setLong(1, taskDefinitionLog.getCode());
+ insert.setString(2, taskDefinitionLog.getName());
+ insert.setInt(3, taskDefinitionLog.getVersion());
+ insert.setString(4, taskDefinitionLog.getDescription());
+ insert.setLong(5, taskDefinitionLog.getProjectCode());
+ insert.setInt(6, taskDefinitionLog.getUserId());
+ insert.setString(7, taskDefinitionLog.getTaskType());
+ insert.setString(8, taskDefinitionLog.getTaskParams());
+ insert.setInt(9, taskDefinitionLog.getFlag().getCode());
+ insert.setInt(10,
taskDefinitionLog.getTaskPriority().getCode());
+ insert.setString(11, taskDefinitionLog.getWorkerGroup());
+ insert.setLong(12, taskDefinitionLog.getEnvironmentCode());
+ insert.setInt(13, taskDefinitionLog.getFailRetryTimes());
+ insert.setInt(14, taskDefinitionLog.getFailRetryInterval());
+ insert.setInt(15,
taskDefinitionLog.getTimeoutFlag().getCode());
+ insert.setInt(16,
taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
+ insert.setInt(17, taskDefinitionLog.getTimeout());
+ insert.setInt(18, taskDefinitionLog.getDelayTime());
+ insert.setString(19, taskDefinitionLog.getResourceIds());
+ insert.setDate(20, (Date) taskDefinitionLog.getCreateTime());
+ insert.setDate(21, (Date) taskDefinitionLog.getUpdateTime());
+ insert.addBatch();
+
+ insertLog.setLong(1, taskDefinitionLog.getCode());
+ insertLog.setString(2, taskDefinitionLog.getName());
+ insertLog.setInt(3, taskDefinitionLog.getVersion());
+ insertLog.setString(4, taskDefinitionLog.getDescription());
+ insertLog.setLong(5, taskDefinitionLog.getProjectCode());
+ insertLog.setInt(6, taskDefinitionLog.getUserId());
+ insertLog.setString(7, taskDefinitionLog.getTaskType());
+ insertLog.setString(8, taskDefinitionLog.getTaskParams());
+ insertLog.setInt(9, taskDefinitionLog.getFlag().getCode());
+ insertLog.setInt(10,
taskDefinitionLog.getTaskPriority().getCode());
+ insertLog.setString(11, taskDefinitionLog.getWorkerGroup());
+ insertLog.setLong(12, taskDefinitionLog.getEnvironmentCode());
+ insertLog.setInt(13, taskDefinitionLog.getFailRetryTimes());
+ insertLog.setInt(14, taskDefinitionLog.getFailRetryInterval());
+ insertLog.setInt(15,
taskDefinitionLog.getTimeoutFlag().getCode());
+ insertLog.setInt(16,
taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
+ insertLog.setInt(17, taskDefinitionLog.getTimeout());
+ insertLog.setInt(18, taskDefinitionLog.getDelayTime());
+ insertLog.setString(19, taskDefinitionLog.getResourceIds());
+ insertLog.setInt(20, taskDefinitionLog.getOperator());
+ insertLog.setDate(21, (Date)
taskDefinitionLog.getOperateTime());
+ insertLog.setDate(22, (Date)
taskDefinitionLog.getCreateTime());
+ insertLog.setDate(23, (Date)
taskDefinitionLog.getUpdateTime());
+ insertLog.addBatch();
+
+ i++;
+ if (i % 1000 == 0) {
+ insert.executeBatch();
+ insert.clearBatch();
+ insertLog.executeBatch();
+ insertLog.clearBatch();
+ }
+ }
+ insert.executeBatch();
+ insertLog.executeBatch();
+ insert.close();
+ insertLog.close();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException(e);
+ } finally {
+ ConnectionUtils.releaseResource(conn);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
index 8b6d762..9cb03b8 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
@@ -17,12 +17,19 @@
package org.apache.dolphinscheduler.dao.upgrade;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
@@ -43,7 +50,7 @@ public class ProcessDefinitionDao {
Map<Integer, String> processDefinitionJsonMap = new HashMap<>();
- String sql = String.format("SELECT id,process_definition_json FROM
t_ds_process_definition");
+ String sql = "SELECT id,process_definition_json FROM
t_ds_process_definition";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
@@ -66,7 +73,6 @@ public class ProcessDefinitionDao {
return processDefinitionJsonMap;
}
-
/**
* updateProcessDefinitionJson
*
@@ -82,9 +88,78 @@ public class ProcessDefinitionDao {
pstmt.setInt(2, entry.getKey());
pstmt.executeUpdate();
}
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException("sql: " + sql, e);
+ } finally {
+ ConnectionUtils.releaseResource(conn);
+ }
+ }
+ public List<ProcessDefinition> queryProcessDefinition(Connection conn) {
+ List<ProcessDefinition> processDefinitions = new ArrayList<>();
+ String sql = "SELECT
id,code,project_code,user_id,locations,name,description,release_state,flag,create_time
FROM t_ds_process_definition";
+ ResultSet rs = null;
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = conn.prepareStatement(sql);
+ rs = pstmt.executeQuery();
+ while (rs.next()) {
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setId(rs.getInt(1));
+ long code = rs.getLong(2);
+ if (code == 0L) {
+ code = SnowFlakeUtils.getInstance().nextId();
+ }
+ processDefinition.setCode(code);
+ processDefinition.setVersion(Constants.VERSION_FIRST);
+ processDefinition.setProjectCode(rs.getLong(3));
+ processDefinition.setUserId(rs.getInt(4));
+ processDefinition.setLocations(rs.getString(5));
+ processDefinition.setName(rs.getString(6));
+ processDefinition.setDescription(rs.getString(7));
+
processDefinition.setReleaseState(ReleaseState.getEnum(rs.getInt(8)));
+ processDefinition.setFlag(rs.getInt(9) == 1 ? Flag.YES :
Flag.NO);
+ processDefinition.setCreateTime(rs.getDate(10));
+ processDefinitions.add(processDefinition);
}
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException("sql: " + sql, e);
+ } finally {
+ ConnectionUtils.releaseResource(rs, pstmt, conn);
+ }
+ return processDefinitions;
+ }
+ /**
+ * updateProcessDefinitionCode
+ *
+ * @param conn jdbc connection
+ * @param processDefinitions processDefinitions
+ * @param projectIdCodeMap projectIdCodeMap
+ */
+ public void updateProcessDefinitionCode(Connection conn,
List<ProcessDefinition> processDefinitions, Map<Integer, Long>
projectIdCodeMap) {
+ String sql = "UPDATE t_ds_process_definition SET code=?,
project_code=?, version=? where id=?";
+ try {
+ for (ProcessDefinition processDefinition : processDefinitions) {
+ try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
+ pstmt.setLong(1, processDefinition.getCode());
+ long projectCode = processDefinition.getProjectCode();
+ if (String.valueOf(projectCode).length() <= 10) {
+ Integer projectId =
Integer.getInteger(String.valueOf(projectCode));
+ if (projectIdCodeMap.containsKey(projectId)) {
+ projectCode = projectIdCodeMap.get(projectId);
+ processDefinition.setProjectCode(projectCode);
+ }
+ }
+ pstmt.setLong(2, projectCode);
+ pstmt.setInt(3, processDefinition.getVersion());
+ pstmt.setInt(4, processDefinition.getId());
+ pstmt.executeUpdate();
+ }
+ }
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
similarity index 68%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
copy to
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
index 8b6d762..794d71a 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -28,63 +29,57 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ProcessDefinitionDao {
+public class ProjectDao {
-
- public static final Logger logger =
LoggerFactory.getLogger(ProcessDefinitionDao.class);
+ public static final Logger logger =
LoggerFactory.getLogger(ProjectDao.class);
/**
- * queryAllProcessDefinition
+ * queryAllProject
*
* @param conn jdbc connection
- * @return ProcessDefinition Json List
+ * @return Project List
*/
- public Map<Integer, String> queryAllProcessDefinition(Connection conn) {
-
- Map<Integer, String> processDefinitionJsonMap = new HashMap<>();
-
- String sql = String.format("SELECT id,process_definition_json FROM
t_ds_process_definition");
+ public Map<Integer, Long> queryAllProject(Connection conn) {
+ Map<Integer, Long> projectMap = new HashMap<>();
+ String sql = "SELECT id,code FROM t_ds_project";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
-
while (rs.next()) {
Integer id = rs.getInt(1);
- String processDefinitionJson = rs.getString(2);
- processDefinitionJsonMap.put(id, processDefinitionJson);
+ long code = rs.getLong(2);
+ if (code == 0L) {
+ code = SnowFlakeUtils.getInstance().nextId();
+ }
+ projectMap.put(id, code);
}
-
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
-
- return processDefinitionJsonMap;
+ return projectMap;
}
-
/**
- * updateProcessDefinitionJson
+ * updateProjectCode
*
* @param conn jdbc connection
- * @param processDefinitionJsonMap processDefinitionJsonMap
+ * @param projectMap projectMap
*/
- public void updateProcessDefinitionJson(Connection conn, Map<Integer,
String> processDefinitionJsonMap) {
- String sql = "UPDATE t_ds_process_definition SET
process_definition_json=? where id=?";
+ public void updateProjectCode(Connection conn, Map<Integer, Long>
projectMap) {
+ String sql = "UPDATE t_ds_project SET code=? where id=?";
try {
- for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
+ for (Map.Entry<Integer, Long> entry : projectMap.entrySet()) {
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
- pstmt.setString(1, entry.getValue());
+ pstmt.setLong(1, entry.getValue());
pstmt.setInt(2, entry.getKey());
pstmt.executeUpdate();
}
-
}
-
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
similarity index 56%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
copy to
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
index 8b6d762..0970200 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
@@ -22,69 +22,72 @@ import
org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ProcessDefinitionDao {
+public class ScheduleDao {
-
- public static final Logger logger =
LoggerFactory.getLogger(ProcessDefinitionDao.class);
+ public static final Logger logger =
LoggerFactory.getLogger(ScheduleDao.class);
/**
- * queryAllProcessDefinition
+ * queryAllSchedule
*
* @param conn jdbc connection
- * @return ProcessDefinition Json List
+ * @return Schedule List
*/
- public Map<Integer, String> queryAllProcessDefinition(Connection conn) {
-
- Map<Integer, String> processDefinitionJsonMap = new HashMap<>();
-
- String sql = String.format("SELECT id,process_definition_json FROM
t_ds_process_definition");
+ public Map<Integer, Long> queryAllSchedule(Connection conn) {
+ Map<Integer, Long> scheduleMap = new HashMap<>();
+ String sql = "SELECT id,process_definition_code FROM t_ds_schedules";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
-
while (rs.next()) {
Integer id = rs.getInt(1);
- String processDefinitionJson = rs.getString(2);
- processDefinitionJsonMap.put(id, processDefinitionJson);
+ long processDefinitionCode = rs.getLong(2);
+ scheduleMap.put(id, processDefinitionCode);
}
-
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
-
- return processDefinitionJsonMap;
+ return scheduleMap;
}
-
/**
- * updateProcessDefinitionJson
+ * update schedule
*
* @param conn jdbc connection
- * @param processDefinitionJsonMap processDefinitionJsonMap
+ * @param scheduleMap scheduleMap
+ * @param processIdCodeMap processIdCodeMap
*/
- public void updateProcessDefinitionJson(Connection conn, Map<Integer,
String> processDefinitionJsonMap) {
- String sql = "UPDATE t_ds_process_definition SET
process_definition_json=? where id=?";
+ public void updateScheduleCode(Connection conn, Map<Integer, Long>
scheduleMap, Map<Integer, Long> processIdCodeMap) {
+ String sql = "UPDATE t_ds_schedules SET
process_definition_code=?,timezone_id=?,environment_code=-1 where id=?";
try {
- for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
+ Clock clock = Clock.systemDefaultZone();
+ String timezoneId = clock.getZone().getId();
+ for (Map.Entry<Integer, Long> entry : scheduleMap.entrySet()) {
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
- pstmt.setString(1, entry.getValue());
- pstmt.setInt(2, entry.getKey());
+ long projectDefinitionCode = entry.getValue();
+ if (String.valueOf(projectDefinitionCode).length() <= 10) {
+ Integer projectDefinitionId =
Integer.getInteger(String.valueOf(projectDefinitionCode));
+ if (processIdCodeMap.containsKey(projectDefinitionId))
{
+ projectDefinitionCode =
processIdCodeMap.get(projectDefinitionId);
+ }
+ }
+ pstmt.setLong(1, projectDefinitionCode);
+ pstmt.setString(2, timezoneId);
+ pstmt.setInt(3, entry.getKey());
pstmt.executeUpdate();
}
-
}
-
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
index 47eeedb..a7acafc 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
@@ -16,31 +16,55 @@
*/
package org.apache.dolphinscheduler.dao.upgrade;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.DbType;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.SchemaUtils;
+import org.apache.dolphinscheduler.common.utils.ScriptRunner;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.sql.DataSource;
-import java.io.*;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public abstract class UpgradeDao extends AbstractBaseDao {
public static final Logger logger =
LoggerFactory.getLogger(UpgradeDao.class);
@@ -266,9 +290,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
* @param schemaDir schema dir
*/
public void upgradeDolphinScheduler(String schemaDir) {
-
- upgradeDolphinSchedulerDDL(schemaDir);
-
+ upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql");
upgradeDolphinSchedulerDML(schemaDir);
}
@@ -292,8 +314,9 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* upgrade DolphinScheduler to 2.0.0
*/
- public void upgradeDolphinSchedulerJsonSplit() {
+ public void upgradeDolphinSchedulerTo200(String schemaDir) {
processDefinitionJsonSplit();
+ upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
}
/**
@@ -481,11 +504,11 @@ public abstract class UpgradeDao extends AbstractBaseDao {
*
* @param schemaDir schemaDir
*/
- private void upgradeDolphinSchedulerDDL(String schemaDir) {
+ private void upgradeDolphinSchedulerDDL(String schemaDir, String
scriptFile) {
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not
found");
}
- String sqlFilePath =
MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_ddl.sql",
rootDir, schemaDir, getDbType().name().toLowerCase());
+ String sqlFilePath =
MessageFormat.format("{0}/sql/upgrade/{1}/{2}/{3}", rootDir, schemaDir,
getDbType().name().toLowerCase(), scriptFile);
Connection conn = null;
PreparedStatement pstmt = null;
try {
@@ -517,7 +540,6 @@ public abstract class UpgradeDao extends AbstractBaseDao {
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
-
}
@@ -550,7 +572,181 @@ public abstract class UpgradeDao extends AbstractBaseDao {
}
- public void processDefinitionJsonSplit() {
+ /**
+ * upgrade DolphinScheduler to 2.0.0, json split
+ */
+ private void processDefinitionJsonSplit() {
+ ProjectDao projectDao = new ProjectDao();
+ ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
+ ScheduleDao scheduleDao = new ScheduleDao();
+ JsonSplitDao jsonSplitDao = new JsonSplitDao();
+ try {
+ // execute project
+ Map<Integer, Long> projectIdCodeMap =
projectDao.queryAllProject(dataSource.getConnection());
+ projectDao.updateProjectCode(dataSource.getConnection(),
projectIdCodeMap);
+
+ // execute process definition code
+ List<ProcessDefinition> processDefinitions =
processDefinitionDao.queryProcessDefinition(dataSource.getConnection());
+
processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(),
processDefinitions, projectIdCodeMap);
+
+ // execute schedule
+ Map<Integer, Long> allSchedule =
scheduleDao.queryAllSchedule(dataSource.getConnection());
+ Map<Integer, Long> processIdCodeMap =
processDefinitions.stream().collect(Collectors.toMap(ProcessDefinition::getId,
ProcessDefinition::getCode));
+ scheduleDao.updateScheduleCode(dataSource.getConnection(),
allSchedule, processIdCodeMap);
+
+ // json split
+ Map<Integer, String> processDefinitionJsonMap =
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
+ List<ProcessDefinitionLog> processDefinitionLogs = new
ArrayList<>();
+ List<ProcessTaskRelationLog> processTaskRelationLogs = new
ArrayList<>();
+ List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+ splitProcessDefinitionJson(processDefinitions,
processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs,
taskDefinitionLogs);
+
+ // execute json split
+
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(),
processDefinitionLogs);
+
jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(),
processTaskRelationLogs);
+
jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(),
taskDefinitionLogs);
+ } catch (Exception e) {
+ logger.error("json split error", e);
+ }
+ }
+
+ private void splitProcessDefinitionJson(List<ProcessDefinition>
processDefinitions,
+ Map<Integer, String>
processDefinitionJsonMap,
+ List<ProcessDefinitionLog>
processDefinitionLogs,
+ List<ProcessTaskRelationLog>
processTaskRelationLogs,
+ List<TaskDefinitionLog>
taskDefinitionLogs) throws Exception {
+ Map<Integer, ProcessDefinition> processDefinitionMap =
processDefinitions.stream()
+ .collect(Collectors.toMap(ProcessDefinition::getId,
processDefinition -> processDefinition));
+ Date now = new Date();
+ for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
+ if (entry.getValue() == null) {
+ throw new Exception("processDefinitionJson is null");
+ }
+ ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
+ ProcessDefinition processDefinition =
processDefinitionMap.get(entry.getKey());
+ if (processDefinition != null) {
+
processDefinition.setTenantId(jsonObject.get("tenantId").asInt());
+
processDefinition.setTimeout(jsonObject.get("timeout").asInt());
+
processDefinition.setGlobalParams(jsonObject.get("globalParams").toString());
+ } else {
+ throw new Exception("It can't find processDefinition, please
check !");
+ }
+ Map<String, Long> taskIdCodeMap = new HashMap<>();
+ Map<String, List<String>> taskNamePreMap = new HashMap<>();
+ Map<String, Long> taskNameCodeMap = new HashMap<>();
+ ArrayNode tasks =
JSONUtils.parseArray(jsonObject.get("tasks").toString());
+ for (int i = 0; i < tasks.size(); i++) {
+ ObjectNode task = (ObjectNode) tasks.path(i);
+ ObjectNode param = (ObjectNode) task.get("params");
+ TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
+ if (param != null) {
+ List<ResourceInfo> resourceList =
JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
+ if (!resourceList.isEmpty()) {
+ List<Integer> resourceIds =
resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
+
taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, ","));
+ }
+ param.put("conditionResult", task.get("conditionResult"));
+ param.put("dependence", task.get("dependence"));
+ taskDefinitionLog.setTaskParams(param.toString());
+ }
+ TaskTimeoutParameter timeout =
JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")),
TaskTimeoutParameter.class);
+ if (timeout != null) {
+ taskDefinitionLog.setTimeout(timeout.getInterval());
+ taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ?
TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
+
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
+ }
+
taskDefinitionLog.setDescription(task.get("description").toString());
+
taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").toString())
? Flag.YES : Flag.NO);
+ taskDefinitionLog.setTaskType(task.get("type").toString());
+
taskDefinitionLog.setFailRetryInterval(task.get("retryInterval").asInt());
+
taskDefinitionLog.setFailRetryTimes(task.get("maxRetryTimes").asInt());
+
taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")),
Priority.class));
+ String name = task.get("name").toString();
+ taskDefinitionLog.setName(name);
+
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").toString());
+ long taskCode = SnowFlakeUtils.getInstance().nextId();
+ taskDefinitionLog.setCode(taskCode);
+ taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
+
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
+ taskDefinitionLog.setUserId(processDefinition.getUserId());
+ taskDefinitionLog.setEnvironmentCode(-1);
+ taskDefinitionLog.setDelayTime(0);
+ taskDefinitionLog.setOperator(1);
+ taskDefinitionLog.setOperateTime(now);
+ taskDefinitionLog.setCreateTime(now);
+ taskDefinitionLog.setUpdateTime(now);
+ taskDefinitionLogs.add(taskDefinitionLog);
+ taskIdCodeMap.put(task.get("id").toString(), taskCode);
+ List<String> preTasks =
JSONUtils.toList(task.get("preTasks").toString(), String.class);
+ taskNamePreMap.put(name, preTasks);
+ taskNameCodeMap.put(name, taskCode);
+ }
+
processDefinition.setLocations(convertLocations(processDefinition.getLocations(),
taskIdCodeMap));
+ ProcessDefinitionLog processDefinitionLog = new
ProcessDefinitionLog(processDefinition);
+ processDefinitionLog.setOperator(1);
+ processDefinitionLog.setOperateTime(now);
+ processDefinitionLog.setUpdateTime(now);
+ processDefinitionLogs.add(processDefinitionLog);
+ handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap,
processDefinition, processTaskRelationLogs);
+ }
+ }
+
+ private String convertLocations(String locations, Map<String, Long>
taskIdCodeMap) {
+ if (StringUtils.isBlank(locations)) {
+ return locations;
+ }
+ Map<String, String> locationsMap = JSONUtils.toMap(locations);
+ JsonNodeFactory factory = new JsonNodeFactory(false);
+ ArrayNode jsonNodes = factory.arrayNode();
+ for (Map.Entry<String, String> entry : locationsMap.entrySet()) {
+ ObjectNode nodes = factory.objectNode();
+ nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
+ ObjectNode oldNodes = JSONUtils.parseObject(entry.getValue());
+ nodes.put("x", oldNodes.get("x").asInt());
+ nodes.put("y", oldNodes.get("y").asInt());
+ jsonNodes.add(nodes);
+ }
+ return jsonNodes.toString();
+ }
+
+ private void handleProcessTaskRelation(Map<String, List<String>>
taskNamePreMap,
+ Map<String, Long> taskNameCodeMap,
+ ProcessDefinition processDefinition,
+ List<ProcessTaskRelationLog>
processTaskRelationLogs) {
+ Date now = new Date();
+ for (Map.Entry<String, List<String>> entry :
taskNamePreMap.entrySet()) {
+ List<String> entryValue = entry.getValue();
+ if (CollectionUtils.isNotEmpty(entryValue)) {
+ for (String preTaskName : entryValue) {
+ ProcessTaskRelationLog processTaskRelationLog =
setProcessTaskRelationLog(processDefinition, now);
+
processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName));
+
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
+
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
+
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
+ processTaskRelationLogs.add(processTaskRelationLog);
+ }
+ } else {
+ ProcessTaskRelationLog processTaskRelationLog =
setProcessTaskRelationLog(processDefinition, now);
+ processTaskRelationLog.setPreTaskCode(0);
+ processTaskRelationLog.setPreTaskVersion(0);
+
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
+
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
+ processTaskRelationLogs.add(processTaskRelationLog);
+ }
+ }
+ }
+ private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition
processDefinition, Date now) {
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
+
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
+
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
+
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
+ processTaskRelationLog.setConditionType(ConditionType.NONE);
+ processTaskRelationLog.setConditionParams("{}");
+ processTaskRelationLog.setOperator(1);
+ processTaskRelationLog.setOperateTime(now);
+ processTaskRelationLog.setCreateTime(now);
+ processTaskRelationLog.setUpdateTime(now);
+ return processTaskRelationLog;
}
}
diff --git a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql
b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql
index 9dbd6a0..2b8d494 100644
--- a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -400,7 +400,7 @@ alter table t_ds_schedules add environment_code bigint(20)
DEFAULT '-1' COMMENT
-- t_ds_process_definition
alter table t_ds_process_definition add `code` bigint(20) NOT NULL COMMENT
'encoding' AFTER `id`;
-alter table t_ds_process_definition add `project_code` bigint(20) NOT NULL
COMMENT 'encoding' AFTER `project_id`;
+alter table t_ds_process_definition change project_id project_code bigint(20)
NOT NULL COMMENT 'project code' AFTER `description`;
alter table t_ds_process_definition add `warning_group_id` int(11) DEFAULT
NULL COMMENT 'alert group id' AFTER `locations`;
alter table t_ds_process_definition add UNIQUE KEY `process_unique`
(`name`,`project_code`) USING BTREE;
alter table t_ds_process_definition modify `description` text COMMENT
'description' after `version`;
diff --git a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
index b0f00a0..5f3f65f 100644
--- a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
+++ b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
@@ -19,7 +19,6 @@ alter table t_ds_process_definition drop primary key;
ALTER TABLE t_ds_process_definition ADD PRIMARY KEY (`id`,`code`);
ALTER TABLE t_ds_process_definition drop KEY `process_definition_unique`;
ALTER TABLE t_ds_process_definition drop KEY `process_definition_index`;
-alter table t_ds_process_definition drop project_id;
alter table t_ds_process_definition drop process_definition_json;
alter table t_ds_process_definition drop connects;
alter table t_ds_process_definition drop receivers;