This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0e7c98d465 [Feature-9177][Task] The sql task supports configuring
segmentation notation to provide execution of multiple statements (#9917)
0e7c98d465 is described below
commit 0e7c98d4655dad8e1bcea4e3723fb4e632d07779
Author: Kerwin <[email protected]>
AuthorDate: Sun May 8 15:04:35 2022 +0800
[Feature-9177][Task] The sql task supports configuring segmentation
notation to provide execution of multiple statements (#9917)
* Support sql segmentation to execute multiple functions.
---
.../plugin/task/api/parameters/SqlParameters.java | 18 +++++
.../plugin/task/sql/SqlSplitter.java | 65 ++++++++++++++++++
.../dolphinscheduler/plugin/task/sql/SqlTask.java | 80 ++++++++--------------
dolphinscheduler-ui/src/locales/modules/en_US.ts | 2 +
dolphinscheduler-ui/src/locales/modules/zh_CN.ts | 2 +
.../task/components/node/fields/use-sql-type.ts | 10 +++
.../projects/task/components/node/format-data.ts | 1 +
.../projects/task/components/node/tasks/use-sql.ts | 1 +
.../views/projects/task/components/node/types.ts | 1 +
9 files changed, 129 insertions(+), 51 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
index 045495ce05..8c7725ccf8 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
@@ -113,6 +113,15 @@ public class SqlParameters extends AbstractParameters {
private int limit;
+ /**
+ * segment separator
+ *
+ * <p>The segment separator is used
+ * when the data source does not support multi-segment SQL execution,
+ * and the client needs to split the SQL and execute it multiple times.</p>
+ */
+ private String segmentSeparator;
+
public int getLimit() {
return limit;
}
@@ -225,6 +234,14 @@ public class SqlParameters extends AbstractParameters {
this.groupId = groupId;
}
+ public String getSegmentSeparator() {
+ return segmentSeparator;
+ }
+
+ public void setSegmentSeparator(String segmentSeparator) {
+ this.segmentSeparator = segmentSeparator;
+ }
+
@Override
public boolean checkParameters() {
return datasource != 0 && StringUtils.isNotEmpty(type) &&
StringUtils.isNotEmpty(sql);
@@ -292,6 +309,7 @@ public class SqlParameters extends AbstractParameters {
+ ", sendEmail=" + sendEmail
+ ", displayRows=" + displayRows
+ ", limit=" + limit
+ + ", segmentSeparator=" + segmentSeparator
+ ", udfs='" + udfs + '\''
+ ", showType='" + showType + '\''
+ ", connParams='" + connParams + '\''
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlSplitter.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlSplitter.java
new file mode 100644
index 0000000000..a461748a93
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlSplitter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sql;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class SqlSplitter {
+
+ private SqlSplitter() {
+ }
+
+ private static final String LINE_SEPARATOR = "\n";
+
+ /**
+ * split sql by segment separator
+ * <p>The segment separator is used
+ * when the data source does not support multi-segment SQL execution,
+ * and the client needs to split the SQL and execute it multiple times.</p>
+ * @param sql
+ * @param segmentSeparator
+ * @return
+ */
+ public static List<String> split(String sql, String segmentSeparator) {
+ if (StringUtils.isBlank(segmentSeparator)) {
+ return Collections.singletonList(sql);
+ }
+
+ String[] lines = sql.split(LINE_SEPARATOR);
+ List<String> segments = new ArrayList<>();
+ StringBuilder stmt = new StringBuilder();
+ for (String line : lines) {
+ if (line.trim().isEmpty() || line.startsWith("--")) {
+ continue;
+ }
+ stmt.append(LINE_SEPARATOR).append(line);
+ if (line.trim().endsWith(segmentSeparator)) {
+ segments.add(stmt.toString());
+ stmt.setLength(0);
+ }
+ }
+ if (stmt.length() > 0) {
+ segments.add(stmt.toString());
+ }
+ return segments;
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index ad31202004..3b0fa0e8c5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -136,7 +136,11 @@ public class SqlTask extends AbstractTaskExecutor {
sqlTaskExecutionContext.getConnectionParams());
// ready to execute SQL and parameter entity Map
- SqlBinds mainSqlBinds =
getSqlAndSqlParamsMap(sqlParameters.getSql());
+ List<SqlBinds> mainStatementSqlBinds =
SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator())
+ .stream()
+ .map(this::getSqlAndSqlParamsMap)
+ .collect(Collectors.toList());
+
List<SqlBinds> preStatementSqlBinds =
Optional.ofNullable(sqlParameters.getPreStatements())
.orElse(new ArrayList<>())
.stream()
@@ -151,7 +155,7 @@ public class SqlTask extends AbstractTaskExecutor {
List<String> createFuncs =
createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList(), logger);
// execute sql task
- executeFuncAndSql(mainSqlBinds, preStatementSqlBinds,
postStatementSqlBinds, createFuncs);
+ executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds,
postStatementSqlBinds, createFuncs);
setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
@@ -165,17 +169,16 @@ public class SqlTask extends AbstractTaskExecutor {
/**
* execute function and sql
*
- * @param mainSqlBinds main sql binds
+ * @param mainStatementsBinds main statements binds
* @param preStatementsBinds pre statements binds
* @param postStatementsBinds post statements binds
* @param createFuncs create functions
*/
- public void executeFuncAndSql(SqlBinds mainSqlBinds,
+ public void executeFuncAndSql(List<SqlBinds> mainStatementsBinds,
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs) throws Exception {
Connection connection = null;
- PreparedStatement stmt = null;
ResultSet resultSet = null;
try {
@@ -186,30 +189,31 @@ public class SqlTask extends AbstractTaskExecutor {
createTempFunction(connection, createFuncs);
}
- // pre sql
- preSql(connection, preStatementsBinds);
- stmt = prepareStatementAndBind(connection, mainSqlBinds);
+ // pre execute
+ executeUpdate(connection, preStatementsBinds, "pre");
+ // main execute
String result = null;
// decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and
inserted into Alert to send
- resultSet = stmt.executeQuery();
+ resultSet = executeQuery(connection,
mainStatementsBinds.get(0), "main");
result = resultProcess(resultSet);
-
} else if (sqlParameters.getSqlType() ==
SqlType.NON_QUERY.ordinal()) {
// non query statement
- String updateResult = String.valueOf(stmt.executeUpdate());
+ String updateResult = executeUpdate(connection,
mainStatementsBinds, "main");
result = setNonQuerySqlReturn(updateResult,
sqlParameters.getLocalParams());
}
//deal out params
sqlParameters.dealOutParam(result);
- postSql(connection, postStatementsBinds);
+
+ // post execute
+ executeUpdate(connection, postStatementsBinds, "post");
} catch (Exception e) {
logger.error("execute sql error: {}", e.getMessage());
throw e;
} finally {
- close(resultSet, stmt, connection);
+ close(resultSet, connection);
}
}
@@ -288,37 +292,22 @@ public class SqlTask extends AbstractTaskExecutor {
setTaskAlertInfo(taskAlertInfo);
}
- /**
- * pre sql
- *
- * @param connection connection
- * @param preStatementsBinds preStatementsBinds
- */
- private void preSql(Connection connection,
- List<SqlBinds> preStatementsBinds) throws Exception {
- for (SqlBinds sqlBind : preStatementsBinds) {
- try (PreparedStatement pstmt = prepareStatementAndBind(connection,
sqlBind)) {
- int result = pstmt.executeUpdate();
- logger.info("pre statement execute result: {}, for sql: {}",
result, sqlBind.getSql());
-
- }
+ private ResultSet executeQuery(Connection connection, SqlBinds sqlBinds,
String handlerType) throws Exception {
+ try (PreparedStatement statement = prepareStatementAndBind(connection,
sqlBinds)) {
+ logger.info("{} statement execute query, for sql: {}",
handlerType, sqlBinds.getSql());
+ return statement.executeQuery();
}
}
- /**
- * post sql
- *
- * @param connection connection
- * @param postStatementsBinds postStatementsBinds
- */
- private void postSql(Connection connection,
- List<SqlBinds> postStatementsBinds) throws Exception {
- for (SqlBinds sqlBind : postStatementsBinds) {
- try (PreparedStatement pstmt = prepareStatementAndBind(connection,
sqlBind)) {
- int result = pstmt.executeUpdate();
- logger.info("post statement execute result: {},for sql: {}",
result, sqlBind.getSql());
+ private String executeUpdate(Connection connection, List<SqlBinds>
statementsBinds, String handlerType) throws Exception {
+ int result = 0;
+ for (SqlBinds sqlBind : statementsBinds) {
+ try (PreparedStatement statement =
prepareStatementAndBind(connection, sqlBind)) {
+ result = statement.executeUpdate();
+ logger.info("{} statement execute update result: {}, for sql:
{}", handlerType, result, sqlBind.getSql());
}
}
+ return String.valueOf(result);
}
/**
@@ -341,12 +330,9 @@ public class SqlTask extends AbstractTaskExecutor {
* close jdbc resource
*
* @param resultSet resultSet
- * @param pstmt pstmt
* @param connection connection
*/
- private void close(ResultSet resultSet,
- PreparedStatement pstmt,
- Connection connection) {
+ private void close(ResultSet resultSet, Connection connection) {
if (resultSet != null) {
try {
resultSet.close();
@@ -355,14 +341,6 @@ public class SqlTask extends AbstractTaskExecutor {
}
}
- if (pstmt != null) {
- try {
- pstmt.close();
- } catch (SQLException e) {
- logger.error("close prepared statement error : {}",
e.getMessage(), e);
- }
- }
-
if (connection != null) {
try {
connection.close();
diff --git a/dolphinscheduler-ui/src/locales/modules/en_US.ts
b/dolphinscheduler-ui/src/locales/modules/en_US.ts
index 4a391d0301..67b26f1ae6 100644
--- a/dolphinscheduler-ui/src/locales/modules/en_US.ts
+++ b/dolphinscheduler-ui/src/locales/modules/en_US.ts
@@ -924,6 +924,8 @@ const project = {
required: 'required',
emr_flow_define_json: 'jobFlowDefineJson',
emr_flow_define_json_tips: 'Please enter the definition of the job flow.',
+ segment_separator: 'Segment Execution Separator',
+ segment_separator_tips: 'Please enter the segment execution separator',
zeppelin_note_id: 'zeppelinNoteId',
zeppelin_note_id_tips: 'Please enter the note id of your zeppelin note',
zeppelin_paragraph_id: 'zeppelinParagraphId',
diff --git a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
index 88d0aab6c3..ea8a12809f 100644
--- a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
+++ b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
@@ -913,6 +913,8 @@ const project = {
required: '必填',
emr_flow_define_json: 'jobFlowDefineJson',
emr_flow_define_json_tips: '请输入工作流定义',
+ segment_separator: '分段执行符号',
+ segment_separator_tips: '请输入分段执行符号',
zeppelin_note_id: 'zeppelin_note_id',
zeppelin_note_id_tips: '请输入zeppelin note id',
zeppelin_paragraph_id: 'zeppelin_paragraph_id',
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts
index 1e93c9e208..3ad01cf846 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts
@@ -24,6 +24,7 @@ import type { IJsonItem } from '../types'
export function useSqlType(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const querySpan = computed(() => (model.sqlType === '0' ? 6 : 0))
+ const nonQuerySpan = computed(() => (model.sqlType === '1' ? 6 : 0))
const emailSpan = computed(() =>
model.sqlType === '0' && model.sendEmail ? 24 : 0
)
@@ -67,6 +68,15 @@ export function useSqlType(model: { [field: string]: any }):
IJsonItem[] {
required: true
}
},
+ {
+ type: 'input',
+ field: 'segmentSeparator',
+ name: t('project.node.segment_separator'),
+ props: {
+ placeholder: t('project.node.segment_separator_tips')
+ },
+ span: nonQuerySpan
+ },
{
type: 'switch',
field: 'sendEmail',
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index d391203e47..dc583e7c0c 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -179,6 +179,7 @@ export function formatParams(data: INodeData): {
taskParams.sqlType = data.sqlType
taskParams.preStatements = data.preStatements
taskParams.postStatements = data.postStatements
+ taskParams.segmentSeparator = data.segmentSeparator
taskParams.sendEmail = data.sendEmail
taskParams.displayRows = data.displayRows
if (data.sqlType === '0' && data.sendEmail) {
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts
index de02be448e..63111d54fe 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts
@@ -46,6 +46,7 @@ export function useSql({
timeout: 30,
type: 'MYSQL',
displayRows: 10,
+ segmentSeparator: '',
sql: '',
sqlType: '0',
preStatements: [],
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index d334de299f..9d9e413db1 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -241,6 +241,7 @@ interface ITaskParams {
datasource?: string
sql?: string
sqlType?: string
+ segmentSeparator?: string
sendEmail?: boolean
displayRows?: number
title?: string