This is an automated email from the ASF dual-hosted git repository.
zhoujieguang pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 68d2670 [improve] More pythonic way for parameter local_params in
class task (#53)
68d2670 is described below
commit 68d2670242ca9e47d4eb4b878334ab3fe972b262
Author: JieguangZhou <[email protected]>
AuthorDate: Tue Dec 20 17:16:28 2022 +0800
[improve] More pythonic way for parameter local_params in class task (#53)
---
.gitignore | 3 +
docs/source/concept.rst | 32 +++++
examples/yaml_define/MoreConfiguration.yaml | 14 +-
src/pydolphinscheduler/core/parameter.py | 147 +++++++++++++++++++++
src/pydolphinscheduler/core/task.py | 94 ++++++++++++-
src/pydolphinscheduler/core/yaml_workflow.py | 35 ++++-
.../examples/local_parameter_example.py | 116 ++++++++++++++++
tests/core/test_local_parameter.py | 116 ++++++++++++++++
tests/core/test_task.py | 64 +++++++++
tests/core/test_yaml_workflow.py | 16 +++
10 files changed, 627 insertions(+), 10 deletions(-)
diff --git a/.gitignore b/.gitignore
index d20c274..aaf4a8f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,6 @@ build/
.coverage
coverage.xml
htmlcov/
+
+# the pydolphinscheduler config
+config.yaml
diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 084980c..28a797a 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -261,6 +261,38 @@ After that, we could see new file named ``bare-create.py``
is be created in reso
Both parameter ``resource_list`` in workflow and task is list of string
which mean you could upload and reference
multiple files. For more complex usage, you could read
:doc:`howto/multi-resources`.
+Local Parameters
+----------------
+
+In DolphinScheduler, we can define parameter in task, aka Local Parameter.
+
+We can set parameters to variables in tasks to better manage our tasks.
+
+For example:
+
+
+.. literalinclude::
../../src/pydolphinscheduler/examples/local_parameter_example.py
+ :start-after: [start parameter example]
+ :end-before: [end parameter example]
+ :language: python
+
+
+There are two ways to define local parameters:
+
+.. literalinclude::
../../src/pydolphinscheduler/examples/local_parameter_example.py
+ :start-after: [start parameter define]
+ :end-before: [end parameter define]
+ :language: python
+
+
+Full example:
+
+.. literalinclude::
../../src/pydolphinscheduler/examples/local_parameter_example.py
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+ :language: python
+
+
Authentication Token
--------------------
diff --git a/examples/yaml_define/MoreConfiguration.yaml
b/examples/yaml_define/MoreConfiguration.yaml
index 678315a..849ee4b 100644
--- a/examples/yaml_define/MoreConfiguration.yaml
+++ b/examples/yaml_define/MoreConfiguration.yaml
@@ -30,11 +30,19 @@ tasks:
command: |
echo "$ENV{HOME}"
echo "${n}"
+ echo "123" >> text.txt
task_priority: "HIGH"
delay_time: 20
fail_retry_times: 30
fail_retry_interval: 5
timeout: 60
- is_cahce: true
- local_params:
- - { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" }
+ is_cache: true
+ input_params:
+ value_VARCHAR: "abc"
+ value_INTEGER: 123
+ value_FLOAT: 0.1
+ value_BOOLEAN: True
+ output_params:
+ value_output: ""
+ value_output2: VARCHAR()
+ value_data_path: FILE(text.txt)
diff --git a/src/pydolphinscheduler/core/parameter.py
b/src/pydolphinscheduler/core/parameter.py
new file mode 100644
index 0000000..6bac357
--- /dev/null
+++ b/src/pydolphinscheduler/core/parameter.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DolphinScheduler parameter object."""
+
+from pydolphinscheduler.exceptions import PyDSParamException
+
+
+class Direction:
+ """Constants for direction."""
+
+ IN = "IN"
+ OUT = "OUT"
+
+
+class BaseDataType:
+ """Base data type.
+
+ Use to convert value to ParameterType
+ """
+
+ def __init__(self, value=None):
+ self.data_type = self.__class__.__name__
+ self.value = self.convert_value(value) if value is not None else ""
+
+ def convert_value(self, value=None):
+ """Convert value."""
+ if value is None or value == "":
+ return ""
+ else:
+ return self._convert(value)
+
+ def _convert(self, value=None):
+ return str(value)
+
+ def __eq__(self, data):
+ return (
+ type(self) == type(data)
+ and self.data_type == data.data_type
+ and self.value == data.value
+ )
+
+
+def create_data_type(class_name, convert_func=None):
+ """Create ParameterType and set the convert_func."""
+ convert = convert_func or BaseDataType._convert
+ return type(class_name, (BaseDataType,), {"_convert": convert})
+
+
+class ParameterType:
+ """ParameterType corresponds to dolphinscheduler."""
+
+ VARCHAR = create_data_type("VARCHAR", str)
+ LONG = create_data_type("LONG")
+ INTEGER = create_data_type("INTEGER", int)
+ FLOAT = create_data_type("FLOAT", float)
+ DOUBLE = create_data_type("DOUBLE")
+ DATE = create_data_type("DATE")
+ TIME = create_data_type("TIME")
+ TIMESTAMP = create_data_type("TIMESTAMP")
+ BOOLEAN = create_data_type("BOOLEAN", bool)
+ LIST = create_data_type("LIST")
+ FILE = create_data_type("FILE")
+
+ type_sets = {
+ key: value for key, value in locals().items() if not
key.startswith("_")
+ }
+
+ _TYPE_MAPPING = {
+ "int": INTEGER,
+ "float": FLOAT,
+ "ScalarFloat": FLOAT,
+ "str": VARCHAR,
+ "bool": BOOLEAN,
+ "NoneType": VARCHAR,
+ }
+
+
+class Parameter:
+ """Parameter."""
+
+ def __init__(self, name, direction, data_type, value=None):
+ self.name = name
+ self.direction = direction
+ self.data_type = data_type
+ self.value = value or ""
+
+ @property
+ def data(self):
+ """Convert to local_params in task define."""
+ return {
+ "prop": self.name,
+ "direct": self.direction,
+ "type": self.data_type,
+ "value": self.value,
+ }
+
+
+class ParameterHelper:
+ """Use for task to handle parameters."""
+
+ @staticmethod
+ def convert_params(params, direction):
+ """Convert params to format local_params.
+
+ :param params: dict[str, Any], the input_params or output_params of
Task.
+ :param direction: [Direction.IN | Direction.OUT], direction of
parameter.
+ """
+ parameters = []
+ params = params or {}
+ if not isinstance(params, dict):
+ raise PyDSParamException("input_params must be a dict")
+ for key, value in params.items():
+ if not isinstance(value, BaseDataType):
+ data_type_cls = ParameterHelper.infer_parameter_type(value)
+ value = data_type_cls(value)
+
+ parameter = Parameter(key, direction, value.data_type, value.value)
+ parameters.append(parameter)
+ return [p.data for p in parameters]
+
+ @staticmethod
+ def infer_parameter_type(value):
+ """Infer to ParameterType from the input value."""
+ value_type = type(value).__name__
+
+ if value_type not in ParameterType._TYPE_MAPPING:
+ raise PyDSParamException(
+ f"Can not infer parameter type {value}, please use
ParameterType"
+ )
+
+ data_type_cls = ParameterType._TYPE_MAPPING[value_type]
+ return data_type_cls
diff --git a/src/pydolphinscheduler/core/task.py
b/src/pydolphinscheduler/core/task.py
index 7b942eb..f0d3e83 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -33,6 +33,7 @@ from pydolphinscheduler.constants import (
TaskPriority,
TaskTimeoutFlag,
)
+from pydolphinscheduler.core.parameter import BaseDataType, Direction,
ParameterHelper
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.core.workflow import Workflow, WorkflowContext
@@ -82,7 +83,28 @@ class TaskRelation(Base):
class Task(Base):
- """Task object, parent class for all exactly task type."""
+ """Task object, parent class for all exactly task type.
+
+ :param name: The name of the task. Node names within the same workflow
must be unique.
+ :param task_type:
+ :param description: default None
+ :param flag: default TaskFlag.YES,
+ :param task_priority: default TaskPriority.MEDIUM
+ :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
+ :param environment_name: default None
+ :param delay_time: deault 0
+ :param fail_retry_times: default 0
+ :param fail_retry_interval: default 1
+ :param timeout_notify_strategy: default, None
+ :param timeout: default None
+ :param resource_list: default None
+ :param wait_start_timeout: default None
+ :param condition_result: default None,
+ :param resource_plugin: default None
+ :param is_cache: default False
+ :param input_params: default None, input parameters, {param_name:
param_value}
+ :param output_params: default None, input parameters, {param_name:
param_value}
+ """
_DEFINE_ATTR = {
"name",
@@ -137,13 +159,14 @@ class Task(Base):
timeout_notify_strategy: Optional = None,
timeout: Optional[timedelta] = None,
workflow: Optional[Workflow] = None,
- local_params: Optional[List] = None,
resource_list: Optional[List] = None,
dependence: Optional[Dict] = None,
wait_start_timeout: Optional[Dict] = None,
condition_result: Optional[Dict] = None,
resource_plugin: Optional[ResourcePlugin] = None,
is_cache: Optional[bool] = False,
+ input_params: Optional[Dict] = None,
+ output_params: Optional[Dict] = None,
*args,
**kwargs,
):
@@ -161,6 +184,8 @@ class Task(Base):
self.timeout_notify_strategy = timeout_notify_strategy
self._timeout: timedelta = timeout
self._workflow = None
+ self._input_params = input_params or {}
+ self._output_params = output_params or {}
if "process_definition" in kwargs:
warnings.warn(
"The `process_definition` parameter is deprecated, please use
`workflow` instead.",
@@ -169,6 +194,16 @@ class Task(Base):
self.workflow = kwargs.pop("process_definition")
else:
self.workflow: Workflow = workflow or WorkflowContext.get()
+
+ if "local_params" in kwargs:
+ warnings.warn(
+ """The `local_params` parameter is deprecated,
+ please use `input_params` and `output_params` instead.
+ """,
+ DeprecationWarning,
+ )
+ self._local_params = kwargs.get("local_params")
+
self._upstream_task_codes: Set[int] = set()
self._downstream_task_codes: Set[int] = set()
self._task_relation: Set[TaskRelation] = set()
@@ -185,7 +220,6 @@ class Task(Base):
)
# Attribute for task param
- self.local_params = local_params or []
self._resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
@@ -415,3 +449,57 @@ class Task(Base):
if self._environment_name is None:
return None
return gateway.query_environment_info(self._environment_name)
+
+ @property
+ def local_params(self):
+ """Convert local params."""
+ local_params = (
+ copy.deepcopy(self._local_params) if hasattr(self,
"_local_params") else []
+ )
+ local_params.extend(
+ ParameterHelper.convert_params(self._input_params, Direction.IN)
+ )
+ local_params.extend(
+ ParameterHelper.convert_params(self._output_params, Direction.OUT)
+ )
+ return local_params
+
+ def add_in(
+ self,
+ name: str,
+ value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
+ ):
+ """Add input parameters.
+
+ :param name: name of the input parameter.
+ :param value: value of the input parameter.
+
+ It could be simply command::
+
+ task.add_in("a")
+ task.add_in("b", 123)
+ task.add_in("c", bool)
+ task.add_in("d", ParameterType.LONG(123))
+
+ """
+ self._input_params[name] = value
+
+ def add_out(
+ self,
+ name: str,
+ value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
+ ):
+ """Add output parameters.
+
+ :param name: name of the output parameter.
+ :param value: value of the output parameter.
+
+ It could be simply command::
+
+ task.add_out("a")
+ task.add_out("b", 123)
+ task.add_out("c", bool)
+ task.add_out("d", ParameterType.LONG(123))
+
+ """
+ self._output_params[name] = value
diff --git a/src/pydolphinscheduler/core/yaml_workflow.py
b/src/pydolphinscheduler/core/yaml_workflow.py
index 507a518..b7a4744 100644
--- a/src/pydolphinscheduler/core/yaml_workflow.py
+++ b/src/pydolphinscheduler/core/yaml_workflow.py
@@ -24,6 +24,8 @@ from pathlib import Path
from typing import Any, Dict
from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.constants import Symbol
+from pydolphinscheduler.core.parameter import ParameterType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
@@ -75,6 +77,24 @@ class ParseTool:
return string_param
+ @staticmethod
+ def parse_string_param_if_parameter(string_param: str, **kwargs):
+ """Use TYPE(value) to set local params."""
+ key_path = kwargs.get("key_path")
+ if key_path.split(Symbol.POINT)[0] not in {"input_params",
"output_params"}:
+ return string_param
+
+ if not isinstance(string_param, str):
+ return string_param
+
+ result = re.findall(r"^(.*?)\((.*?)\)", string_param)
+ if len(result) == 1 and len(result[0]) == 2:
+ type_ = result[0][0].rstrip()
+ value = result[0][1].rstrip()
+ return ParameterType.type_sets[type_](value)
+ else:
+ return string_param
+
@staticmethod
def get_possible_path(file_path, base_folder):
"""Get file possible path.
@@ -123,6 +143,7 @@ class YamlWorkflow(YamlParser):
ParseTool.parse_string_param_if_file,
ParseTool.parse_string_param_if_env,
ParseTool.parse_string_param_if_config,
+ ParseTool.parse_string_param_if_parameter,
]
def __init__(self, yaml_file: str):
@@ -178,7 +199,7 @@ class YamlWorkflow(YamlParser):
return workflow_name
- def parse_params(self, params: Any):
+ def parse_params(self, params: Any, key_path=""):
"""Recursively resolves the parameter values.
The function operates params only when it encounters a string; other
types continue recursively.
@@ -186,17 +207,23 @@ class YamlWorkflow(YamlParser):
if isinstance(params, str):
for parse_rule in self._parse_rules:
params_ = params
- params = parse_rule(params, base_folder=self._base_folder)
+ params = parse_rule(
+ params, base_folder=self._base_folder, key_path=key_path
+ )
if params_ != params:
logger.info(f"parse {params_} -> {params}")
elif isinstance(params, list):
for index in range(len(params)):
- params[index] = self.parse_params(params[index])
+ params[index] = self.parse_params(params[index], key_path)
elif isinstance(params, dict):
for key, value in params.items():
- params[key] = self.parse_params(value)
+ if not key_path:
+ new_key_path = key
+ else:
+ new_key_path = key_path + Symbol.POINT + key
+ params[key] = self.parse_params(value, new_key_path)
return params
diff --git a/src/pydolphinscheduler/examples/local_parameter_example.py
b/src/pydolphinscheduler/examples/local_parameter_example.py
new file mode 100644
index 0000000..f00d685
--- /dev/null
+++ b/src/pydolphinscheduler/examples/local_parameter_example.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+r"""
+A tutorial example set local parameter in pydolphinscheduler.
+
+Method 1:
+ task = Shell(..., input_params={"input":"a"}, output_params={"output":
"b"})
+
+Method 2:
+ task = Shell(...)
+ task.add_in("input", "a")
+ task.add_out("output", "b")
+"""
+
+from pydolphinscheduler.core.parameter import ParameterType
+from pydolphinscheduler.core.workflow import Workflow
+from pydolphinscheduler.tasks.shell import Shell
+
+with Workflow(name="local_parameter_example", release_state="offline") as
workflow:
+
+ # [start parameter example]
+ # define a parameter "a", and use it in Shell task
+ example1_input_params = Shell(
+ name="example1_input_params",
+ command="echo ${a}",
+ input_params={
+ "a": "123",
+ },
+ )
+
+ # define a parameter "random_value", and pass it to the downstream tasks
+ example2_output_params = Shell(
+ name="example2_output_params",
+ command="""
+ val=$(echo $RANDOM)
+ echo "#{setValue(random_value=${val})}"
+ echo $val
+ """,
+ output_params={
+ "random_value": "",
+ },
+ )
+
+ # use the parameter "random_value", from upstream tasks
+ # we don't need to define input_params again if the parameter is from
upstram tasks
+ example2_input_params = Shell(
+ name="example2_input_params", command="""echo ${random_value}"""
+ )
+
+ example2_output_params >> example2_input_params
+ # [end parameter example]
+
+ # [start parameter define]
+ # Add parameter via task arguments
+ task_1 = Shell(
+ name="task_1",
+ command="echo hello pydolphinscheduler",
+ input_params={
+ "value_VARCHAR": "abc",
+ "value_INTEGER": 123,
+ "value_FLOAT": 0.1,
+ "value_BOOLEAN": True,
+ },
+ output_params={
+ "value_EMPTY": None,
+ },
+ )
+
+ # Add parameter via task instance's method
+ task_2 = Shell(name="task_2", command="echo hello pydolphinscheduler")
+
+ task_2.add_in("value_VARCHAR", "abc")
+ task_2.add_in("value_INTEGER", 123)
+ task_2.add_in("value_FLOAT", 0.1)
+ task_2.add_in("value_BOOLEAN", True)
+ task_2.add_out("value_EMPTY")
+
+ # Task 1 is the same as task 2
+
+ # Others parameter types which cannot be converted automatically, must
declare type explicitly
+ task_3 = Shell(
+ name="task_3",
+ command="echo '123' >> test.txt",
+ input_params={
+ "value_LONG": ParameterType.LONG("1000000"),
+ "value_DATE": ParameterType.DATE("2022-01-02"),
+ "value_TIME": ParameterType.TIME("2022-01-01"),
+ "value_TIMESTAMP": ParameterType.TIMESTAMP(123123124125),
+ "value_LIST": ParameterType.LIST("123123"),
+ },
+ output_params={
+ "output_INTEGER": ParameterType.INTEGER(100),
+ "output_LIST": ParameterType.LIST(),
+ "output_FILE": ParameterType.FILE("test.txt"),
+ },
+ )
+
+ workflow.submit()
+ # [end parameter define]
+# [end workflow_declare]
diff --git a/tests/core/test_local_parameter.py
b/tests/core/test_local_parameter.py
new file mode 100644
index 0000000..4a1b0c2
--- /dev/null
+++ b/tests/core/test_local_parameter.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test parameter."""
+
+
+import pytest
+
+from pydolphinscheduler.core.parameter import (
+ BaseDataType,
+ Direction,
+ ParameterHelper,
+ ParameterType,
+)
+from pydolphinscheduler.exceptions import PyDSParamException
+
+
[email protected](
+ "value, expect",
+ [
+ (123456, ParameterType.INTEGER),
+ (0.5, ParameterType.FLOAT),
+ ("abc", ParameterType.VARCHAR),
+ (None, ParameterType.VARCHAR),
+ (True, ParameterType.BOOLEAN),
+ (False, ParameterType.BOOLEAN),
+ ],
+)
+def test_infer_type_of_parameters(value, expect):
+ """Test the infer function."""
+ cls = ParameterHelper.infer_parameter_type(value)
+ assert cls == expect
+
+
[email protected](
+ "value",
+ [list(), dict(), set()],
+)
+def test_infer_type_of_parameters_error(value):
+ """Test the infer function error."""
+ with pytest.raises(
+ PyDSParamException,
+ match="Can not infer parameter type",
+ ):
+ ParameterHelper.infer_parameter_type(value)
+
+
[email protected](
+ "value, expect_type, expect_value",
+ [
+ (ParameterType.VARCHAR("123"), "VARCHAR", "123"),
+ (ParameterType.VARCHAR(123), "VARCHAR", "123"),
+ (ParameterType.VARCHAR(), "VARCHAR", ""),
+ (ParameterType.LONG(123), "LONG", "123"),
+ (ParameterType.LONG(), "LONG", ""),
+ (ParameterType.INTEGER(123), "INTEGER", 123),
+ (ParameterType.INTEGER("123"), "INTEGER", 123),
+ (ParameterType.INTEGER(), "INTEGER", ""),
+ (ParameterType.FLOAT(123), "FLOAT", float(123)),
+ (ParameterType.FLOAT("123"), "FLOAT", float(123)),
+ (ParameterType.FLOAT(), "FLOAT", ""),
+ (ParameterType.DOUBLE(123), "DOUBLE", "123"),
+ (ParameterType.DOUBLE(), "DOUBLE", ""),
+ (ParameterType.DATE("2022-01-01"), "DATE", "2022-01-01"),
+ (ParameterType.DATE(), "DATE", ""),
+ (ParameterType.TIME("2022-01-01"), "TIME", "2022-01-01"),
+ (ParameterType.TIME(), "TIME", ""),
+ (ParameterType.TIMESTAMP(123123123), "TIMESTAMP", "123123123"),
+ (ParameterType.TIMESTAMP(), "TIMESTAMP", ""),
+ (ParameterType.BOOLEAN(True), "BOOLEAN", True),
+ (ParameterType.BOOLEAN(), "BOOLEAN", ""),
+ (ParameterType.LIST("abc"), "LIST", "abc"),
+ (ParameterType.LIST(), "LIST", ""),
+ (ParameterType.FILE("task1.output"), "FILE", "task1.output"),
+ ],
+)
+def test_parameter_define(value: BaseDataType, expect_type: str, expect_value):
+ """Test the parameter define."""
+ assert value.data_type == expect_type
+ assert value.value == expect_value
+
+
+def test_convert_params():
+ """Test the ParameterHelper convert_params function."""
+ params = {
+ "value_INTEGER": 123,
+ "value_LONG": ParameterType.LONG("1000000"),
+ }
+ results = ParameterHelper.convert_params(params, direction=Direction.IN)
+ expect = [
+ {"prop": "value_INTEGER", "direct": "IN", "type": "INTEGER", "value":
123},
+ {"prop": "value_LONG", "direct": "IN", "type": "LONG", "value":
"1000000"},
+ ]
+ assert results == expect
+
+ results = ParameterHelper.convert_params(params, direction=Direction.OUT)
+ expect = [
+ {"prop": "value_INTEGER", "direct": "OUT", "type": "INTEGER", "value":
123},
+ {"prop": "value_LONG", "direct": "OUT", "type": "LONG", "value":
"1000000"},
+ ]
+
+ assert results == expect
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 250573a..d34b6d2 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -25,6 +25,7 @@ from unittest.mock import PropertyMock, patch
import pytest
+from pydolphinscheduler.core.parameter import ParameterType
from pydolphinscheduler.core.task import Task, TaskRelation
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.exceptions import PyResPluginException
@@ -509,3 +510,66 @@ def test_python_resource_list(
resource_list=resources,
)
assert task.resource_list == expect
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_local_parameter(m_code_version):
+ """Test task local_params."""
+ base_local_params = [
+ {"prop": "base", "direct": "IN", "type": "VARCHAR", "value": "2022"},
+ ]
+
+ task = Task(name="test", task_type="task_type",
local_params=base_local_params)
+
+ assert task.local_params == base_local_params
+
+ task = Task(
+ name="test",
+ task_type="task_type",
+ input_params={"a": 123, "b": True},
+ output_params={"c": "ccc", "d": ParameterType.LONG(123)},
+ )
+
+ expect = [
+ {"prop": "a", "direct": "IN", "type": "INTEGER", "value": 123},
+ {"prop": "b", "direct": "IN", "type": "BOOLEAN", "value": True},
+ {"prop": "c", "direct": "OUT", "type": "VARCHAR", "value": "ccc"},
+ {"prop": "d", "direct": "OUT", "type": "LONG", "value": "123"},
+ ]
+
+ assert task.local_params == expect
+
+ task = Task(
+ name="test",
+ task_type="task_type",
+ local_params=base_local_params,
+ input_params={"a": 123, "b": True},
+ output_params={"c": "ccc", "d": ParameterType.LONG(123)},
+ )
+
+ expect = [
+ {"prop": "base", "direct": "IN", "type": "VARCHAR", "value": "2022"},
+ {"prop": "a", "direct": "IN", "type": "INTEGER", "value": 123},
+ {"prop": "b", "direct": "IN", "type": "BOOLEAN", "value": True},
+ {"prop": "c", "direct": "OUT", "type": "VARCHAR", "value": "ccc"},
+ {"prop": "d", "direct": "OUT", "type": "LONG", "value": "123"},
+ ]
+
+ assert task.local_params == expect
+
+ task.add_in("e", "${e}")
+ task.add_out("f")
+
+ new_params = [
+ {"prop": "e", "direct": "IN", "type": "VARCHAR", "value": "${e}"},
+ {"prop": "f", "direct": "OUT", "type": "VARCHAR", "value": ""},
+ ]
+ expect.extend(new_params)
+
+ def sorted_func(x):
+ return (x["prop"], x["direct"])
+
+ assert sorted(task.local_params, key=sorted_func) == sorted(expect,
key=sorted_func)
diff --git a/tests/core/test_yaml_workflow.py b/tests/core/test_yaml_workflow.py
index 60cf813..244965f 100644
--- a/tests/core/test_yaml_workflow.py
+++ b/tests/core/test_yaml_workflow.py
@@ -24,6 +24,7 @@ from unittest.mock import patch
import pytest
from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.parameter import ParameterType
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.core.yaml_workflow import (
ParseTool,
@@ -68,6 +69,21 @@ def test_parse_tool_config(string_param, expect_key):
assert expect == ParseTool.parse_string_param_if_config(string_param)
[email protected](
+ "string_param, key_path, expect",
+ [
+ ("VARCHAR()", "input_params", ParameterType.VARCHAR()),
+ ("FILE(data.txt)", "output_params", ParameterType.FILE("data.txt")),
+ (123, "output_params", 123),
+ (True, "output_params", True),
+ ],
+)
+def test_parse_tool_parameter(string_param, key_path, expect):
+ """Test parsing parameter."""
+ value = ParseTool.parse_string_param_if_parameter(string_param,
key_path=key_path)
+ assert value == expect
+
+
def test_parse_possible_yaml_file():
"""Test parsing possible path."""
folder = Path(path_yaml_example)