This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 713fdbc444 Split the upgrade code (#13436)
713fdbc444 is described below
commit 713fdbc444fc0dbaf68c34277f9afa4561449336
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jan 20 20:48:07 2023 +0800
Split the upgrade code (#13436)
1. Split the upgrade code into different version
2. Log the dml/ddl sql in origin friendly format
3. Fix ddl of 3.0.0 and 3.1.0
4. Fix search resource regular expression injection
---
.../api/service/impl/ResourcesServiceImpl.java | 2 +-
.../common/utils/ScriptRunner.java | 18 +-
.../dolphinscheduler/dao/upgrade/JsonSplitDao.java | 7 -
.../dao/upgrade/ProcessDefinitionDao.java | 25 +-
.../dolphinscheduler/dao/upgrade/ProjectDao.java | 13 +-
.../dolphinscheduler/dao/upgrade/ScheduleDao.java | 14 +-
.../dao/upgrade/WorkerGroupDao.java | 14 +-
.../3.0.0_schema/mysql/dolphinscheduler_ddl.sql | 2 +-
.../3.1.0_schema/mysql/dolphinscheduler_ddl.sql | 59 +-
.../3.1.1_schema/mysql/dolphinscheduler_ddl.sql | 21 +-
.../tools/datasource/DolphinSchedulerManager.java | 17 +-
.../tools/datasource/dao/MySQLUpgradeDao.java | 22 +-
.../tools/datasource/dao/PostgreSQLUpgradeDao.java | 39 +-
.../tools/datasource/dao/ResourceDao.java | 71 +--
.../tools/datasource/dao/UpgradeDao.java | 607 ++-------------------
.../upgrader/DolphinSchedulerUpgrader.java | 2 +-
...rUpgrader.java => DolphinSchedulerVersion.java} | 28 +-
.../v130/V130DolphinSchedulerUpgrader.java | 97 ++++
.../v132/V132DolphinSchedulerUpgrader.java | 151 +++++
.../v200/V200DolphinSchedulerUpgrader.java} | 510 +++--------------
.../v320/V320DolphinSchedulerUpgrader.java | 5 +-
21 files changed, 511 insertions(+), 1213 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index f1821fcb63..430e84077f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -763,7 +763,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
String trimmedSearchVal = searchVal != null ? searchVal.trim() : "";
// filter based on trimmed searchVal
List<StorageEntity> filteredResourceList = resourcesList.stream()
- .filter(x -> x.getFileName().matches("(.*)" + trimmedSearchVal
+ "(.*)")).collect(Collectors.toList());
+ .filter(x ->
x.getFileName().contains(trimmedSearchVal)).collect(Collectors.toList());
// inefficient pagination
List<StorageEntity> slicedResourcesList =
filteredResourceList.stream().skip((long) (pageNo - 1) * pageSize)
.limit(pageSize).collect(Collectors.toList());
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
index 33c632b8d6..e3212931f2 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
@@ -24,6 +24,8 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,17 +93,17 @@ public class ScriptRunner {
* @throws IOException if there is an error reading from the Reader
*/
private void runScript(Connection conn, Reader reader) throws IOException,
SQLException {
- StringBuffer command = null;
+ List<String> command = null;
try {
LineNumberReader lineReader = new LineNumberReader(reader);
String line;
while ((line = lineReader.readLine()) != null) {
if (command == null) {
- command = new StringBuffer();
+ command = new ArrayList<>();
}
String trimmedLine = line.trim();
if (trimmedLine.startsWith("--")) {
- logger.info(trimmedLine);
+ logger.info("\n{}", trimmedLine);
} else if (trimmedLine.length() < 1 ||
trimmedLine.startsWith("//")) {
// Do nothing
} else if (trimmedLine.startsWith("delimiter")) {
@@ -110,12 +112,11 @@ public class ScriptRunner {
} else if (!fullLineDelimiter &&
trimmedLine.endsWith(getDelimiter())
|| fullLineDelimiter &&
trimmedLine.equals(getDelimiter())) {
- command.append(line, 0, line.lastIndexOf(getDelimiter()));
- command.append(" ");
- logger.info("sql: {}", command);
+ command.add(line.substring(0,
line.lastIndexOf(getDelimiter())));
+ logger.info("\n{}", String.join("\n", command));
try (Statement statement = conn.createStatement()) {
- statement.execute(command.toString());
+ statement.execute(String.join(" ", command));
try (ResultSet rs = statement.getResultSet()) {
if (stopOnError && rs != null) {
ResultSetMetaData md = rs.getMetaData();
@@ -142,8 +143,7 @@ public class ScriptRunner {
command = null;
Thread.yield();
} else {
- command.append(line);
- command.append(" ");
+ command.add(line);
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
index f2cb65877f..25bdb8b3cc 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.dao.upgrade;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@@ -92,8 +91,6 @@ public class JsonSplitDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
- } finally {
- ConnectionUtils.releaseResource(conn);
}
}
@@ -160,8 +157,6 @@ public class JsonSplitDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
- } finally {
- ConnectionUtils.releaseResource(conn);
}
}
@@ -250,8 +245,6 @@ public class JsonSplitDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
- } finally {
- ConnectionUtils.releaseResource(conn);
}
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
index 75a66a5e2f..61fbf81900 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import java.sql.Connection;
@@ -50,11 +49,9 @@ public class ProcessDefinitionDao {
Map<Integer, String> processDefinitionJsonMap = new HashMap<>();
String sql = "SELECT id,process_definition_json FROM
t_ds_process_definition";
- ResultSet rs = null;
- PreparedStatement pstmt = null;
- try {
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
+ try (
+ PreparedStatement pstmt = conn.prepareStatement(sql);
+ ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
Integer id = rs.getInt(1);
@@ -65,8 +62,6 @@ public class ProcessDefinitionDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return processDefinitionJsonMap;
@@ -91,8 +86,6 @@ public class ProcessDefinitionDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(conn);
}
}
@@ -100,11 +93,9 @@ public class ProcessDefinitionDao {
List<ProcessDefinition> processDefinitions = new ArrayList<>();
String sql =
"SELECT
id,code,project_code,user_id,locations,name,description,release_state,flag,create_time
FROM t_ds_process_definition";
- ResultSet rs = null;
- PreparedStatement pstmt = null;
- try {
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
+ try (
+ PreparedStatement pstmt = conn.prepareStatement(sql);
+ ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(rs.getInt(1));
@@ -127,8 +118,6 @@ public class ProcessDefinitionDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return processDefinitions;
}
@@ -164,8 +153,6 @@ public class ProcessDefinitionDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(conn);
}
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
index 2906902f5a..e8e3f9ad2c 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -42,11 +41,9 @@ public class ProjectDao {
public Map<Integer, Long> queryAllProject(Connection conn) {
Map<Integer, Long> projectMap = new HashMap<>();
String sql = "SELECT id,code FROM t_ds_project";
- ResultSet rs = null;
- PreparedStatement pstmt = null;
- try {
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
+ try (
+ PreparedStatement pstmt = conn.prepareStatement(sql);
+ ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
Integer id = rs.getInt(1);
long code = rs.getLong(2);
@@ -58,8 +55,6 @@ public class ProjectDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return projectMap;
}
@@ -83,8 +78,6 @@ public class ProjectDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(conn);
}
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
index 9d486d0077..3717df0a6b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.dao.upgrade;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -42,11 +40,9 @@ public class ScheduleDao {
public Map<Integer, Long> queryAllSchedule(Connection conn) {
Map<Integer, Long> scheduleMap = new HashMap<>();
String sql = "SELECT id,process_definition_code FROM t_ds_schedules";
- ResultSet rs = null;
- PreparedStatement pstmt = null;
- try {
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
+ try (
+ PreparedStatement pstmt = conn.prepareStatement(sql);
+ ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
Integer id = rs.getInt(1);
long processDefinitionCode = rs.getLong(2);
@@ -55,8 +51,6 @@ public class ScheduleDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return scheduleMap;
}
@@ -92,8 +86,6 @@ public class ScheduleDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(conn);
}
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java
index e8a9fc7d82..797e4b94ea 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.dao.upgrade;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -40,12 +38,10 @@ public class WorkerGroupDao {
public Map<Integer, String> queryAllOldWorkerGroup(Connection conn) {
Map<Integer, String> workerGroupMap = new HashMap<>();
- String sql = String.format("select id,name from t_ds_worker_group");
- ResultSet rs = null;
- PreparedStatement pstmt = null;
- try {
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
+ String sql = "select id,name from t_ds_worker_group";
+ try (
+ PreparedStatement pstmt = conn.prepareStatement(sql);
+ ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
int id = rs.getInt(1);
@@ -56,8 +52,6 @@ public class WorkerGroupDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return workerGroupMap;
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql
index 630984e3b0..72acb02051 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -469,7 +469,7 @@ BEGIN
ALTER TABLE `t_ds_alert` ADD COLUMN `project_code` bigint DEFAULT NULL COMMENT
'project_code';
ALTER TABLE `t_ds_alert` ADD COLUMN `process_definition_code` bigint DEFAULT
NULL COMMENT 'process_definition_code';
ALTER TABLE `t_ds_alert` ADD COLUMN `process_instance_id` int DEFAULT NULL
COMMENT 'process_instance_id';
-ALTER TABLE `t_ds_alert` MODIFY COLUMN `alert_type` int DEFAULT NULL COMMENT
'alert_type';
+ALTER TABLE `t_ds_alert` ADD COLUMN `alert_type` int DEFAULT NULL COMMENT
'alert_type';
END IF;
END;
d//
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql
index 059f0d27ef..85fe7390e8 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -392,41 +392,46 @@ CALL modify_t_ds_task_group_col_description;
DROP PROCEDURE modify_t_ds_task_group_col_description;
-- alter table `t_ds_worker_group` add `other_params_json` text;
--- alter table `t_ds_process_instance` add `state_history` text;
-drop procedure if exists add_column_safety;
+drop procedure if exists add_t_ds_task_group_other_params_json;
delimiter d//
-create procedure add_column_safety(target_table_name varchar(256),
target_column varchar(256),
- target_column_type varchar(256), sths_else
varchar(256))
-begin
- declare target_database varchar(256);
-select database() into target_database;
-IF EXISTS(SELECT *
- FROM information_schema.COLUMNS
- WHERE COLUMN_NAME = target_column
- AND TABLE_NAME = target_table_name
- )
+CREATE PROCEDURE add_t_ds_task_group_other_params_json()
+BEGIN
+ IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
+ WHERE TABLE_NAME='t_ds_worker_group'
+ AND TABLE_SCHEMA=(SELECT DATABASE())
+ AND COLUMN_NAME='other_params_json')
THEN
- set @statement =
- concat('alter table ', target_table_name, ' change column ',
target_column, ' ', target_column, ' ',
- target_column_type, ' ',
- sths_else);
-PREPARE STMT_c FROM @statement;
-EXECUTE STMT_c;
+alter table `t_ds_worker_group` add column `other_params_json` text DEFAULT
NULL COMMENT "other params json";
ELSE
- set @statement =
- concat('alter table ', target_table_name, ' add column ',
target_column, ' ', target_column_type, ' ',
- sths_else);
-PREPARE STMT_a FROM @statement;
-EXECUTE STMT_a;
+alter table `t_ds_worker_group` modify column `other_params_json` text DEFAULT
NULL COMMENT "other params json";
END IF;
-end;
+END;
d//
delimiter ;
-call add_column_safety('t_ds_worker_group','other_params_json', 'text' ,
"DEFAULT NULL COMMENT 'other params json'");
-call add_column_safety('t_ds_process_instance','state_history', 'text' ,
"DEFAULT NULL COMMENT 'state history desc' AFTER `state`");
+call add_t_ds_task_group_other_params_json();
+drop procedure if exists add_t_ds_task_group_other_params_json;
+
+-- alter table `t_ds_process_instance` add `state_history` text;
+drop procedure if exists add_t_ds_process_instance_state_history;
+delimiter d//
+CREATE PROCEDURE add_t_ds_process_instance_state_history()
+BEGIN
+ IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
+ WHERE TABLE_NAME='t_ds_process_instance'
+ AND TABLE_SCHEMA=(SELECT DATABASE())
+ AND COLUMN_NAME='state_history')
+ THEN
+alter table `t_ds_process_instance` add column `state_history` text DEFAULT
NULL COMMENT "other params json";
+ELSE
+alter table `t_ds_process_instance` modify column `state_history` text DEFAULT
NULL COMMENT "other params json";
+END IF;
+END;
+d//
+delimiter ;
+call add_t_ds_process_instance_state_history();
+drop procedure if exists add_t_ds_process_instance_state_history;
-drop procedure if exists add_column_safety;
alter table t_ds_process_instance alter column process_instance_priority set
default 2;
alter table t_ds_schedules alter column process_instance_priority set default
2;
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql
index 978f1bf39f..989c9b51ab 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql
@@ -65,5 +65,22 @@ d//
delimiter ;
-- ALTER TABLE t_ds_worker_group ADD COLUMN description varchar(255) DEFAULT
NULL COMMENT 'ds worker group description';
-call add_column_safety('t_ds_worker_group','description', 'varchar(255)' ,
"DEFAULT NULL COMMENT 'ds worker group description'");
-drop procedure if exists add_column_safety;
+drop procedure if exists modify_t_ds_worker_group_description;
+delimiter d//
+CREATE PROCEDURE modify_t_ds_worker_group_description()
+BEGIN
+ IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
+ WHERE TABLE_NAME='t_ds_worker_group'
+ AND TABLE_SCHEMA=(SELECT DATABASE())
+ AND COLUMN_NAME='description')
+ THEN
+ alter table `t_ds_worker_group` add column `description` varchar(255)
DEFAULT NULL COMMENT "ds worker group description";
+ELSE
+alter table `t_ds_worker_group` modify column `description` varchar(255)
DEFAULT NULL COMMENT "ds worker group description";
+END IF;
+END;
+d//
+delimiter ;
+
+call modify_t_ds_worker_group_description();
+drop procedure if exists modify_t_ds_worker_group_description;
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
index 3fe4d0c276..74cf447621 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.tools.datasource.dao.UpgradeDao;
import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
+import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
import org.apache.commons.collections4.CollectionUtils;
@@ -45,7 +46,7 @@ public class DolphinSchedulerManager {
private final UpgradeDao upgradeDao;
- private Map<String, DolphinSchedulerUpgrader> upgraderMap = new
HashMap<>();
+ private Map<DolphinSchedulerVersion, DolphinSchedulerUpgrader> upgraderMap
= new HashMap<>();
public DolphinSchedulerManager(DataSource dataSource, List<UpgradeDao>
daos,
List<DolphinSchedulerUpgrader>
dolphinSchedulerUpgraders) throws Exception {
@@ -121,21 +122,11 @@ public class DolphinSchedulerManager {
logger.info("upgrade DolphinScheduler metadata version
from {} to {}", version, schemaVersion);
logger.info("Begin upgrading DolphinScheduler's table
structure");
upgradeDao.upgradeDolphinScheduler(schemaDir);
- if ("1.3.0".equals(schemaVersion)) {
- upgradeDao.upgradeDolphinSchedulerWorkerGroup();
- } else if ("1.3.2".equals(schemaVersion)) {
- upgradeDao.upgradeDolphinSchedulerResourceList();
- } else if ("2.0.0".equals(schemaVersion)) {
- upgradeDao.upgradeDolphinSchedulerTo200(schemaDir);
- }
- DolphinSchedulerUpgrader dolphinSchedulerUpgrader =
upgraderMap.get(schemaVersion);
- if (dolphinSchedulerUpgrader != null) {
- dolphinSchedulerUpgrader.doUpgrade();
- }
+
DolphinSchedulerVersion.getVersion(schemaVersion).ifPresent(v ->
upgraderMap.get(v).doUpgrade());
version = schemaVersion;
}
}
-
+ // todo: do we need to do this in all version > 2.0.6?
if (SchemaUtils.isAGreatVersion("2.0.6", currentVersion)
&&
SchemaUtils.isAGreatVersion(SchemaUtils.getSoftVersion(), currentVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceFileSize();
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java
index 7361932034..20651213ec 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.tools.datasource.dao;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
@@ -56,17 +55,13 @@ public class MySQLUpgradeDao extends UpgradeDao {
*/
@Override
public boolean isExistsTable(String tableName) {
- ResultSet rs = null;
- Connection conn = null;
- try {
- conn = dataSource.getConnection();
- rs = conn.getMetaData().getTables(conn.getCatalog(),
conn.getSchema(), tableName, null);
+ try (
+ Connection conn = dataSource.getConnection();
+ ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(),
conn.getSchema(), tableName, null)) {
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
- } finally {
- ConnectionUtils.releaseResource(rs, conn);
}
}
@@ -79,19 +74,16 @@ public class MySQLUpgradeDao extends UpgradeDao {
*/
@Override
public boolean isExistsColumn(String tableName, String columnName) {
- Connection conn = null;
- try {
- conn = dataSource.getConnection();
- ResultSet rs = conn.getMetaData().getColumns(conn.getCatalog(),
conn.getSchema(), tableName, columnName);
+ try (
+ Connection conn = dataSource.getConnection();
+ ResultSet rs =
+ conn.getMetaData().getColumns(conn.getCatalog(),
conn.getSchema(), tableName, columnName)) {
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
- } finally {
- ConnectionUtils.releaseResource(conn);
}
-
}
}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java
index 43761ba564..d854a6ef7a 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.tools.datasource.dao;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
@@ -51,13 +50,10 @@ public class PostgreSQLUpgradeDao extends UpgradeDao {
}
public String getSchema() {
- Connection conn = null;
- PreparedStatement pstmt = null;
- ResultSet resultSet = null;
- try {
- conn = dataSource.getConnection();
- pstmt = conn.prepareStatement("select current_schema()");
- resultSet = pstmt.executeQuery();
+ try (
+ Connection conn = dataSource.getConnection();
+ PreparedStatement pstmt = conn.prepareStatement("select
current_schema()");
+ ResultSet resultSet = pstmt.executeQuery()) {
while (resultSet.next()) {
if (resultSet.isFirst()) {
return resultSet.getString(1);
@@ -66,8 +62,6 @@ public class PostgreSQLUpgradeDao extends UpgradeDao {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
- } finally {
- ConnectionUtils.releaseResource(resultSet, pstmt, conn);
}
return "";
}
@@ -80,21 +74,14 @@ public class PostgreSQLUpgradeDao extends UpgradeDao {
*/
@Override
public boolean isExistsTable(String tableName) {
- Connection conn = null;
- ResultSet rs = null;
- try {
- conn = dataSource.getConnection();
-
- rs = conn.getMetaData().getTables(conn.getCatalog(), getSchema(),
tableName, null);
-
+ try (
+ Connection conn = dataSource.getConnection();
+ ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(),
getSchema(), tableName, null)) {
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
- } finally {
- ConnectionUtils.releaseResource(rs, conn);
}
-
}
/**
@@ -106,20 +93,14 @@ public class PostgreSQLUpgradeDao extends UpgradeDao {
*/
@Override
public boolean isExistsColumn(String tableName, String columnName) {
- Connection conn = null;
- ResultSet rs = null;
- try {
- conn = dataSource.getConnection();
- rs = conn.getMetaData().getColumns(conn.getCatalog(), getSchema(),
tableName, columnName);
+ try (
+ Connection conn = dataSource.getConnection();
+ ResultSet rs =
conn.getMetaData().getColumns(conn.getCatalog(), getSchema(), tableName,
columnName)) {
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
- } finally {
- ConnectionUtils.releaseResource(rs, conn);
-
}
-
}
}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
index 6bfe47baca..d542428a11 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
@@ -17,18 +17,14 @@
package org.apache.dolphinscheduler.tools.datasource.dao;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,38 +39,6 @@ public class ResourceDao {
public static final Logger logger =
LoggerFactory.getLogger(ResourceDao.class);
- /**
- * list all resources
- *
- * @param conn connection
- * @return map that key is full_name and value is id
- */
- Map<String, Integer> listAllResources(Connection conn) {
- Map<String, Integer> resourceMap = new HashMap<>();
-
- String sql = String.format("SELECT id,full_name FROM t_ds_resources");
- ResultSet rs = null;
- PreparedStatement pstmt = null;
- try {
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
-
- while (rs.next()) {
- Integer id = rs.getInt(1);
- String fullName = rs.getString(2);
- resourceMap.put(fullName, id);
- }
-
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
- }
-
- return resourceMap;
- }
-
/**
* list all resources by the type
*
@@ -86,15 +50,13 @@ public class ResourceDao {
String sql =
String.format("SELECT full_name, type, size, is_directory FROM
t_ds_resources where type = %d", type);
- ResultSet rs = null;
- PreparedStatement pstmt = null;
- try {
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
+ try (
+ PreparedStatement pstmt = conn.prepareStatement(sql);
+ ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
String fullName = rs.getString("full_name");
- Boolean isDirectory = rs.getBoolean("is_directory");
+ boolean isDirectory = rs.getBoolean("is_directory");
long fileSize = rs.getLong("size");
if (StringUtils.isNotBlank(fullName) && !isDirectory) {
@@ -111,16 +73,6 @@ public class ResourceDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- if (Objects.nonNull(pstmt)) {
- try {
- if (!pstmt.isClosed()) {
- pstmt.close();
- }
- } catch (SQLException e) {
- logger.error(e.getMessage(), e);
- }
- }
}
return resourceSizeMap;
}
@@ -134,9 +86,7 @@ public class ResourceDao {
Map<String, Long> resourceSizeMap = listAllResourcesByFileType(conn,
type);
String sql = "UPDATE t_ds_resources SET size=? where type=? and
full_name=? and is_directory = true";
- PreparedStatement pstmt = null;
- try {
- pstmt = conn.prepareStatement(sql);
+ try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
for (Map.Entry<String, Long> entry : resourceSizeMap.entrySet()) {
pstmt.setLong(1, entry.getValue());
pstmt.setInt(2, type);
@@ -147,17 +97,6 @@ public class ResourceDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
- } finally {
- if (Objects.nonNull(pstmt)) {
- try {
- if (!pstmt.isClosed()) {
- pstmt.close();
- }
- } catch (SQLException e) {
- logger.error(e.getMessage(), e);
- }
- }
- ConnectionUtils.releaseResource(conn);
}
}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
index 7b4bb6b646..201dd5dfba 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
@@ -17,50 +17,17 @@
package org.apache.dolphinscheduler.tools.datasource.dao;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
-
-import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.enums.ConditionType;
-import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
-import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
-import org.apache.dolphinscheduler.dao.upgrade.JsonSplitDao;
-import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao;
-import org.apache.dolphinscheduler.dao.upgrade.ProjectDao;
-import org.apache.dolphinscheduler.dao.upgrade.ScheduleDao;
import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
-import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
import org.apache.dolphinscheduler.spi.enums.DbType;
-import org.apache.commons.collections4.CollectionUtils;
-
import java.io.FileNotFoundException;
-import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
import javax.sql.DataSource;
@@ -69,14 +36,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-
public abstract class UpgradeDao {
public static final Logger logger =
LoggerFactory.getLogger(UpgradeDao.class);
@@ -100,6 +59,7 @@ public abstract class UpgradeDao {
/**
* run init sql to init db schema
+ *
* @param dbType db type
*/
private void runInitSql(DbType dbType) {
@@ -112,8 +72,8 @@ public abstract class UpgradeDao {
initScriptRunner.runScript(initSqlReader);
}
} catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
+ logger.error("Execute init sql file: {} error", sqlFile, e);
+ throw new RuntimeException(String.format("Execute init sql file:
%s error", sqlFile), e);
}
}
@@ -123,56 +83,24 @@ public abstract class UpgradeDao {
public String getCurrentVersion(String versionName) {
String sql = String.format("select version from %s", versionName);
- Connection conn = null;
- ResultSet rs = null;
- PreparedStatement pstmt = null;
String version = null;
- try {
- conn = dataSource.getConnection();
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
-
+ try (
+ Connection conn = dataSource.getConnection();
+ PreparedStatement pstmt = conn.prepareStatement(sql);
+ ResultSet rs = pstmt.executeQuery()) {
if (rs.next()) {
version = rs.getString(1);
}
-
return version;
-
} catch (SQLException e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
+ logger.error("Get current version from database error, sql: {}",
sql, e);
+ throw new RuntimeException("Get current version from database
error, sql: " + sql, e);
}
}
public void upgradeDolphinScheduler(String schemaDir) {
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql");
- upgradeDolphinSchedulerDML(schemaDir);
- }
-
- /**
- * upgrade DolphinScheduler worker group
- * ds-1.3.0 modify the worker group for process definition json
- */
- public void upgradeDolphinSchedulerWorkerGroup() {
- updateProcessDefinitionJsonWorkerGroup();
- }
-
- /**
- * upgrade DolphinScheduler resource list
- * ds-1.3.2 modify the resource list for process definition json
- */
- public void upgradeDolphinSchedulerResourceList() {
- updateProcessDefinitionJsonResourceList();
- }
-
- /**
- * upgrade DolphinScheduler to 2.0.0
- */
- public void upgradeDolphinSchedulerTo200(String schemaDir) {
- processDefinitionJsonSplit();
- upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
+ upgradeDolphinSchedulerDML(schemaDir, "dolphinscheduler_dml.sql");
}
/**
@@ -180,183 +108,51 @@ public abstract class UpgradeDao {
*/
public void upgradeDolphinSchedulerResourceFileSize() {
ResourceDao resourceDao = new ResourceDao();
- try {
+ try (Connection conn = dataSource.getConnection()) {
// update the size of the folder that is the type of file.
-
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0);
+ resourceDao.updateResourceFolderSizeByFileType(conn, 0);
// update the size of the folder that is the type of udf.
-
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1);
+ resourceDao.updateResourceFolderSizeByFileType(conn, 1);
} catch (Exception ex) {
logger.error("Failed to upgrade because of failing to update the
folder's size of resource files.");
}
}
- /**
- * updateProcessDefinitionJsonWorkerGroup
- */
- protected void updateProcessDefinitionJsonWorkerGroup() {
- WorkerGroupDao workerGroupDao = new WorkerGroupDao();
- ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
- Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
- try {
- Map<Integer, String> oldWorkerGroupMap =
workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
- Map<Integer, String> processDefinitionJsonMap =
-
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
-
- for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
- ObjectNode jsonObject =
JSONUtils.parseObject(entry.getValue());
- ArrayNode tasks =
JSONUtils.parseArray(jsonObject.get("tasks").toString());
-
- for (int i = 0; i < tasks.size(); i++) {
- ObjectNode task = (ObjectNode) tasks.path(i);
- ObjectNode workerGroupNode = (ObjectNode)
task.path("workerGroupId");
- int workerGroupId = -1;
- if (workerGroupNode != null &&
workerGroupNode.canConvertToInt()) {
- workerGroupId = workerGroupNode.asInt(-1);
- }
- if (workerGroupId == -1) {
- task.put("workerGroup", "default");
- } else {
- task.put("workerGroup",
oldWorkerGroupMap.get(workerGroupId));
- }
- }
-
- jsonObject.remove("task");
-
- jsonObject.put("tasks", tasks);
-
- replaceProcessDefinitionMap.put(entry.getKey(),
jsonObject.toString());
- }
- if (replaceProcessDefinitionMap.size() > 0) {
-
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),
- replaceProcessDefinitionMap);
- }
- } catch (Exception e) {
- logger.error("update process definition json workergroup error",
e);
- }
- }
-
- protected void updateProcessDefinitionJsonResourceList() {
- ResourceDao resourceDao = new ResourceDao();
- ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
- Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
- try {
- Map<String, Integer> resourcesMap =
resourceDao.listAllResources(dataSource.getConnection());
- Map<Integer, String> processDefinitionJsonMap =
-
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
-
- for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
- ObjectNode jsonObject =
JSONUtils.parseObject(entry.getValue());
- ArrayNode tasks =
JSONUtils.parseArray(jsonObject.get("tasks").toString());
-
- for (int i = 0; i < tasks.size(); i++) {
- ObjectNode task = (ObjectNode) tasks.get(i);
- ObjectNode param = (ObjectNode) task.get("params");
- if (param != null) {
-
- List<ResourceInfo> resourceList =
-
JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
- ResourceInfo mainJar =
-
JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class);
- if (mainJar != null && mainJar.getId() == null) {
- String fullName = mainJar.getRes().startsWith("/")
? mainJar.getRes()
- : String.format("/%s", mainJar.getRes());
- if (resourcesMap.containsKey(fullName)) {
- mainJar.setId(resourcesMap.get(fullName));
- param.put("mainJar",
JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
- }
- }
-
- if (CollectionUtils.isNotEmpty(resourceList)) {
- List<ResourceInfo> newResourceList =
resourceList.stream().map(resInfo -> {
- String fullName =
resInfo.getRes().startsWith("/") ? resInfo.getRes()
- : String.format("/%s",
resInfo.getRes());
- if (resInfo.getId() == null &&
resourcesMap.containsKey(fullName)) {
- resInfo.setId(resourcesMap.get(fullName));
- }
- return resInfo;
- }).collect(Collectors.toList());
- param.put("resourceList",
JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
- }
- }
- task.put("params", param);
-
- }
-
- jsonObject.remove("tasks");
-
- jsonObject.put("tasks", tasks);
-
- replaceProcessDefinitionMap.put(entry.getKey(),
jsonObject.toString());
- }
- if (replaceProcessDefinitionMap.size() > 0) {
-
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),
- replaceProcessDefinitionMap);
- }
- } catch (Exception e) {
- logger.error("update process definition json resource list error",
e);
- }
-
- }
-
- private void upgradeDolphinSchedulerDML(String schemaDir) {
+ private void upgradeDolphinSchedulerDML(String schemaDir, String
scriptFile) {
String schemaVersion = schemaDir.split("_")[0];
- Resource sqlFilePath = new
ClassPathResource(String.format("sql/upgrade/%s/%s/dolphinscheduler_dml.sql",
- schemaDir, getDbType().name().toLowerCase()));
- logger.info("sqlSQLFilePath: {}", sqlFilePath);
- Connection conn = null;
- PreparedStatement pstmt = null;
- try {
- conn = dataSource.getConnection();
+ Resource sqlFilePath = new ClassPathResource(
+ String.format("sql/upgrade/%s/%s/%s", schemaDir,
getDbType().name().toLowerCase(), scriptFile));
+ try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
// Execute the upgraded dolphinscheduler dml
ScriptRunner scriptRunner = new ScriptRunner(conn, false, true);
try (Reader sqlReader = new
InputStreamReader(sqlFilePath.getInputStream())) {
scriptRunner.runScript(sqlReader);
+ String upgradeSQL;
if (isExistsTable(T_VERSION_NAME)) {
// Change version in the version table to the new version
- String upgradeSQL = String.format("update %s set version =
?", T_VERSION_NAME);
- pstmt = conn.prepareStatement(upgradeSQL);
- pstmt.setString(1, schemaVersion);
- pstmt.executeUpdate();
+ upgradeSQL = String.format("update %s set version = ?",
T_VERSION_NAME);
} else if (isExistsTable(T_NEW_VERSION_NAME)) {
// Change version in the version table to the new version
- String upgradeSQL = String.format("update %s set version =
?", T_NEW_VERSION_NAME);
- pstmt = conn.prepareStatement(upgradeSQL);
+ upgradeSQL = String.format("update %s set version = ?",
T_NEW_VERSION_NAME);
+ } else {
+ throw new RuntimeException("The version table does not
exist");
+ }
+ try (PreparedStatement pstmt =
conn.prepareStatement(upgradeSQL)) {
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
}
conn.commit();
}
+ logger.info("Success execute the dml file, schemaDir: {},
ddlScript: {}", schemaDir, scriptFile);
} catch (FileNotFoundException e) {
- try {
- conn.rollback();
- } catch (SQLException e1) {
- logger.error(e1.getMessage(), e1);
- }
- logger.error(e.getMessage(), e);
+ logger.error("Cannot find the DDL file, schemaDir: {}, ddlScript:
{}", schemaDir, scriptFile, e);
throw new RuntimeException("sql file not found ", e);
- } catch (IOException e) {
- try {
- conn.rollback();
- } catch (SQLException e1) {
- logger.error(e1.getMessage(), e1);
- }
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
- try {
- if (null != conn) {
- conn.rollback();
- }
- } catch (SQLException e1) {
- logger.error(e1.getMessage(), e1);
- }
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
- } finally {
- ConnectionUtils.releaseResource(pstmt, conn);
+ logger.error("Execute ddl file failed, meet an unknown exception,
schemaDir: {}, ddlScript: {}", schemaDir,
+ scriptFile, e);
+ throw new RuntimeException("Execute ddl file failed, meet an
unknown exception", e);
}
-
}
/**
@@ -364,30 +160,24 @@ public abstract class UpgradeDao {
*
* @param schemaDir schemaDir
*/
- private void upgradeDolphinSchedulerDDL(String schemaDir, String
scriptFile) {
+ public void upgradeDolphinSchedulerDDL(String schemaDir, String
scriptFile) {
Resource sqlFilePath = new ClassPathResource(
String.format("sql/upgrade/%s/%s/%s", schemaDir,
getDbType().name().toLowerCase(), scriptFile));
- Connection conn = null;
- PreparedStatement pstmt = null;
- try {
- conn = dataSource.getConnection();
- String dbName = conn.getCatalog();
- logger.info(dbName);
+ try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(true);
// Execute the dolphinscheduler ddl.sql for the upgrade
ScriptRunner scriptRunner = new ScriptRunner(conn, true, true);
try (Reader sqlReader = new
InputStreamReader(sqlFilePath.getInputStream())) {
scriptRunner.runScript(sqlReader);
}
+ logger.info("Success execute the ddl file, schemaDir: {},
ddlScript: {}", schemaDir, scriptFile);
} catch (FileNotFoundException e) {
-
- logger.error(e.getMessage(), e);
+ logger.error("Cannot find the DDL file, schemaDir: {}, ddlScript:
{}", schemaDir, scriptFile, e);
throw new RuntimeException("sql file not found ", e);
} catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
- } finally {
- ConnectionUtils.releaseResource(pstmt, conn);
+ logger.error("Execute ddl file failed, meet an unknown exception,
schemaDir: {}, ddlScript: {}", schemaDir,
+ scriptFile, e);
+ throw new RuntimeException("Execute ddl file failed, meet an
unknown exception", e);
}
}
@@ -403,330 +193,15 @@ public abstract class UpgradeDao {
versionName = "t_ds_version";
}
String upgradeSQL = String.format("update %s set version = ?",
versionName);
- PreparedStatement pstmt = null;
- Connection conn = null;
- try {
- conn = dataSource.getConnection();
- pstmt = conn.prepareStatement(upgradeSQL);
+ try (
+ Connection conn = dataSource.getConnection();
+ PreparedStatement pstmt = conn.prepareStatement(upgradeSQL)) {
pstmt.setString(1, version);
pstmt.executeUpdate();
} catch (SQLException e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException("sql: " + upgradeSQL, e);
- } finally {
- ConnectionUtils.releaseResource(pstmt, conn);
- }
-
- }
-
- /**
- * upgrade DolphinScheduler to 2.0.0, json split
- */
- private void processDefinitionJsonSplit() {
- ProjectDao projectDao = new ProjectDao();
- ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
- ScheduleDao scheduleDao = new ScheduleDao();
- JsonSplitDao jsonSplitDao = new JsonSplitDao();
- try {
- // execute project
- Map<Integer, Long> projectIdCodeMap =
projectDao.queryAllProject(dataSource.getConnection());
- projectDao.updateProjectCode(dataSource.getConnection(),
projectIdCodeMap);
-
- // execute process definition code
- List<ProcessDefinition> processDefinitions =
-
processDefinitionDao.queryProcessDefinition(dataSource.getConnection());
-
processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(),
processDefinitions,
- projectIdCodeMap);
-
- // execute schedule
- Map<Integer, Long> allSchedule =
scheduleDao.queryAllSchedule(dataSource.getConnection());
- Map<Integer, Long> processIdCodeMap = processDefinitions.stream()
- .collect(Collectors.toMap(ProcessDefinition::getId,
ProcessDefinition::getCode));
- scheduleDao.updateScheduleCode(dataSource.getConnection(),
allSchedule, processIdCodeMap);
-
- // json split
- Map<Integer, String> processDefinitionJsonMap =
-
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
- List<ProcessDefinitionLog> processDefinitionLogs = new
ArrayList<>();
- List<ProcessTaskRelationLog> processTaskRelationLogs = new
ArrayList<>();
- List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
- Map<Integer, Map<Long, Map<String, Long>>> processTaskMap = new
HashMap<>();
- splitProcessDefinitionJson(processDefinitions,
processDefinitionJsonMap, processDefinitionLogs,
- processTaskRelationLogs, taskDefinitionLogs,
processTaskMap);
- convertDependence(taskDefinitionLogs, projectIdCodeMap,
processTaskMap);
-
- // execute json split
-
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(),
processDefinitionLogs);
-
jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(),
processTaskRelationLogs);
-
jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(),
taskDefinitionLogs);
- } catch (Exception e) {
- logger.error("json split error", e);
- }
- }
-
- private void splitProcessDefinitionJson(List<ProcessDefinition>
processDefinitions,
- Map<Integer, String>
processDefinitionJsonMap,
- List<ProcessDefinitionLog>
processDefinitionLogs,
- List<ProcessTaskRelationLog>
processTaskRelationLogs,
- List<TaskDefinitionLog>
taskDefinitionLogs,
- Map<Integer, Map<Long, Map<String,
Long>>> processTaskMap) throws Exception {
- Map<Integer, ProcessDefinition> processDefinitionMap =
processDefinitions.stream()
- .collect(Collectors.toMap(ProcessDefinition::getId,
processDefinition -> processDefinition));
- Date now = new Date();
- for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
- if (entry.getValue() == null) {
- throw new Exception("processDefinitionJson is null");
- }
- ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
- ProcessDefinition processDefinition =
processDefinitionMap.get(entry.getKey());
- if (processDefinition != null) {
- processDefinition
- .setTenantId(jsonObject.get("tenantId") == null ? -1 :
jsonObject.get("tenantId").asInt());
-
processDefinition.setTimeout(jsonObject.get("timeout").asInt());
-
processDefinition.setGlobalParams(jsonObject.get("globalParams").toString());
- } else {
- throw new Exception("It can't find processDefinition, please
check !");
- }
- Map<String, Long> taskIdCodeMap = new HashMap<>();
- Map<String, List<String>> taskNamePreMap = new HashMap<>();
- Map<String, Long> taskNameCodeMap = new HashMap<>();
- Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = new
HashMap<>();
- List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
- ArrayNode tasks =
JSONUtils.parseArray(jsonObject.get("tasks").toString());
- for (int i = 0; i < tasks.size(); i++) {
- ObjectNode task = (ObjectNode) tasks.path(i);
- ObjectNode param = (ObjectNode) task.get("params");
- TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
- String taskType = task.get("type").asText();
- if (param != null) {
- JsonNode resourceJsonNode = param.get("resourceList");
- if (resourceJsonNode != null &&
!resourceJsonNode.isEmpty()) {
- List<ResourceInfo> resourceList =
-
JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
- List<Integer> resourceIds =
-
resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
-
taskDefinitionLog.setResourceIds(Joiner.on(Constants.COMMA).join(resourceIds));
- } else {
- taskDefinitionLog.setResourceIds("");
- }
- if (TASK_TYPE_SUB_PROCESS.equals(taskType)) {
- JsonNode jsonNodeDefinitionId =
param.get("processDefinitionId");
- if (jsonNodeDefinitionId != null) {
- param.put("processDefinitionCode",
-
processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode());
- param.remove("processDefinitionId");
- }
- }
- param.put("conditionResult", task.get("conditionResult"));
- param.put("dependence", task.get("dependence"));
-
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
- }
- TaskTimeoutParameter timeout =
-
JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")),
TaskTimeoutParameter.class);
- if (timeout != null) {
- taskDefinitionLog.setTimeout(timeout.getInterval());
- taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ?
TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
-
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
- }
- String desc = task.get("description") != null ?
task.get("description").asText()
- : task.get("desc") != null ? task.get("desc").asText()
: "";
- taskDefinitionLog.setDescription(desc);
- taskDefinitionLog.setFlag(
-
Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ?
Flag.YES : Flag.NO);
- taskDefinitionLog.setTaskType(taskType);
- taskDefinitionLog.setFailRetryInterval(
- TASK_TYPE_SUB_PROCESS.equals(taskType) ? 1 :
task.get("retryInterval").asInt());
- taskDefinitionLog.setFailRetryTimes(
- TASK_TYPE_SUB_PROCESS.equals(taskType) ? 0 :
task.get("maxRetryTimes").asInt());
- taskDefinitionLog.setTaskPriority(JSONUtils
-
.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")),
Priority.class));
- String name = task.get("name").asText();
- taskDefinitionLog.setName(name);
- taskDefinitionLog
- .setWorkerGroup(task.get("workerGroup") == null ?
"default" : task.get("workerGroup").asText());
- long taskCode = CodeGenerateUtils.getInstance().genCode();
- taskDefinitionLog.setCode(taskCode);
- taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
-
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
- taskDefinitionLog.setUserId(processDefinition.getUserId());
- taskDefinitionLog.setEnvironmentCode(-1);
- taskDefinitionLog.setDelayTime(0);
- taskDefinitionLog.setOperator(1);
- taskDefinitionLog.setOperateTime(now);
- taskDefinitionLog.setCreateTime(now);
- taskDefinitionLog.setUpdateTime(now);
- taskDefinitionLogList.add(taskDefinitionLog);
- taskIdCodeMap.put(task.get("id").asText(), taskCode);
- List<String> preTasks =
JSONUtils.toList(task.get("preTasks").toString(), String.class);
- taskNamePreMap.put(name, preTasks);
- taskNameCodeMap.put(name, taskCode);
- }
- convertConditions(taskDefinitionLogList, taskNameCodeMap);
- taskDefinitionLogs.addAll(taskDefinitionLogList);
-
processDefinition.setLocations(convertLocations(processDefinition.getLocations(),
taskIdCodeMap));
- ProcessDefinitionLog processDefinitionLog = new
ProcessDefinitionLog(processDefinition);
- processDefinitionLog.setOperator(1);
- processDefinitionLog.setOperateTime(now);
- processDefinitionLog.setUpdateTime(now);
- processDefinitionLogs.add(processDefinitionLog);
- handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap,
processDefinition, processTaskRelationLogs);
- processCodeTaskNameCodeMap.put(processDefinition.getCode(),
taskNameCodeMap);
- processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap);
- }
- }
-
- public void convertConditions(List<TaskDefinitionLog>
taskDefinitionLogList,
- Map<String, Long> taskNameCodeMap) throws
Exception {
- for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
- if (TASK_TYPE_CONDITIONS.equals(taskDefinitionLog.getTaskType())) {
- ObjectMapper objectMapper = new ObjectMapper();
- ObjectNode taskParams =
JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
- // reset conditionResult
- ObjectNode conditionResult = (ObjectNode)
taskParams.get("conditionResult");
- List<String> successNode =
-
JSONUtils.toList(conditionResult.get("successNode").toString(), String.class);
- List<Long> nodeCode = new ArrayList<>();
- successNode.forEach(node ->
nodeCode.add(taskNameCodeMap.get(node)));
- conditionResult.set("successNode",
objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
- List<String> failedNode =
JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class);
- nodeCode.clear();
- failedNode.forEach(node ->
nodeCode.add(taskNameCodeMap.get(node)));
- conditionResult.set("failedNode",
objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
- // reset dependItemList
- ObjectNode dependence = (ObjectNode)
taskParams.get("dependence");
- ArrayNode dependTaskList =
-
JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
- for (int i = 0; i < dependTaskList.size(); i++) {
- ObjectNode dependTask = (ObjectNode)
dependTaskList.path(i);
- ArrayNode dependItemList =
-
JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
- for (int j = 0; j < dependItemList.size(); j++) {
- ObjectNode dependItem = (ObjectNode)
dependItemList.path(j);
- JsonNode depTasks = dependItem.get("depTasks");
- dependItem.put("depTaskCode",
taskNameCodeMap.get(depTasks.asText()));
- dependItem.remove("depTasks");
- dependItemList.set(j, dependItem);
- }
- dependTask.put("dependItemList", dependItemList);
- dependTaskList.set(i, dependTask);
- }
- dependence.put("dependTaskList", dependTaskList);
-
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
- }
- }
- }
-
- private String convertLocations(String locations, Map<String, Long>
taskIdCodeMap) {
- if (Strings.isNullOrEmpty(locations)) {
- return locations;
- }
- Map<String, ObjectNode> locationsMap =
- JSONUtils.parseObject(locations, new TypeReference<Map<String,
ObjectNode>>() {
- });
- if (locationsMap == null) {
- return locations;
+ logger.error("Update version error, sql: {}", upgradeSQL, e);
+ throw new RuntimeException("Upgrade version error, sql: " +
upgradeSQL, e);
}
- ArrayNode jsonNodes = JSONUtils.createArrayNode();
- for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
- ObjectNode nodes = JSONUtils.createObjectNode();
- nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
- ObjectNode oldNodes = entry.getValue();
- nodes.put("x", oldNodes.get("x").asInt());
- nodes.put("y", oldNodes.get("y").asInt());
- jsonNodes.add(nodes);
- }
- return jsonNodes.toString();
- }
-
- public void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
- Map<Integer, Long> projectIdCodeMap,
- Map<Integer, Map<Long, Map<String, Long>>>
processTaskMap) {
- for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
- if (TASK_TYPE_DEPENDENT.equals(taskDefinitionLog.getTaskType())) {
- ObjectNode taskParams =
JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
- ObjectNode dependence = (ObjectNode)
taskParams.get("dependence");
- ArrayNode dependTaskList =
-
JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
- for (int i = 0; i < dependTaskList.size(); i++) {
- ObjectNode dependTask = (ObjectNode)
dependTaskList.path(i);
- ArrayNode dependItemList =
-
JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
- for (int j = 0; j < dependItemList.size(); j++) {
- ObjectNode dependItem = (ObjectNode)
dependItemList.path(j);
- dependItem.put("projectCode",
projectIdCodeMap.get(dependItem.get("projectId").asInt()));
- int definitionId =
dependItem.get("definitionId").asInt();
- Map<Long, Map<String, Long>>
processCodeTaskNameCodeMap = processTaskMap.get(definitionId);
- if (processCodeTaskNameCodeMap == null) {
- logger.warn(
- "We can't find processDefinition [{}],
please check it is not exist, remove this dependence",
- definitionId);
- dependItemList.remove(j);
- continue;
- }
- Optional<Map.Entry<Long, Map<String, Long>>> mapEntry =
-
processCodeTaskNameCodeMap.entrySet().stream().findFirst();
- if (mapEntry.isPresent()) {
- Map.Entry<Long, Map<String, Long>>
processCodeTaskNameCodeEntry = mapEntry.get();
- dependItem.put("definitionCode",
processCodeTaskNameCodeEntry.getKey());
- String depTasks =
dependItem.get("depTasks").asText();
- long taskCode =
- "ALL".equals(depTasks) ||
processCodeTaskNameCodeEntry.getValue() == null ? 0L
- :
processCodeTaskNameCodeEntry.getValue().get(depTasks);
- dependItem.put("depTaskCode", taskCode);
- }
- dependItem.remove("projectId");
- dependItem.remove("definitionId");
- dependItem.remove("depTasks");
- dependItemList.set(j, dependItem);
- }
- dependTask.put("dependItemList", dependItemList);
- dependTaskList.set(i, dependTask);
- }
- dependence.put("dependTaskList", dependTaskList);
-
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
- }
- }
- }
-
- private void handleProcessTaskRelation(Map<String, List<String>>
taskNamePreMap,
- Map<String, Long> taskNameCodeMap,
- ProcessDefinition processDefinition,
- List<ProcessTaskRelationLog>
processTaskRelationLogs) {
- Date now = new Date();
- for (Map.Entry<String, List<String>> entry :
taskNamePreMap.entrySet()) {
- List<String> entryValue = entry.getValue();
- if (CollectionUtils.isNotEmpty(entryValue)) {
- for (String preTaskName : entryValue) {
- ProcessTaskRelationLog processTaskRelationLog =
setProcessTaskRelationLog(processDefinition, now);
-
processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName));
-
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
-
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
-
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
- processTaskRelationLogs.add(processTaskRelationLog);
- }
- } else {
- ProcessTaskRelationLog processTaskRelationLog =
setProcessTaskRelationLog(processDefinition, now);
- processTaskRelationLog.setPreTaskCode(0);
- processTaskRelationLog.setPreTaskVersion(0);
-
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
-
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
- processTaskRelationLogs.add(processTaskRelationLog);
- }
- }
- }
-
- private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition
processDefinition, Date now) {
- ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
-
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
-
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
-
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
- processTaskRelationLog.setConditionType(ConditionType.NONE);
- processTaskRelationLog.setConditionParams("{}");
- processTaskRelationLog.setOperator(1);
- processTaskRelationLog.setOperateTime(now);
- processTaskRelationLog.setCreateTime(now);
- processTaskRelationLog.setUpdateTime(now);
- return processTaskRelationLog;
}
}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java
index b9d7d0edcf..006a7438df 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java
@@ -21,5 +21,5 @@ public interface DolphinSchedulerUpgrader {
void doUpgrade();
- String getCurrentVersion();
+ DolphinSchedulerVersion getCurrentVersion();
}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerVersion.java
similarity index 55%
copy from
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java
copy to
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerVersion.java
index b9d7d0edcf..e28d04fe30 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerVersion.java
@@ -17,9 +17,31 @@
package org.apache.dolphinscheduler.tools.datasource.upgrader;
-public interface DolphinSchedulerUpgrader {
+import java.util.Optional;
- void doUpgrade();
+public enum DolphinSchedulerVersion {
- String getCurrentVersion();
+ V1_3_0("1.3.0"),
+ V1_3_2("1.3.2"),
+ V2_0_0("2.0.0"),
+ V3_2_0("3.2.0"),
+ ;
+ private final String versionName;
+
+ DolphinSchedulerVersion(String versionName) {
+ this.versionName = versionName;
+ }
+
+ public String getVersionName() {
+ return versionName;
+ }
+
+ public static Optional<DolphinSchedulerVersion> getVersion(String
versionName) {
+ for (DolphinSchedulerVersion version :
DolphinSchedulerVersion.values()) {
+ if (version.getVersionName().equals(versionName)) {
+ return Optional.of(version);
+ }
+ }
+ return Optional.empty();
+ }
}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v130/V130DolphinSchedulerUpgrader.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v130/V130DolphinSchedulerUpgrader.java
new file mode 100644
index 0000000000..27871277bf
--- /dev/null
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v130/V130DolphinSchedulerUpgrader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.tools.datasource.upgrader.v130;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao;
+import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao;
+import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
+import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
+
+import java.sql.Connection;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+@Slf4j
+@Component
+public class V130DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
+
+ @Autowired
+ private DataSource dataSource;
+
+ @Override
+ public void doUpgrade() {
+ updateProcessDefinitionJsonWorkerGroup();
+ }
+
+ private void updateProcessDefinitionJsonWorkerGroup() {
+ WorkerGroupDao workerGroupDao = new WorkerGroupDao();
+ ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
+ Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
+ try (Connection connection = dataSource.getConnection()) {
+ Map<Integer, String> oldWorkerGroupMap =
workerGroupDao.queryAllOldWorkerGroup(connection);
+ Map<Integer, String> processDefinitionJsonMap =
+ processDefinitionDao.queryAllProcessDefinition(connection);
+
+ for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
+ ObjectNode jsonObject =
JSONUtils.parseObject(entry.getValue());
+ ArrayNode tasks =
JSONUtils.parseArray(jsonObject.get("tasks").toString());
+
+ for (int i = 0; i < tasks.size(); i++) {
+ ObjectNode task = (ObjectNode) tasks.path(i);
+ ObjectNode workerGroupNode = (ObjectNode)
task.path("workerGroupId");
+ int workerGroupId = -1;
+ if (workerGroupNode != null &&
workerGroupNode.canConvertToInt()) {
+ workerGroupId = workerGroupNode.asInt(-1);
+ }
+ if (workerGroupId == -1) {
+ task.put("workerGroup", "default");
+ } else {
+ task.put("workerGroup",
oldWorkerGroupMap.get(workerGroupId));
+ }
+ }
+
+ jsonObject.remove("task");
+
+ jsonObject.put("tasks", tasks);
+
+ replaceProcessDefinitionMap.put(entry.getKey(),
jsonObject.toString());
+ }
+ if (replaceProcessDefinitionMap.size() > 0) {
+ processDefinitionDao.updateProcessDefinitionJson(connection,
replaceProcessDefinitionMap);
+ }
+ } catch (Exception e) {
+ log.error("update process definition json workergroup error", e);
+ }
+ }
+
+ @Override
+ public DolphinSchedulerVersion getCurrentVersion() {
+ return DolphinSchedulerVersion.V1_3_0;
+ }
+}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v132/V132DolphinSchedulerUpgrader.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v132/V132DolphinSchedulerUpgrader.java
new file mode 100644
index 0000000000..abb7045e34
--- /dev/null
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v132/V132DolphinSchedulerUpgrader.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.tools.datasource.upgrader.v132;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
+import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.sql.DataSource;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+@Slf4j
+@Component
+public class V132DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
+
+ @Autowired
+ private DataSource dataSource;
+
+ @Override
+ public void doUpgrade() {
+ updateProcessDefinitionJsonResourceList();
+ }
+
+ private void updateProcessDefinitionJsonResourceList() {
+ ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
+ Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
+ try (Connection connection = dataSource.getConnection()) {
+ Map<String, Integer> resourcesMap = listAllResources(connection);
+ Map<Integer, String> processDefinitionJsonMap =
+ processDefinitionDao.queryAllProcessDefinition(connection);
+
+ for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
+ ObjectNode jsonObject =
JSONUtils.parseObject(entry.getValue());
+ ArrayNode tasks =
JSONUtils.parseArray(jsonObject.get("tasks").toString());
+
+ for (int i = 0; i < tasks.size(); i++) {
+ ObjectNode task = (ObjectNode) tasks.get(i);
+ ObjectNode param = (ObjectNode) task.get("params");
+ if (param != null) {
+
+ List<ResourceInfo> resourceList =
+
JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
+ ResourceInfo mainJar =
+
JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class);
+ if (mainJar != null && mainJar.getId() == null) {
+ String fullName = mainJar.getRes().startsWith("/")
? mainJar.getRes()
+ : String.format("/%s", mainJar.getRes());
+ if (resourcesMap.containsKey(fullName)) {
+ mainJar.setId(resourcesMap.get(fullName));
+ param.put("mainJar",
JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(resourceList)) {
+ List<ResourceInfo> newResourceList =
resourceList.stream().map(resInfo -> {
+ String fullName =
resInfo.getRes().startsWith("/") ? resInfo.getRes()
+ : String.format("/%s",
resInfo.getRes());
+ if (resInfo.getId() == null &&
resourcesMap.containsKey(fullName)) {
+ resInfo.setId(resourcesMap.get(fullName));
+ }
+ return resInfo;
+ }).collect(Collectors.toList());
+ param.put("resourceList",
JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
+ }
+ }
+ task.put("params", param);
+
+ }
+
+ jsonObject.remove("tasks");
+
+ jsonObject.put("tasks", tasks);
+
+ replaceProcessDefinitionMap.put(entry.getKey(),
jsonObject.toString());
+ }
+ if (replaceProcessDefinitionMap.size() > 0) {
+ processDefinitionDao.updateProcessDefinitionJson(connection,
replaceProcessDefinitionMap);
+ }
+ } catch (Exception e) {
+ log.error("update process definition json resource list error", e);
+ }
+
+ }
+
+ /**
+ * list all resources
+ *
+ * @param conn connection
+ * @return map that key is full_name and value is id
+ */
+ private Map<String, Integer> listAllResources(Connection conn) {
+ Map<String, Integer> resourceMap = new HashMap<>();
+
+ String sql = "SELECT id,full_name FROM t_ds_resources";
+ try (
+ PreparedStatement pstmt = conn.prepareStatement(sql);
+ ResultSet rs = pstmt.executeQuery()) {
+
+ while (rs.next()) {
+ Integer id = rs.getInt(1);
+ String fullName = rs.getString(2);
+ resourceMap.put(fullName, id);
+ }
+
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ throw new RuntimeException("sql: " + sql, e);
+ }
+
+ return resourceMap;
+ }
+
+ @Override
+ public DolphinSchedulerVersion getCurrentVersion() {
+ return DolphinSchedulerVersion.V1_3_2;
+ }
+}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v200/V200DolphinSchedulerUpgrader.java
similarity index 56%
copy from
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
copy to
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v200/V200DolphinSchedulerUpgrader.java
index 7b4bb6b646..c986119d8c 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v200/V200DolphinSchedulerUpgrader.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.tools.datasource.dao;
+package org.apache.dolphinscheduler.tools.datasource.upgrader.v200;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
@@ -27,9 +27,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@@ -38,22 +36,15 @@ import org.apache.dolphinscheduler.dao.upgrade.JsonSplitDao;
import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.upgrade.ProjectDao;
import org.apache.dolphinscheduler.dao.upgrade.ScheduleDao;
-import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
-import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.spi.enums.DbType;
+import org.apache.dolphinscheduler.tools.datasource.dao.UpgradeDao;
+import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
+import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
import org.apache.commons.collections4.CollectionUtils;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -64,10 +55,11 @@ import java.util.stream.Collectors;
import javax.sql.DataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.Resource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
@@ -77,377 +69,48 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
-public abstract class UpgradeDao {
-
- public static final Logger logger =
LoggerFactory.getLogger(UpgradeDao.class);
- private static final String T_VERSION_NAME = "t_escheduler_version";
- private static final String T_NEW_VERSION_NAME = "t_ds_version";
-
- protected final DataSource dataSource;
-
- protected UpgradeDao(DataSource dataSource) {
- this.dataSource = dataSource;
- }
-
- protected abstract String initSqlPath();
-
- public abstract DbType getDbType();
-
- public void initSchema() {
- // Execute the dolphinscheduler full sql
- runInitSql(getDbType());
- }
-
- /**
- * run init sql to init db schema
- * @param dbType db type
- */
- private void runInitSql(DbType dbType) {
- String sqlFile = String.format("dolphinscheduler_%s.sql",
dbType.getDescp());
- Resource mysqlSQLFilePath = new ClassPathResource("sql/" + sqlFile);
- try (Connection conn = dataSource.getConnection()) {
- // Execute the dolphinscheduler_ddl.sql script to create the table
structure of dolphinscheduler
- ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true);
- try (Reader initSqlReader = new
InputStreamReader(mysqlSQLFilePath.getInputStream())) {
- initScriptRunner.runScript(initSqlReader);
- }
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- public abstract boolean isExistsTable(String tableName);
-
- public abstract boolean isExistsColumn(String tableName, String
columnName);
-
- public String getCurrentVersion(String versionName) {
- String sql = String.format("select version from %s", versionName);
- Connection conn = null;
- ResultSet rs = null;
- PreparedStatement pstmt = null;
- String version = null;
- try {
- conn = dataSource.getConnection();
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
-
- if (rs.next()) {
- version = rs.getString(1);
- }
-
- return version;
+@Slf4j
+@Component
+public class V200DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
- } catch (SQLException e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException("sql: " + sql, e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
- }
- }
+ @Autowired
+ private DataSource dataSource;
- public void upgradeDolphinScheduler(String schemaDir) {
- upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql");
- upgradeDolphinSchedulerDML(schemaDir);
- }
+ @Lazy()
+ @Autowired
+ private UpgradeDao upgradeDao;
- /**
- * upgrade DolphinScheduler worker group
- * ds-1.3.0 modify the worker group for process definition json
- */
- public void upgradeDolphinSchedulerWorkerGroup() {
- updateProcessDefinitionJsonWorkerGroup();
- }
-
- /**
- * upgrade DolphinScheduler resource list
- * ds-1.3.2 modify the resource list for process definition json
- */
- public void upgradeDolphinSchedulerResourceList() {
- updateProcessDefinitionJsonResourceList();
- }
-
- /**
- * upgrade DolphinScheduler to 2.0.0
- */
- public void upgradeDolphinSchedulerTo200(String schemaDir) {
+ @Override
+ public void doUpgrade() {
processDefinitionJsonSplit();
- upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
- }
-
- /**
- * upgrade DolphinScheduler to 2.0.6
- */
- public void upgradeDolphinSchedulerResourceFileSize() {
- ResourceDao resourceDao = new ResourceDao();
- try {
- // update the size of the folder that is the type of file.
-
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0);
- // update the size of the folder that is the type of udf.
-
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1);
- } catch (Exception ex) {
- logger.error("Failed to upgrade because of failing to update the
folder's size of resource files.");
- }
- }
-
- /**
- * updateProcessDefinitionJsonWorkerGroup
- */
- protected void updateProcessDefinitionJsonWorkerGroup() {
- WorkerGroupDao workerGroupDao = new WorkerGroupDao();
- ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
- Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
- try {
- Map<Integer, String> oldWorkerGroupMap =
workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
- Map<Integer, String> processDefinitionJsonMap =
-
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
-
- for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
- ObjectNode jsonObject =
JSONUtils.parseObject(entry.getValue());
- ArrayNode tasks =
JSONUtils.parseArray(jsonObject.get("tasks").toString());
-
- for (int i = 0; i < tasks.size(); i++) {
- ObjectNode task = (ObjectNode) tasks.path(i);
- ObjectNode workerGroupNode = (ObjectNode)
task.path("workerGroupId");
- int workerGroupId = -1;
- if (workerGroupNode != null &&
workerGroupNode.canConvertToInt()) {
- workerGroupId = workerGroupNode.asInt(-1);
- }
- if (workerGroupId == -1) {
- task.put("workerGroup", "default");
- } else {
- task.put("workerGroup",
oldWorkerGroupMap.get(workerGroupId));
- }
- }
-
- jsonObject.remove("task");
-
- jsonObject.put("tasks", tasks);
-
- replaceProcessDefinitionMap.put(entry.getKey(),
jsonObject.toString());
- }
- if (replaceProcessDefinitionMap.size() > 0) {
-
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),
- replaceProcessDefinitionMap);
- }
- } catch (Exception e) {
- logger.error("update process definition json workergroup error",
e);
- }
+ upgradeDao.upgradeDolphinSchedulerDDL("2.0.0_schema",
"dolphinscheduler_ddl_post.sql");
}
- protected void updateProcessDefinitionJsonResourceList() {
- ResourceDao resourceDao = new ResourceDao();
- ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
- Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
- try {
- Map<String, Integer> resourcesMap =
resourceDao.listAllResources(dataSource.getConnection());
- Map<Integer, String> processDefinitionJsonMap =
-
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
-
- for (Map.Entry<Integer, String> entry :
processDefinitionJsonMap.entrySet()) {
- ObjectNode jsonObject =
JSONUtils.parseObject(entry.getValue());
- ArrayNode tasks =
JSONUtils.parseArray(jsonObject.get("tasks").toString());
-
- for (int i = 0; i < tasks.size(); i++) {
- ObjectNode task = (ObjectNode) tasks.get(i);
- ObjectNode param = (ObjectNode) task.get("params");
- if (param != null) {
-
- List<ResourceInfo> resourceList =
-
JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
- ResourceInfo mainJar =
-
JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class);
- if (mainJar != null && mainJar.getId() == null) {
- String fullName = mainJar.getRes().startsWith("/")
? mainJar.getRes()
- : String.format("/%s", mainJar.getRes());
- if (resourcesMap.containsKey(fullName)) {
- mainJar.setId(resourcesMap.get(fullName));
- param.put("mainJar",
JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
- }
- }
-
- if (CollectionUtils.isNotEmpty(resourceList)) {
- List<ResourceInfo> newResourceList =
resourceList.stream().map(resInfo -> {
- String fullName =
resInfo.getRes().startsWith("/") ? resInfo.getRes()
- : String.format("/%s",
resInfo.getRes());
- if (resInfo.getId() == null &&
resourcesMap.containsKey(fullName)) {
- resInfo.setId(resourcesMap.get(fullName));
- }
- return resInfo;
- }).collect(Collectors.toList());
- param.put("resourceList",
JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
- }
- }
- task.put("params", param);
-
- }
-
- jsonObject.remove("tasks");
-
- jsonObject.put("tasks", tasks);
-
- replaceProcessDefinitionMap.put(entry.getKey(),
jsonObject.toString());
- }
- if (replaceProcessDefinitionMap.size() > 0) {
-
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),
- replaceProcessDefinitionMap);
- }
- } catch (Exception e) {
- logger.error("update process definition json resource list error",
e);
- }
-
- }
-
- private void upgradeDolphinSchedulerDML(String schemaDir) {
- String schemaVersion = schemaDir.split("_")[0];
- Resource sqlFilePath = new
ClassPathResource(String.format("sql/upgrade/%s/%s/dolphinscheduler_dml.sql",
- schemaDir, getDbType().name().toLowerCase()));
- logger.info("sqlSQLFilePath: {}", sqlFilePath);
- Connection conn = null;
- PreparedStatement pstmt = null;
- try {
- conn = dataSource.getConnection();
- conn.setAutoCommit(false);
- // Execute the upgraded dolphinscheduler dml
- ScriptRunner scriptRunner = new ScriptRunner(conn, false, true);
- try (Reader sqlReader = new
InputStreamReader(sqlFilePath.getInputStream())) {
- scriptRunner.runScript(sqlReader);
- if (isExistsTable(T_VERSION_NAME)) {
- // Change version in the version table to the new version
- String upgradeSQL = String.format("update %s set version =
?", T_VERSION_NAME);
- pstmt = conn.prepareStatement(upgradeSQL);
- pstmt.setString(1, schemaVersion);
- pstmt.executeUpdate();
- } else if (isExistsTable(T_NEW_VERSION_NAME)) {
- // Change version in the version table to the new version
- String upgradeSQL = String.format("update %s set version =
?", T_NEW_VERSION_NAME);
- pstmt = conn.prepareStatement(upgradeSQL);
- pstmt.setString(1, schemaVersion);
- pstmt.executeUpdate();
- }
- conn.commit();
- }
- } catch (FileNotFoundException e) {
- try {
- conn.rollback();
- } catch (SQLException e1) {
- logger.error(e1.getMessage(), e1);
- }
- logger.error(e.getMessage(), e);
- throw new RuntimeException("sql file not found ", e);
- } catch (IOException e) {
- try {
- conn.rollback();
- } catch (SQLException e1) {
- logger.error(e1.getMessage(), e1);
- }
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
- } catch (Exception e) {
- try {
- if (null != conn) {
- conn.rollback();
- }
- } catch (SQLException e1) {
- logger.error(e1.getMessage(), e1);
- }
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
- } finally {
- ConnectionUtils.releaseResource(pstmt, conn);
- }
-
- }
-
- /**
- * upgradeDolphinScheduler DDL
- *
- * @param schemaDir schemaDir
- */
- private void upgradeDolphinSchedulerDDL(String schemaDir, String
scriptFile) {
- Resource sqlFilePath = new ClassPathResource(
- String.format("sql/upgrade/%s/%s/%s", schemaDir,
getDbType().name().toLowerCase(), scriptFile));
- Connection conn = null;
- PreparedStatement pstmt = null;
- try {
- conn = dataSource.getConnection();
- String dbName = conn.getCatalog();
- logger.info(dbName);
- conn.setAutoCommit(true);
- // Execute the dolphinscheduler ddl.sql for the upgrade
- ScriptRunner scriptRunner = new ScriptRunner(conn, true, true);
- try (Reader sqlReader = new
InputStreamReader(sqlFilePath.getInputStream())) {
- scriptRunner.runScript(sqlReader);
- }
- } catch (FileNotFoundException e) {
-
- logger.error(e.getMessage(), e);
- throw new RuntimeException("sql file not found ", e);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
- } finally {
- ConnectionUtils.releaseResource(pstmt, conn);
- }
- }
-
- /**
- * update version
- *
- * @param version version
- */
- public void updateVersion(String version) {
- // Change version in the version table to the new version
- String versionName = T_VERSION_NAME;
- if (!SchemaUtils.isAGreatVersion("1.2.0", version)) {
- versionName = "t_ds_version";
- }
- String upgradeSQL = String.format("update %s set version = ?",
versionName);
- PreparedStatement pstmt = null;
- Connection conn = null;
- try {
- conn = dataSource.getConnection();
- pstmt = conn.prepareStatement(upgradeSQL);
- pstmt.setString(1, version);
- pstmt.executeUpdate();
-
- } catch (SQLException e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException("sql: " + upgradeSQL, e);
- } finally {
- ConnectionUtils.releaseResource(pstmt, conn);
- }
-
- }
-
- /**
- * upgrade DolphinScheduler to 2.0.0, json split
- */
private void processDefinitionJsonSplit() {
ProjectDao projectDao = new ProjectDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
ScheduleDao scheduleDao = new ScheduleDao();
JsonSplitDao jsonSplitDao = new JsonSplitDao();
- try {
+ try (Connection connection = dataSource.getConnection()) {
// execute project
- Map<Integer, Long> projectIdCodeMap =
projectDao.queryAllProject(dataSource.getConnection());
- projectDao.updateProjectCode(dataSource.getConnection(),
projectIdCodeMap);
+ Map<Integer, Long> projectIdCodeMap =
projectDao.queryAllProject(connection);
+ projectDao.updateProjectCode(connection, projectIdCodeMap);
// execute process definition code
List<ProcessDefinition> processDefinitions =
-
processDefinitionDao.queryProcessDefinition(dataSource.getConnection());
-
processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(),
processDefinitions,
+ processDefinitionDao.queryProcessDefinition(connection);
+ processDefinitionDao.updateProcessDefinitionCode(connection,
processDefinitions,
projectIdCodeMap);
// execute schedule
- Map<Integer, Long> allSchedule =
scheduleDao.queryAllSchedule(dataSource.getConnection());
+ Map<Integer, Long> allSchedule =
scheduleDao.queryAllSchedule(connection);
Map<Integer, Long> processIdCodeMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId,
ProcessDefinition::getCode));
- scheduleDao.updateScheduleCode(dataSource.getConnection(),
allSchedule, processIdCodeMap);
+ scheduleDao.updateScheduleCode(connection, allSchedule,
processIdCodeMap);
// json split
Map<Integer, String> processDefinitionJsonMap =
-
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
+ processDefinitionDao.queryAllProcessDefinition(connection);
List<ProcessDefinitionLog> processDefinitionLogs = new
ArrayList<>();
List<ProcessTaskRelationLog> processTaskRelationLogs = new
ArrayList<>();
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
@@ -457,11 +120,11 @@ public abstract class UpgradeDao {
convertDependence(taskDefinitionLogs, projectIdCodeMap,
processTaskMap);
// execute json split
-
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(),
processDefinitionLogs);
-
jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(),
processTaskRelationLogs);
-
jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(),
taskDefinitionLogs);
+ jsonSplitDao.executeJsonSplitProcessDefinition(connection,
processDefinitionLogs);
+ jsonSplitDao.executeJsonSplitProcessTaskRelation(connection,
processTaskRelationLogs);
+ jsonSplitDao.executeJsonSplitTaskDefinition(connection,
taskDefinitionLogs);
} catch (Exception e) {
- logger.error("json split error", e);
+ log.error("json split error", e);
}
}
@@ -576,8 +239,58 @@ public abstract class UpgradeDao {
}
}
- public void convertConditions(List<TaskDefinitionLog>
taskDefinitionLogList,
- Map<String, Long> taskNameCodeMap) throws
Exception {
+ private void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
+ Map<Integer, Long> projectIdCodeMap,
+ Map<Integer, Map<Long, Map<String, Long>>>
processTaskMap) {
+ for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+ if (TASK_TYPE_DEPENDENT.equals(taskDefinitionLog.getTaskType())) {
+ ObjectNode taskParams =
JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
+ ObjectNode dependence = (ObjectNode)
taskParams.get("dependence");
+ ArrayNode dependTaskList =
+
JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
+ for (int i = 0; i < dependTaskList.size(); i++) {
+ ObjectNode dependTask = (ObjectNode)
dependTaskList.path(i);
+ ArrayNode dependItemList =
+
JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
+ for (int j = 0; j < dependItemList.size(); j++) {
+ ObjectNode dependItem = (ObjectNode)
dependItemList.path(j);
+ dependItem.put("projectCode",
projectIdCodeMap.get(dependItem.get("projectId").asInt()));
+ int definitionId =
dependItem.get("definitionId").asInt();
+ Map<Long, Map<String, Long>>
processCodeTaskNameCodeMap = processTaskMap.get(definitionId);
+ if (processCodeTaskNameCodeMap == null) {
+ log.warn(
+ "We can't find processDefinition [{}],
please check it is not exist, remove this dependence",
+ definitionId);
+ dependItemList.remove(j);
+ continue;
+ }
+ Optional<Map.Entry<Long, Map<String, Long>>> mapEntry =
+
processCodeTaskNameCodeMap.entrySet().stream().findFirst();
+ if (mapEntry.isPresent()) {
+ Map.Entry<Long, Map<String, Long>>
processCodeTaskNameCodeEntry = mapEntry.get();
+ dependItem.put("definitionCode",
processCodeTaskNameCodeEntry.getKey());
+ String depTasks =
dependItem.get("depTasks").asText();
+ long taskCode =
+ "ALL".equals(depTasks) ||
processCodeTaskNameCodeEntry.getValue() == null ? 0L
+ :
processCodeTaskNameCodeEntry.getValue().get(depTasks);
+ dependItem.put("depTaskCode", taskCode);
+ }
+ dependItem.remove("projectId");
+ dependItem.remove("definitionId");
+ dependItem.remove("depTasks");
+ dependItemList.set(j, dependItem);
+ }
+ dependTask.put("dependItemList", dependItemList);
+ dependTaskList.set(i, dependTask);
+ }
+ dependence.put("dependTaskList", dependTaskList);
+
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
+ }
+ }
+ }
+
+ private void convertConditions(List<TaskDefinitionLog>
taskDefinitionLogList,
+ Map<String, Long> taskNameCodeMap) throws
Exception {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
if (TASK_TYPE_CONDITIONS.equals(taskDefinitionLog.getTaskType())) {
ObjectMapper objectMapper = new ObjectMapper();
@@ -639,56 +352,6 @@ public abstract class UpgradeDao {
return jsonNodes.toString();
}
- public void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
- Map<Integer, Long> projectIdCodeMap,
- Map<Integer, Map<Long, Map<String, Long>>>
processTaskMap) {
- for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
- if (TASK_TYPE_DEPENDENT.equals(taskDefinitionLog.getTaskType())) {
- ObjectNode taskParams =
JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
- ObjectNode dependence = (ObjectNode)
taskParams.get("dependence");
- ArrayNode dependTaskList =
-
JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
- for (int i = 0; i < dependTaskList.size(); i++) {
- ObjectNode dependTask = (ObjectNode)
dependTaskList.path(i);
- ArrayNode dependItemList =
-
JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
- for (int j = 0; j < dependItemList.size(); j++) {
- ObjectNode dependItem = (ObjectNode)
dependItemList.path(j);
- dependItem.put("projectCode",
projectIdCodeMap.get(dependItem.get("projectId").asInt()));
- int definitionId =
dependItem.get("definitionId").asInt();
- Map<Long, Map<String, Long>>
processCodeTaskNameCodeMap = processTaskMap.get(definitionId);
- if (processCodeTaskNameCodeMap == null) {
- logger.warn(
- "We can't find processDefinition [{}],
please check it is not exist, remove this dependence",
- definitionId);
- dependItemList.remove(j);
- continue;
- }
- Optional<Map.Entry<Long, Map<String, Long>>> mapEntry =
-
processCodeTaskNameCodeMap.entrySet().stream().findFirst();
- if (mapEntry.isPresent()) {
- Map.Entry<Long, Map<String, Long>>
processCodeTaskNameCodeEntry = mapEntry.get();
- dependItem.put("definitionCode",
processCodeTaskNameCodeEntry.getKey());
- String depTasks =
dependItem.get("depTasks").asText();
- long taskCode =
- "ALL".equals(depTasks) ||
processCodeTaskNameCodeEntry.getValue() == null ? 0L
- :
processCodeTaskNameCodeEntry.getValue().get(depTasks);
- dependItem.put("depTaskCode", taskCode);
- }
- dependItem.remove("projectId");
- dependItem.remove("definitionId");
- dependItem.remove("depTasks");
- dependItemList.set(j, dependItem);
- }
- dependTask.put("dependItemList", dependItemList);
- dependTaskList.set(i, dependTask);
- }
- dependence.put("dependTaskList", dependTaskList);
-
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
- }
- }
- }
-
private void handleProcessTaskRelation(Map<String, List<String>>
taskNamePreMap,
Map<String, Long> taskNameCodeMap,
ProcessDefinition processDefinition,
@@ -729,4 +392,9 @@ public abstract class UpgradeDao {
processTaskRelationLog.setUpdateTime(now);
return processTaskRelationLog;
}
+
+ @Override
+ public DolphinSchedulerVersion getCurrentVersion() {
+ return DolphinSchedulerVersion.V2_0_0;
+ }
}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java
index 293ec2a61a..32e6b011d6 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java
@@ -28,6 +28,7 @@ import
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
+import
org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
import org.apache.commons.collections4.CollectionUtils;
@@ -139,7 +140,7 @@ public class V320DolphinSchedulerUpgrader implements
DolphinSchedulerUpgrader {
}
@Override
- public String getCurrentVersion() {
- return "3.2.0";
+ public DolphinSchedulerVersion getCurrentVersion() {
+ return DolphinSchedulerVersion.V3_2_0;
}
}