This is an automated email from the ASF dual-hosted git repository.

zhoujieguang pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 68d2670  [improve] More pythonic way for parameter local_params in 
class task  (#53)
68d2670 is described below

commit 68d2670242ca9e47d4eb4b878334ab3fe972b262
Author: JieguangZhou <[email protected]>
AuthorDate: Tue Dec 20 17:16:28 2022 +0800

    [improve] More pythonic way for parameter local_params in class task  (#53)
---
 .gitignore                                         |   3 +
 docs/source/concept.rst                            |  32 +++++
 examples/yaml_define/MoreConfiguration.yaml        |  14 +-
 src/pydolphinscheduler/core/parameter.py           | 147 +++++++++++++++++++++
 src/pydolphinscheduler/core/task.py                |  94 ++++++++++++-
 src/pydolphinscheduler/core/yaml_workflow.py       |  35 ++++-
 .../examples/local_parameter_example.py            | 116 ++++++++++++++++
 tests/core/test_local_parameter.py                 | 116 ++++++++++++++++
 tests/core/test_task.py                            |  64 +++++++++
 tests/core/test_yaml_workflow.py                   |  16 +++
 10 files changed, 627 insertions(+), 10 deletions(-)

diff --git a/.gitignore b/.gitignore
index d20c274..aaf4a8f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,6 @@ build/
 .coverage
 coverage.xml
 htmlcov/
+
+# the pydolphinscheduler config
+config.yaml
diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 084980c..28a797a 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -261,6 +261,38 @@ After that, we could see new file named ``bare-create.py`` 
is be created in reso
    Both parameter ``resource_list`` in workflow and task is list of string 
which mean you could upload and reference
    multiple files. For more complex usage, you could read 
:doc:`howto/multi-resources`.
 
+Local Parameters
+----------------
+
+In DolphinScheduler, we can define parameter in task, aka Local Parameter.
+
+We can set parameters to variables in tasks to better manage our tasks.
+
+For example:
+
+
+.. literalinclude:: 
../../src/pydolphinscheduler/examples/local_parameter_example.py
+   :start-after: [start parameter example]
+   :end-before: [end parameter example]
+   :language: python
+
+
+There are two ways to define local parameters:
+
+.. literalinclude:: 
../../src/pydolphinscheduler/examples/local_parameter_example.py
+   :start-after: [start parameter define]
+   :end-before: [end parameter define]
+   :language: python
+
+
+Full example:
+
+.. literalinclude:: 
../../src/pydolphinscheduler/examples/local_parameter_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+   :language: python
+
+
 Authentication Token
 --------------------
 
diff --git a/examples/yaml_define/MoreConfiguration.yaml 
b/examples/yaml_define/MoreConfiguration.yaml
index 678315a..849ee4b 100644
--- a/examples/yaml_define/MoreConfiguration.yaml
+++ b/examples/yaml_define/MoreConfiguration.yaml
@@ -30,11 +30,19 @@ tasks:
     command: |
       echo "$ENV{HOME}"
       echo "${n}"
+      echo "123" >> text.txt
     task_priority: "HIGH"
     delay_time: 20
     fail_retry_times: 30
     fail_retry_interval: 5
     timeout: 60
-    is_cahce: true
-    local_params:
-      - { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" }
+    is_cache: true
+    input_params:
+      value_VARCHAR: "abc"
+      value_INTEGER: 123
+      value_FLOAT: 0.1
+      value_BOOLEAN: True
+    output_params:
+      value_output: ""
+      value_output2: VARCHAR()
+      value_data_path: FILE(text.txt)
diff --git a/src/pydolphinscheduler/core/parameter.py 
b/src/pydolphinscheduler/core/parameter.py
new file mode 100644
index 0000000..6bac357
--- /dev/null
+++ b/src/pydolphinscheduler/core/parameter.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DolphinScheduler parameter object."""
+
+from pydolphinscheduler.exceptions import PyDSParamException
+
+
+class Direction:
+    """Constants for direction."""
+
+    IN = "IN"
+    OUT = "OUT"
+
+
+class BaseDataType:
+    """Base data type.
+
+    Use to convert value to ParameterType
+    """
+
+    def __init__(self, value=None):
+        self.data_type = self.__class__.__name__
+        self.value = self.convert_value(value) if value is not None else ""
+
+    def convert_value(self, value=None):
+        """Convert value."""
+        if value is None or value == "":
+            return ""
+        else:
+            return self._convert(value)
+
+    def _convert(self, value=None):
+        return str(value)
+
+    def __eq__(self, data):
+        return (
+            type(self) == type(data)
+            and self.data_type == data.data_type
+            and self.value == data.value
+        )
+
+
+def create_data_type(class_name, convert_func=None):
+    """Create ParameterType and set the convert_func."""
+    convert = convert_func or BaseDataType._convert
+    return type(class_name, (BaseDataType,), {"_convert": convert})
+
+
+class ParameterType:
+    """ParameterType corresponds to dolphinscheduler."""
+
+    VARCHAR = create_data_type("VARCHAR", str)
+    LONG = create_data_type("LONG")
+    INTEGER = create_data_type("INTEGER", int)
+    FLOAT = create_data_type("FLOAT", float)
+    DOUBLE = create_data_type("DOUBLE")
+    DATE = create_data_type("DATE")
+    TIME = create_data_type("TIME")
+    TIMESTAMP = create_data_type("TIMESTAMP")
+    BOOLEAN = create_data_type("BOOLEAN", bool)
+    LIST = create_data_type("LIST")
+    FILE = create_data_type("FILE")
+
+    type_sets = {
+        key: value for key, value in locals().items() if not 
key.startswith("_")
+    }
+
+    _TYPE_MAPPING = {
+        "int": INTEGER,
+        "float": FLOAT,
+        "ScalarFloat": FLOAT,
+        "str": VARCHAR,
+        "bool": BOOLEAN,
+        "NoneType": VARCHAR,
+    }
+
+
+class Parameter:
+    """Parameter."""
+
+    def __init__(self, name, direction, data_type, value=None):
+        self.name = name
+        self.direction = direction
+        self.data_type = data_type
+        self.value = value or ""
+
+    @property
+    def data(self):
+        """Convert to local_params in task define."""
+        return {
+            "prop": self.name,
+            "direct": self.direction,
+            "type": self.data_type,
+            "value": self.value,
+        }
+
+
+class ParameterHelper:
+    """Use for task to handle parameters."""
+
+    @staticmethod
+    def convert_params(params, direction):
+        """Convert params to format local_params.
+
+        :param params: dict[str, Any], the input_params or output_params of 
Task.
+        :param direction: [Direction.IN | Direction.OUT], direction of 
parameter.
+        """
+        parameters = []
+        params = params or {}
+        if not isinstance(params, dict):
+            raise PyDSParamException("input_params must be a dict")
+        for key, value in params.items():
+            if not isinstance(value, BaseDataType):
+                data_type_cls = ParameterHelper.infer_parameter_type(value)
+                value = data_type_cls(value)
+
+            parameter = Parameter(key, direction, value.data_type, value.value)
+            parameters.append(parameter)
+        return [p.data for p in parameters]
+
+    @staticmethod
+    def infer_parameter_type(value):
+        """Infer to ParameterType from the input value."""
+        value_type = type(value).__name__
+
+        if value_type not in ParameterType._TYPE_MAPPING:
+            raise PyDSParamException(
+                f"Can not infer parameter type {value}, please use 
ParameterType"
+            )
+
+        data_type_cls = ParameterType._TYPE_MAPPING[value_type]
+        return data_type_cls
diff --git a/src/pydolphinscheduler/core/task.py 
b/src/pydolphinscheduler/core/task.py
index 7b942eb..f0d3e83 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -33,6 +33,7 @@ from pydolphinscheduler.constants import (
     TaskPriority,
     TaskTimeoutFlag,
 )
+from pydolphinscheduler.core.parameter import BaseDataType, Direction, 
ParameterHelper
 from pydolphinscheduler.core.resource import Resource
 from pydolphinscheduler.core.resource_plugin import ResourcePlugin
 from pydolphinscheduler.core.workflow import Workflow, WorkflowContext
@@ -82,7 +83,28 @@ class TaskRelation(Base):
 
 
 class Task(Base):
-    """Task object, parent class for all exactly task type."""
+    """Task object, parent class for all exactly task type.
+
+    :param name: The name of the task. Node names within the same workflow 
must be unique.
+    :param task_type:
+    :param description: default None
+    :param flag: default TaskFlag.YES,
+    :param task_priority: default TaskPriority.MEDIUM
+    :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
+    :param environment_name: default None
+    :param delay_time: deault 0
+    :param fail_retry_times: default 0
+    :param fail_retry_interval: default 1
+    :param timeout_notify_strategy: default, None
+    :param timeout: default None
+    :param resource_list: default None
+    :param wait_start_timeout: default None
+    :param condition_result: default None,
+    :param resource_plugin: default None
+    :param is_cache: default False
+    :param input_params: default None, input parameters, {param_name: 
param_value}
+    :param output_params: default None, input parameters, {param_name: 
param_value}
+    """
 
     _DEFINE_ATTR = {
         "name",
@@ -137,13 +159,14 @@ class Task(Base):
         timeout_notify_strategy: Optional = None,
         timeout: Optional[timedelta] = None,
         workflow: Optional[Workflow] = None,
-        local_params: Optional[List] = None,
         resource_list: Optional[List] = None,
         dependence: Optional[Dict] = None,
         wait_start_timeout: Optional[Dict] = None,
         condition_result: Optional[Dict] = None,
         resource_plugin: Optional[ResourcePlugin] = None,
         is_cache: Optional[bool] = False,
+        input_params: Optional[Dict] = None,
+        output_params: Optional[Dict] = None,
         *args,
         **kwargs,
     ):
@@ -161,6 +184,8 @@ class Task(Base):
         self.timeout_notify_strategy = timeout_notify_strategy
         self._timeout: timedelta = timeout
         self._workflow = None
+        self._input_params = input_params or {}
+        self._output_params = output_params or {}
         if "process_definition" in kwargs:
             warnings.warn(
                 "The `process_definition` parameter is deprecated, please use 
`workflow` instead.",
@@ -169,6 +194,16 @@ class Task(Base):
             self.workflow = kwargs.pop("process_definition")
         else:
             self.workflow: Workflow = workflow or WorkflowContext.get()
+
+        if "local_params" in kwargs:
+            warnings.warn(
+                """The `local_params` parameter is deprecated,
+                please use `input_params` and `output_params` instead.
+                """,
+                DeprecationWarning,
+            )
+            self._local_params = kwargs.get("local_params")
+
         self._upstream_task_codes: Set[int] = set()
         self._downstream_task_codes: Set[int] = set()
         self._task_relation: Set[TaskRelation] = set()
@@ -185,7 +220,6 @@ class Task(Base):
             )
 
         # Attribute for task param
-        self.local_params = local_params or []
         self._resource_list = resource_list or []
         self.dependence = dependence or {}
         self.wait_start_timeout = wait_start_timeout or {}
@@ -415,3 +449,57 @@ class Task(Base):
         if self._environment_name is None:
             return None
         return gateway.query_environment_info(self._environment_name)
+
+    @property
+    def local_params(self):
+        """Convert local params."""
+        local_params = (
+            copy.deepcopy(self._local_params) if hasattr(self, 
"_local_params") else []
+        )
+        local_params.extend(
+            ParameterHelper.convert_params(self._input_params, Direction.IN)
+        )
+        local_params.extend(
+            ParameterHelper.convert_params(self._output_params, Direction.OUT)
+        )
+        return local_params
+
+    def add_in(
+        self,
+        name: str,
+        value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
+    ):
+        """Add input parameters.
+
+        :param name: name of the input parameter.
+        :param value: value of the input parameter.
+
+        It could be simply command::
+
+            task.add_in("a")
+            task.add_in("b", 123)
+            task.add_in("c", bool)
+            task.add_in("d", ParameterType.LONG(123))
+
+        """
+        self._input_params[name] = value
+
+    def add_out(
+        self,
+        name: str,
+        value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
+    ):
+        """Add output parameters.
+
+        :param name: name of the output parameter.
+        :param value: value of the output parameter.
+
+        It could be simply command::
+
+            task.add_out("a")
+            task.add_out("b", 123)
+            task.add_out("c", bool)
+            task.add_out("d", ParameterType.LONG(123))
+
+        """
+        self._output_params[name] = value
diff --git a/src/pydolphinscheduler/core/yaml_workflow.py 
b/src/pydolphinscheduler/core/yaml_workflow.py
index 507a518..b7a4744 100644
--- a/src/pydolphinscheduler/core/yaml_workflow.py
+++ b/src/pydolphinscheduler/core/yaml_workflow.py
@@ -24,6 +24,8 @@ from pathlib import Path
 from typing import Any, Dict
 
 from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.constants import Symbol
+from pydolphinscheduler.core.parameter import ParameterType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
@@ -75,6 +77,24 @@ class ParseTool:
 
         return string_param
 
+    @staticmethod
+    def parse_string_param_if_parameter(string_param: str, **kwargs):
+        """Use TYPE(value) to set local params."""
+        key_path = kwargs.get("key_path")
+        if key_path.split(Symbol.POINT)[0] not in {"input_params", 
"output_params"}:
+            return string_param
+
+        if not isinstance(string_param, str):
+            return string_param
+
+        result = re.findall(r"^(.*?)\((.*?)\)", string_param)
+        if len(result) == 1 and len(result[0]) == 2:
+            type_ = result[0][0].rstrip()
+            value = result[0][1].rstrip()
+            return ParameterType.type_sets[type_](value)
+        else:
+            return string_param
+
     @staticmethod
     def get_possible_path(file_path, base_folder):
         """Get file possible path.
@@ -123,6 +143,7 @@ class YamlWorkflow(YamlParser):
         ParseTool.parse_string_param_if_file,
         ParseTool.parse_string_param_if_env,
         ParseTool.parse_string_param_if_config,
+        ParseTool.parse_string_param_if_parameter,
     ]
 
     def __init__(self, yaml_file: str):
@@ -178,7 +199,7 @@ class YamlWorkflow(YamlParser):
 
         return workflow_name
 
-    def parse_params(self, params: Any):
+    def parse_params(self, params: Any, key_path=""):
         """Recursively resolves the parameter values.
 
         The function operates params only when it encounters a string; other 
types continue recursively.
@@ -186,17 +207,23 @@ class YamlWorkflow(YamlParser):
         if isinstance(params, str):
             for parse_rule in self._parse_rules:
                 params_ = params
-                params = parse_rule(params, base_folder=self._base_folder)
+                params = parse_rule(
+                    params, base_folder=self._base_folder, key_path=key_path
+                )
                 if params_ != params:
                     logger.info(f"parse {params_} -> {params}")
 
         elif isinstance(params, list):
             for index in range(len(params)):
-                params[index] = self.parse_params(params[index])
+                params[index] = self.parse_params(params[index], key_path)
 
         elif isinstance(params, dict):
             for key, value in params.items():
-                params[key] = self.parse_params(value)
+                if not key_path:
+                    new_key_path = key
+                else:
+                    new_key_path = key_path + Symbol.POINT + key
+                params[key] = self.parse_params(value, new_key_path)
 
         return params
 
diff --git a/src/pydolphinscheduler/examples/local_parameter_example.py 
b/src/pydolphinscheduler/examples/local_parameter_example.py
new file mode 100644
index 0000000..f00d685
--- /dev/null
+++ b/src/pydolphinscheduler/examples/local_parameter_example.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+r"""
+A tutorial example set local parameter in pydolphinscheduler.
+
+Method 1:
+    task = Shell(..., input_params={"input":"a"}, output_params={"output": 
"b"})
+
+Method 2:
+    task = Shell(...)
+    task.add_in("input", "a")
+    task.add_out("output", "b")
+"""
+
+from pydolphinscheduler.core.parameter import ParameterType
+from pydolphinscheduler.core.workflow import Workflow
+from pydolphinscheduler.tasks.shell import Shell
+
+with Workflow(name="local_parameter_example", release_state="offline") as 
workflow:
+
+    # [start parameter example]
+    # define a parameter "a", and use it in Shell task
+    example1_input_params = Shell(
+        name="example1_input_params",
+        command="echo ${a}",
+        input_params={
+            "a": "123",
+        },
+    )
+
+    # define a parameter "random_value", and pass it to the downstream tasks
+    example2_output_params = Shell(
+        name="example2_output_params",
+        command="""
+        val=$(echo $RANDOM)
+        echo "#{setValue(random_value=${val})}"
+        echo $val
+        """,
+        output_params={
+            "random_value": "",
+        },
+    )
+
+    # use the parameter "random_value", from upstream tasks
+    # we don't need to define input_params again if the parameter is from 
upstram tasks
+    example2_input_params = Shell(
+        name="example2_input_params", command="""echo ${random_value}"""
+    )
+
+    example2_output_params >> example2_input_params
+    # [end parameter example]
+
+    # [start parameter define]
+    # Add parameter via task arguments
+    task_1 = Shell(
+        name="task_1",
+        command="echo hello pydolphinscheduler",
+        input_params={
+            "value_VARCHAR": "abc",
+            "value_INTEGER": 123,
+            "value_FLOAT": 0.1,
+            "value_BOOLEAN": True,
+        },
+        output_params={
+            "value_EMPTY": None,
+        },
+    )
+
+    # Add parameter via task instance's method
+    task_2 = Shell(name="task_2", command="echo hello pydolphinscheduler")
+
+    task_2.add_in("value_VARCHAR", "abc")
+    task_2.add_in("value_INTEGER", 123)
+    task_2.add_in("value_FLOAT", 0.1)
+    task_2.add_in("value_BOOLEAN", True)
+    task_2.add_out("value_EMPTY")
+
+    # Task 1 is the same as task 2
+
+    # Others parameter types which cannot be converted automatically, must 
declare type explicitly
+    task_3 = Shell(
+        name="task_3",
+        command="echo '123' >> test.txt",
+        input_params={
+            "value_LONG": ParameterType.LONG("1000000"),
+            "value_DATE": ParameterType.DATE("2022-01-02"),
+            "value_TIME": ParameterType.TIME("2022-01-01"),
+            "value_TIMESTAMP": ParameterType.TIMESTAMP(123123124125),
+            "value_LIST": ParameterType.LIST("123123"),
+        },
+        output_params={
+            "output_INTEGER": ParameterType.INTEGER(100),
+            "output_LIST": ParameterType.LIST(),
+            "output_FILE": ParameterType.FILE("test.txt"),
+        },
+    )
+
+    workflow.submit()
+    # [end parameter define]
+# [end workflow_declare]
diff --git a/tests/core/test_local_parameter.py 
b/tests/core/test_local_parameter.py
new file mode 100644
index 0000000..4a1b0c2
--- /dev/null
+++ b/tests/core/test_local_parameter.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test parameter."""
+
+
+import pytest
+
+from pydolphinscheduler.core.parameter import (
+    BaseDataType,
+    Direction,
+    ParameterHelper,
+    ParameterType,
+)
+from pydolphinscheduler.exceptions import PyDSParamException
+
+
[email protected](
+    "value, expect",
+    [
+        (123456, ParameterType.INTEGER),
+        (0.5, ParameterType.FLOAT),
+        ("abc", ParameterType.VARCHAR),
+        (None, ParameterType.VARCHAR),
+        (True, ParameterType.BOOLEAN),
+        (False, ParameterType.BOOLEAN),
+    ],
+)
+def test_infer_type_of_parameters(value, expect):
+    """Test the infer function."""
+    cls = ParameterHelper.infer_parameter_type(value)
+    assert cls == expect
+
+
[email protected](
+    "value",
+    [list(), dict(), set()],
+)
+def test_infer_type_of_parameters_error(value):
+    """Test the infer function error."""
+    with pytest.raises(
+        PyDSParamException,
+        match="Can not infer parameter type",
+    ):
+        ParameterHelper.infer_parameter_type(value)
+
+
[email protected](
+    "value, expect_type, expect_value",
+    [
+        (ParameterType.VARCHAR("123"), "VARCHAR", "123"),
+        (ParameterType.VARCHAR(123), "VARCHAR", "123"),
+        (ParameterType.VARCHAR(), "VARCHAR", ""),
+        (ParameterType.LONG(123), "LONG", "123"),
+        (ParameterType.LONG(), "LONG", ""),
+        (ParameterType.INTEGER(123), "INTEGER", 123),
+        (ParameterType.INTEGER("123"), "INTEGER", 123),
+        (ParameterType.INTEGER(), "INTEGER", ""),
+        (ParameterType.FLOAT(123), "FLOAT", float(123)),
+        (ParameterType.FLOAT("123"), "FLOAT", float(123)),
+        (ParameterType.FLOAT(), "FLOAT", ""),
+        (ParameterType.DOUBLE(123), "DOUBLE", "123"),
+        (ParameterType.DOUBLE(), "DOUBLE", ""),
+        (ParameterType.DATE("2022-01-01"), "DATE", "2022-01-01"),
+        (ParameterType.DATE(), "DATE", ""),
+        (ParameterType.TIME("2022-01-01"), "TIME", "2022-01-01"),
+        (ParameterType.TIME(), "TIME", ""),
+        (ParameterType.TIMESTAMP(123123123), "TIMESTAMP", "123123123"),
+        (ParameterType.TIMESTAMP(), "TIMESTAMP", ""),
+        (ParameterType.BOOLEAN(True), "BOOLEAN", True),
+        (ParameterType.BOOLEAN(), "BOOLEAN", ""),
+        (ParameterType.LIST("abc"), "LIST", "abc"),
+        (ParameterType.LIST(), "LIST", ""),
+        (ParameterType.FILE("task1.output"), "FILE", "task1.output"),
+    ],
+)
+def test_parameter_define(value: BaseDataType, expect_type: str, expect_value):
+    """Test the parameter define."""
+    assert value.data_type == expect_type
+    assert value.value == expect_value
+
+
+def test_convert_params():
+    """Test the ParameterHelper convert_params function."""
+    params = {
+        "value_INTEGER": 123,
+        "value_LONG": ParameterType.LONG("1000000"),
+    }
+    results = ParameterHelper.convert_params(params, direction=Direction.IN)
+    expect = [
+        {"prop": "value_INTEGER", "direct": "IN", "type": "INTEGER", "value": 
123},
+        {"prop": "value_LONG", "direct": "IN", "type": "LONG", "value": 
"1000000"},
+    ]
+    assert results == expect
+
+    results = ParameterHelper.convert_params(params, direction=Direction.OUT)
+    expect = [
+        {"prop": "value_INTEGER", "direct": "OUT", "type": "INTEGER", "value": 
123},
+        {"prop": "value_LONG", "direct": "OUT", "type": "LONG", "value": 
"1000000"},
+    ]
+
+    assert results == expect
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 250573a..d34b6d2 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -25,6 +25,7 @@ from unittest.mock import PropertyMock, patch
 
 import pytest
 
+from pydolphinscheduler.core.parameter import ParameterType
 from pydolphinscheduler.core.task import Task, TaskRelation
 from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.exceptions import PyResPluginException
@@ -509,3 +510,66 @@ def test_python_resource_list(
         resource_list=resources,
     )
     assert task.resource_list == expect
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_local_parameter(m_code_version):
+    """Test task local_params."""
+    base_local_params = [
+        {"prop": "base", "direct": "IN", "type": "VARCHAR", "value": "2022"},
+    ]
+
+    task = Task(name="test", task_type="task_type", 
local_params=base_local_params)
+
+    assert task.local_params == base_local_params
+
+    task = Task(
+        name="test",
+        task_type="task_type",
+        input_params={"a": 123, "b": True},
+        output_params={"c": "ccc", "d": ParameterType.LONG(123)},
+    )
+
+    expect = [
+        {"prop": "a", "direct": "IN", "type": "INTEGER", "value": 123},
+        {"prop": "b", "direct": "IN", "type": "BOOLEAN", "value": True},
+        {"prop": "c", "direct": "OUT", "type": "VARCHAR", "value": "ccc"},
+        {"prop": "d", "direct": "OUT", "type": "LONG", "value": "123"},
+    ]
+
+    assert task.local_params == expect
+
+    task = Task(
+        name="test",
+        task_type="task_type",
+        local_params=base_local_params,
+        input_params={"a": 123, "b": True},
+        output_params={"c": "ccc", "d": ParameterType.LONG(123)},
+    )
+
+    expect = [
+        {"prop": "base", "direct": "IN", "type": "VARCHAR", "value": "2022"},
+        {"prop": "a", "direct": "IN", "type": "INTEGER", "value": 123},
+        {"prop": "b", "direct": "IN", "type": "BOOLEAN", "value": True},
+        {"prop": "c", "direct": "OUT", "type": "VARCHAR", "value": "ccc"},
+        {"prop": "d", "direct": "OUT", "type": "LONG", "value": "123"},
+    ]
+
+    assert task.local_params == expect
+
+    task.add_in("e", "${e}")
+    task.add_out("f")
+
+    new_params = [
+        {"prop": "e", "direct": "IN", "type": "VARCHAR", "value": "${e}"},
+        {"prop": "f", "direct": "OUT", "type": "VARCHAR", "value": ""},
+    ]
+    expect.extend(new_params)
+
+    def sorted_func(x):
+        return (x["prop"], x["direct"])
+
+    assert sorted(task.local_params, key=sorted_func) == sorted(expect, 
key=sorted_func)
diff --git a/tests/core/test_yaml_workflow.py b/tests/core/test_yaml_workflow.py
index 60cf813..244965f 100644
--- a/tests/core/test_yaml_workflow.py
+++ b/tests/core/test_yaml_workflow.py
@@ -24,6 +24,7 @@ from unittest.mock import patch
 import pytest
 
 from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.parameter import ParameterType
 from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.core.yaml_workflow import (
     ParseTool,
@@ -68,6 +69,21 @@ def test_parse_tool_config(string_param, expect_key):
     assert expect == ParseTool.parse_string_param_if_config(string_param)
 
 
[email protected](
+    "string_param, key_path, expect",
+    [
+        ("VARCHAR()", "input_params", ParameterType.VARCHAR()),
+        ("FILE(data.txt)", "output_params", ParameterType.FILE("data.txt")),
+        (123, "output_params", 123),
+        (True, "output_params", True),
+    ],
+)
+def test_parse_tool_parameter(string_param, key_path, expect):
+    """Test parsing parameter."""
+    value = ParseTool.parse_string_param_if_parameter(string_param, 
key_path=key_path)
+    assert value == expect
+
+
 def test_parse_possible_yaml_file():
     """Test parsing possible path."""
     folder = Path(path_yaml_example)

Reply via email to