This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch 3.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.2-prepare by this push:
new b2ce1fb7d3 fix procedure task output param (#12894)
b2ce1fb7d3 is described below
commit b2ce1fb7d3e1f6d9a9be2749061c571460a7c798
Author: caishunfeng <[email protected]>
AuthorDate: Mon Nov 14 19:48:35 2022 +0800
fix procedure task output param (#12894)
---
docs/docs/en/guide/task/stored-procedure.md | 48 +++++++++++++--------
docs/docs/zh/guide/task/stored-procedure.md | 19 ++++++--
docs/img/procedure-en.png | Bin 165706 -> 0 bytes
docs/img/procedure_edit.png | Bin 49986 -> 115800 bytes
.../plugin/task/procedure/ProcedureTask.java | 32 ++++++++++----
5 files changed, 68 insertions(+), 31 deletions(-)
diff --git a/docs/docs/en/guide/task/stored-procedure.md b/docs/docs/en/guide/task/stored-procedure.md
index 5aff75207c..a1a3942091 100644
--- a/docs/docs/en/guide/task/stored-procedure.md
+++ b/docs/docs/en/guide/task/stored-procedure.md
@@ -2,27 +2,39 @@
- Execute the stored procedure according to the selected DataSource.
-> Drag from the toolbar task node into the canvas, as shown in the figure below:
+> Drag the `PROCEDURE` task node from the toolbar into the canvas, as shown in the figure below:
<p align="center">
- <img src="../../../../img/procedure-en.png" width="80%" />
+ <img src="../../../../img/procedure_edit.png" width="80%" />
</p>
## Task Parameters
-| **Parameter** | **Description** |
-| ------- | ---------- |
-| Node Name | Set the name of the task. Node names within a workflow definition are unique. |
-| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. |
-| Description | Describes the function of this node. |
-| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. |
-| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. |
-| Task group name | The group in Resources, if not configured, it will not be used. |
-| Environment Name | Configure the environment in which to run the script. |
-| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. |
-| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. |
-| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
-| DataSource | The DataSource type of the stored procedure supports MySQL and POSTGRESQL, select the corresponding DataSource. |
-| Method | The method name of the stored procedure. |
-| Custom parameters | The custom parameter types of the stored procedure support `IN` and `OUT`, and the data types support: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and BOOLEAN. |
-| Predecessor task | Selecting the predecessor task of the current task will set the selected predecessor task as the upstream of the current task. |
\ No newline at end of file
+| **Parameter** | **Description** |
+|--------------------------|-----------------|
+| Node Name | Set the name of the task. Node names within a workflow definition are unique. |
+| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. |
+| Description | Describes the function of this node. |
+| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. |
+| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. |
+| Task group name | The group in Resources. If not configured, it will not be used. |
+| Environment Name | Configure the environment in which to run the script. |
+| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. |
+| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. |
+| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
+| DataSource | The DataSource type of the stored procedure supports MySQL, POSTGRESQL and ORACLE; select the corresponding DataSource. |
+| SQL Statement | Call a stored procedure, such as `call test(${in1},${out1});`. |
+| Custom parameters | The custom parameter types of the stored procedure support `IN` and `OUT`, and the data types support: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and BOOLEAN. |
+| Predecessor task | Selecting the predecessor task of the current task will set the selected predecessor task as the upstream of the current task. |
+
+## Remark
+
+- Prepare: Create a stored procedure in the database, such as:
+
+ ```
+ CREATE PROCEDURE dolphinscheduler.test(in in1 INT, out out1 INT)
+ begin
+ set out1=in1;
+ END
+ ```
+
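For context, the `call test(${in1},${out1});` statement documented above maps onto plain JDBC roughly as follows. This is a minimal sketch, not the plugin's actual code: the `toCallableSql` helper is hypothetical (a stand-in for the plugin's `${...}`-to-`?` rewrite in `formatSql`), and the connection URL and credentials are assumptions to adjust for your environment.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Types;

public class ProcedureCallSketch {

    // Hypothetical helper: rewrite ${...} placeholders into JDBC '?' markers,
    // mirroring what the procedure task plugin does before prepareCall.
    static String toCallableSql(String statement) {
        return statement.replaceAll("\\$\\{[^}]+\\}", "?");
    }

    // Invoke the `test` procedure from the Remark section: IN in1, OUT out1.
    static int callTest(Connection conn, int in1) throws Exception {
        String sql = toCallableSql("call test(${in1},${out1});");
        try (CallableStatement stmt = conn.prepareCall(sql)) {
            stmt.setInt(1, in1);                          // bind IN parameter in1
            stmt.registerOutParameter(2, Types.INTEGER);  // register OUT parameter out1
            stmt.execute();
            return stmt.getInt(2);                        // procedure sets out1 = in1
        }
    }

    public static void main(String[] args) throws Exception {
        // The placeholder rewrite alone, without a live database:
        System.out.println(toCallableSql("call test(${in1},${out1});")); // prints "call test(?,?);"
        // With a running MySQL instance (assumed URL/credentials), one would do:
        // Connection conn = DriverManager.getConnection(
        //         "jdbc:mysql://localhost:3306/dolphinscheduler", "root", "secret");
        // System.out.println(callTest(conn, 7));
    }
}
```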
diff --git a/docs/docs/zh/guide/task/stored-procedure.md b/docs/docs/zh/guide/task/stored-procedure.md
index bca32550ee..51140f3bce 100644
--- a/docs/docs/zh/guide/task/stored-procedure.md
+++ b/docs/docs/zh/guide/task/stored-procedure.md
@@ -1,12 +1,23 @@
# Stored Procedure Node
- Execute the stored procedure according to the selected DataSource.
-> Drag the task node from the toolbar onto the canvas, as shown in the figure below:
+
+> Drag the `PROCEDURE` task node from the toolbar onto the canvas, as shown in the figure below:
<p align="center">
<img src="/img/procedure_edit.png" width="80%" />
</p>
-- DataSource: the stored procedure supports MySQL and POSTGRESQL; select the corresponding DataSource
-- Method: the method name of the stored procedure
-- Custom parameters: the custom parameter types of the stored procedure support IN and OUT, and nine data types: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, BOOLEAN
+- Prerequisite: create the stored procedure in the database first, for example:
+
+```
+CREATE PROCEDURE dolphinscheduler.test(in in1 INT, out out1 INT)
+begin
+ set out1=in1;
+END
+```
+
+- DataSource: the stored procedure supports MySQL, POSTGRESQL and ORACLE; select the corresponding DataSource
+- SQL Statement: call the stored procedure, e.g. `call test(${in1},${out1});`
+- Custom parameters: the custom parameter types of the stored procedure support IN and OUT, and nine data types: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, BOOLEAN
+
diff --git a/docs/img/procedure-en.png b/docs/img/procedure-en.png
deleted file mode 100644
index b4679b37ab..0000000000
Binary files a/docs/img/procedure-en.png and /dev/null differ
diff --git a/docs/img/procedure_edit.png b/docs/img/procedure_edit.png
index 004c0ba1b2..06bcd99d47 100644
Binary files a/docs/img/procedure_edit.png and b/docs/img/procedure_edit.png differ
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 4f61ef1022..bfc89c113f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -73,14 +73,16 @@ public class ProcedureTask extends AbstractTaskExecutor {
         logger.info("procedure task params {}", taskExecutionContext.getTaskParams());
-        this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);
+        this.procedureParameters =
+                JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);
         // check parameters
         if (!procedureParameters.checkParameters()) {
             throw new RuntimeException("procedure task params is not valid");
         }
-        procedureTaskExecutionContext = procedureParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+        procedureTaskExecutionContext =
+                procedureParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
}
@Override
@@ -97,13 +99,22 @@ public class ProcedureTask extends AbstractTaskExecutor {
// load class
DbType dbType = DbType.valueOf(procedureParameters.getType());
// get datasource
-        ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
-                procedureTaskExecutionContext.getConnectionParams());
+        ConnectionParam connectionParam =
+                DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
+                        procedureTaskExecutionContext.getConnectionParams());
         // get jdbc connection
         connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
+
         Map<Integer, Property> sqlParamsMap = new HashMap<>();
+
         Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
+        if (procedureParameters.getOutProperty() != null) {
+            // set out params before format sql
+            paramsMap.putAll(procedureParameters.getOutProperty());
+        }
+
+        // format sql
         String proceduerSql = formatSql(sqlParamsMap, paramsMap);
         // call method
         stmt = connection.prepareCall(proceduerSql);
@@ -131,7 +142,8 @@ public class ProcedureTask extends AbstractTaskExecutor {
     private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsMap) {
         // combining local and global parameters
-        setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
+        setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap,
+                taskExecutionContext.getTaskInstanceId());
return procedureParameters.getMethod().replaceAll(rgex, "?");
}
@@ -162,8 +174,8 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @return outParameterMap
* @throws Exception Exception
*/
-    private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap
-            , Map<String, Property> totalParamsMap) throws Exception {
+    private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap,
+                                                      Map<String, Property> totalParamsMap) throws Exception {
Map<Integer, Property> outParameterMap = new HashMap<>();
if (procedureParameters.getLocalParametersMap() == null) {
return outParameterMap;
@@ -174,7 +186,8 @@ public class ProcedureTask extends AbstractTaskExecutor {
for (Map.Entry<Integer, Property> entry : paramsMap.entrySet()) {
Property property = entry.getValue();
if (property.getDirect().equals(Direct.IN)) {
-                ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
+                ParameterUtils.setInParameter(index, stmt, property.getType(),
+                        totalParamsMap.get(property.getProp()).getValue());
             } else if (property.getDirect().equals(Direct.OUT)) {
                 setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
outParameterMap.put(index, property);
@@ -231,7 +244,8 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @param dataType dataType
* @throws SQLException SQLException
*/
-    private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
+    private Object getOutputParameter(CallableStatement stmt, int index, String prop,
+                                      DataType dataType) throws SQLException {
Object value = null;
switch (dataType) {
case VARCHAR: