zhongjiajie commented on code in PR #11611:
URL: https://github.com/apache/dolphinscheduler/pull/11611#discussion_r956552218


##########
dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py:
##########
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test YAML process."""
+
+import os
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.yaml_process_define import (
+    ParseTool,
+    create_process_definition,
+    get_task_cls,
+)
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from tests.testing.task import Task
+
+
+def get_examples_folder():
+    """Get exmaple folder path for testing."""
+    base_folder = os.path.abspath(__file__)
+    examples_path = os.path.join(base_folder, "../../../examples/yaml_define")
+    return os.path.abspath(examples_path)

Review Comment:
   We have a convenient method to get this path; you can see more details at 
https://github.com/apache/dolphinscheduler/blob/7a766cbcf2de82ea401ede11051b1059ef97919a/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py#L26



##########
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py:
##########
@@ -0,0 +1,456 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Parse YAML file to create process."""
+
+import logging
+import os
+import re
+from typing import Any, Dict
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+
+log = logging.getLogger(__file__)
+
+
+class ParseTool:
+    """Enhanced parsing tools."""
+
+    @staticmethod
+    def parse_string_param_if_file(string_param: str, **kwargs):
+        """Use $FILE{"data_path"} to load file from "data_path"."""
+        if string_param.startswith("$FILE"):
+            path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0]
+            base_folder = kwargs.get("base_folder", ".")
+            path = ParseTool.get_possible_path(path, base_folder)
+            with open(path, "r") as read_file:
+                string_param = "".join(read_file)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_env(string_param: str, **kwargs):
+        """Use $ENV{env_name} to load environment variable "env_name"."""
+        if "$ENV" in string_param:
+            key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0]
+            env_value = os.environ.get(key)
+            string_param = string_param.replace("$ENV{%s}" % key, env_value)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_config(string_param: str, **kwargs):
+        """Use ${CONFIG.var_name} to load variable "var_name" from 
configuration."""
+        if "${CONFIG" in string_param:
+            key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0]
+            if hasattr(configuration, key):
+                string_param = getattr(configuration, key)
+            else:
+                string_param = configuration.get_single_config(key)
+
+        return string_param
+
+    @staticmethod
+    def get_possible_path(file_path, base_folder):
+        """Get file possible path.
+
+        Return new path if file_path is not exists, but base_folder + 
file_path exists
+        """
+        possible_path = file_path
+        if not os.path.exists(file_path):
+            new_path = os.path.join(base_folder, file_path)
+            if os.path.exists(new_path):
+                possible_path = new_path
+                print(f"{file_path} not exists, convert to {possible_path}")
+
+        return possible_path
+
+
+def get_task_cls(task_type) -> Task:
+    """Get the task class object by task_type (case compatible)."""
+    # only get task class from tasks.__all__
+    all_task_types = {type_.capitalize(): type_ for type_ in tasks.__all__}
+    task_type_cap = task_type.capitalize()
+    if task_type_cap not in all_task_types:
+        raise PyDSTaskNoFoundException(f"cant not find task {task_type}")
+
+    standard_name = all_task_types[task_type_cap]
+    return getattr(tasks, standard_name)
+
+
+class YamlProcess(YamlParser):
+    """Yaml parser for create process.
+
+    :param yaml_file: yaml file path.
+
+        examples1 ::
+
+            parser = YamlParser(yaml=...)
+            parser.create_process_definition()
+
+        examples2 ::
+
+            YamlParser(yaml=...).create_process_definition()
+
+    """
+
+    _parse_rules = [
+        ParseTool.parse_string_param_if_file,
+        ParseTool.parse_string_param_if_env,
+        ParseTool.parse_string_param_if_config,
+    ]
+
+    def __init__(self, yaml_file: str):
+        with open(yaml_file, "r") as f:
+            content = f.read()
+
+        self._base_folder = os.path.split(yaml_file)[0]
+        content = self.prepare_refer_process(content)
+        super().__init__(content)
+
+    def create_process_definition(self):
+        """Create process main function."""
+        # get process parameters with key "process"
+        process_params = self["process"]
+
+        # pop "run" parameter, used at the end
+        is_run = process_params.pop("run", False)
+
+        # use YamlProcess._parse_rules to parse special value of yaml file
+        process_params = self.parse_params(process_params)
+
+        process_name = process_params["name"]
+        print(f"Create Process: {process_name}")
+        with ProcessDefinition(**process_params) as pd:
+
+            # save dependencies between tasks
+            dependencies = {}
+
+            # save name and task mapping
+            name2task = {}
+
+            # get task datas with key "tasks"
+            for task_data in self["tasks"]:
+                task = self.parse_task(task_data, name2task)
+
+                deps = task_data.get("deps", [])
+                if deps:
+                    dependencies[task.name] = deps
+                name2task[task.name] = task
+
+            # build dependencies between task
+            for downstream_task_name, deps in dependencies.items():
+                downstream_task = name2task[downstream_task_name]
+                for upstram_task_name in deps:

Review Comment:
   Typo here — it should be `upstream_task_name`.



##########
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py:
##########
@@ -0,0 +1,456 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Parse YAML file to create process."""
+
+import logging
+import os
+import re
+from typing import Any, Dict
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+
+log = logging.getLogger(__file__)
+
+
+class ParseTool:
+    """Enhanced parsing tools."""
+
+    @staticmethod
+    def parse_string_param_if_file(string_param: str, **kwargs):
+        """Use $FILE{"data_path"} to load file from "data_path"."""
+        if string_param.startswith("$FILE"):
+            path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0]
+            base_folder = kwargs.get("base_folder", ".")
+            path = ParseTool.get_possible_path(path, base_folder)
+            with open(path, "r") as read_file:
+                string_param = "".join(read_file)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_env(string_param: str, **kwargs):
+        """Use $ENV{env_name} to load environment variable "env_name"."""
+        if "$ENV" in string_param:
+            key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0]
+            env_value = os.environ.get(key)
+            string_param = string_param.replace("$ENV{%s}" % key, env_value)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_config(string_param: str, **kwargs):
+        """Use ${CONFIG.var_name} to load variable "var_name" from 
configuration."""
+        if "${CONFIG" in string_param:
+            key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0]
+            if hasattr(configuration, key):
+                string_param = getattr(configuration, key)
+            else:
+                string_param = configuration.get_single_config(key)
+
+        return string_param
+
+    @staticmethod
+    def get_possible_path(file_path, base_folder):
+        """Get file possible path.
+
+        Return new path if file_path is not exists, but base_folder + 
file_path exists
+        """
+        possible_path = file_path
+        if not os.path.exists(file_path):
+            new_path = os.path.join(base_folder, file_path)
+            if os.path.exists(new_path):
+                possible_path = new_path
+                print(f"{file_path} not exists, convert to {possible_path}")
+
+        return possible_path
+
+
+def get_task_cls(task_type) -> Task:
+    """Get the task class object by task_type (case compatible)."""
+    # only get task class from tasks.__all__
+    all_task_types = {type_.capitalize(): type_ for type_ in tasks.__all__}
+    task_type_cap = task_type.capitalize()
+    if task_type_cap not in all_task_types:
+        raise PyDSTaskNoFoundException(f"cant not find task {task_type}")
+
+    standard_name = all_task_types[task_type_cap]
+    return getattr(tasks, standard_name)
+
+
+class YamlProcess(YamlParser):
+    """Yaml parser for create process.
+
+    :param yaml_file: yaml file path.
+
+        examples1 ::
+
+            parser = YamlParser(yaml=...)
+            parser.create_process_definition()
+
+        examples2 ::
+
+            YamlParser(yaml=...).create_process_definition()
+
+    """
+
+    _parse_rules = [
+        ParseTool.parse_string_param_if_file,
+        ParseTool.parse_string_param_if_env,
+        ParseTool.parse_string_param_if_config,
+    ]
+
+    def __init__(self, yaml_file: str):
+        with open(yaml_file, "r") as f:
+            content = f.read()
+
+        self._base_folder = os.path.split(yaml_file)[0]
+        content = self.prepare_refer_process(content)
+        super().__init__(content)
+
+    def create_process_definition(self):
+        """Create process main function."""
+        # get process parameters with key "process"
+        process_params = self["process"]
+
+        # pop "run" parameter, used at the end
+        is_run = process_params.pop("run", False)
+
+        # use YamlProcess._parse_rules to parse special value of yaml file
+        process_params = self.parse_params(process_params)
+
+        process_name = process_params["name"]
+        print(f"Create Process: {process_name}")
+        with ProcessDefinition(**process_params) as pd:
+
+            # save dependencies between tasks
+            dependencies = {}
+
+            # save name and task mapping
+            name2task = {}
+
+            # get task datas with key "tasks"
+            for task_data in self["tasks"]:
+                task = self.parse_task(task_data, name2task)
+
+                deps = task_data.get("deps", [])
+                if deps:
+                    dependencies[task.name] = deps
+                name2task[task.name] = task
+
+            # build dependencies between task
+            for downstream_task_name, deps in dependencies.items():
+                downstream_task = name2task[downstream_task_name]
+                for upstram_task_name in deps:
+                    upstram_task = name2task[upstram_task_name]
+                    upstram_task >> downstream_task
+
+            pd.submit()
+            # if set is_run, run the process after submit
+            if is_run:
+                log.info(f"run process: {pd}")
+                pd.run()
+
+        return process_name
+
+    def parse_params(self, params: Any):
+        """Recursively resolves the parameter values.
+
+        The function operates params only when it encounters a string; other 
types continue recursively.
+        """
+        if isinstance(params, str):
+            for parse_rule in self._parse_rules:
+                params_ = params
+                params = parse_rule(params, base_folder=self._base_folder)
+                if params_ != params:
+                    print(f"parse {params_} -> {params}")
+
+        elif isinstance(params, list):
+            for index in range(len(params)):
+                params[index] = self.parse_params(params[index])
+
+        elif isinstance(params, dict):
+            for key, value in params.items():
+                params[key] = self.parse_params(value)
+
+        return params
+
+    @classmethod
+    def parse(cls, yaml_file: str):
+        """Recursively resolves the parameter values.
+
+        The function operates params only when it encounters a string; other 
types continue recursively.
+        """
+        process_name = cls(yaml_file).create_process_definition()
+        return process_name
+
+    def prepare_refer_process(self, content):
+        """Allow YAML files to reference process derived from other YAML 
files."""
+        process_paths = re.findall(r"\$PROCESS\{\"(.*?)\"\}", content)
+        for process_path in process_paths:
+            print(
+                f"find special token {process_path}, load process form 
{process_path}"
+            )
+            possible_path = ParseTool.get_possible_path(process_path, 
self._base_folder)
+            process_name = YamlProcess.parse(possible_path)
+            content = content.replace('$PROCESS{"%s"}' % process_path, 
process_name)
+
+        return content
+
+    def parse_task(self, task_data: dict, name2task: Dict[str, Task]):
+        """Parse various types of tasks.
+
+        :param task_data: dict.
+                {
+                    "task_type": "Shell",
+                    "params": {"name": "shell_task", "command":"ehco hellp"}
+                }
+
+        :param name2task: Dict[str, Task]), mapping of task_name and task
+
+
+        Some task type have special parse func:
+            if task type is Switch, use parse_switch;
+            if task type is Condition, use parse_condition;
+            if task type is Dependent, use parse_dependent;
+            other, we pass all task_params as input to task class, like 
"task_cls(**task_params)".
+        """
+        task_type = task_data["task_type"]
+        task_params = task_data["params"]
+        task_cls = get_task_cls(task_type)
+
+        # use YamlProcess._parse_rules to parse special value of yaml file
+        task_params = self.parse_params(task_params)
+
+        if task_cls == tasks.Switch:
+            task = self.parse_switch(task_params, name2task)
+
+        elif task_cls == tasks.Condition:
+            task = self.parse_condition(task_params, name2task)
+
+        elif task_cls == tasks.Dependent:
+            task = self.parse_dependent(task_params, name2task)
+
+        else:
+            task = task_cls(**task_params)
+        print(task_type, task)

Review Comment:
   Do we really need `print` here? A `log` call might be more appropriate.



##########
dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py:
##########
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test YAML process."""
+
+import os
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.yaml_process_define import (
+    ParseTool,
+    create_process_definition,
+    get_task_cls,
+)
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from tests.testing.task import Task
+
+
+def get_examples_folder():
+    """Get exmaple folder path for testing."""

Review Comment:
   ```suggestion
       """Get example folder path for testing."""
   ```



##########
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py:
##########
@@ -0,0 +1,456 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Parse YAML file to create process."""
+
+import logging
+import os
+import re
+from typing import Any, Dict
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+
+log = logging.getLogger(__file__)
+
+
+class ParseTool:
+    """Enhanced parsing tools."""
+
+    @staticmethod
+    def parse_string_param_if_file(string_param: str, **kwargs):
+        """Use $FILE{"data_path"} to load file from "data_path"."""
+        if string_param.startswith("$FILE"):
+            path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0]
+            base_folder = kwargs.get("base_folder", ".")
+            path = ParseTool.get_possible_path(path, base_folder)
+            with open(path, "r") as read_file:
+                string_param = "".join(read_file)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_env(string_param: str, **kwargs):
+        """Use $ENV{env_name} to load environment variable "env_name"."""
+        if "$ENV" in string_param:
+            key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0]
+            env_value = os.environ.get(key)
+            string_param = string_param.replace("$ENV{%s}" % key, env_value)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_config(string_param: str, **kwargs):
+        """Use ${CONFIG.var_name} to load variable "var_name" from 
configuration."""
+        if "${CONFIG" in string_param:
+            key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0]
+            if hasattr(configuration, key):
+                string_param = getattr(configuration, key)
+            else:
+                string_param = configuration.get_single_config(key)
+
+        return string_param
+
+    @staticmethod
+    def get_possible_path(file_path, base_folder):
+        """Get file possible path.
+
+        Return new path if file_path is not exists, but base_folder + 
file_path exists
+        """
+        possible_path = file_path
+        if not os.path.exists(file_path):

Review Comment:
   Can you use the `pathlib` module to handle the path issue instead of `os.path`?



##########
dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py:
##########
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test YAML process."""
+
+import os
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.yaml_process_define import (
+    ParseTool,
+    create_process_definition,
+    get_task_cls,
+)
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from tests.testing.task import Task
+
+
+def get_examples_folder():
+    """Get exmaple folder path for testing."""
+    base_folder = os.path.abspath(__file__)
+    examples_path = os.path.join(base_folder, "../../../examples/yaml_define")
+    return os.path.abspath(examples_path)
+
+
[email protected](
+    "string_param, expect",
+    [
+        ("$ENV{PROJECT_NAME}", "~/pydolphinscheduler"),
+    ],
+)
+def test_parse_tool_env(string_param, expect):
+    """Test parsing the environment variable."""
+    os.environ["PROJECT_NAME"] = expect
+    assert expect == ParseTool.parse_string_param_if_env(string_param)

Review Comment:
   Can you also add a test for the function `get_possible_path`?



##########
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py:
##########
@@ -0,0 +1,456 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Parse YAML file to create process."""
+
+import logging
+import os
+import re
+from typing import Any, Dict
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+
+log = logging.getLogger(__file__)
+
+
+class ParseTool:
+    """Enhanced parsing tools."""
+
+    @staticmethod
+    def parse_string_param_if_file(string_param: str, **kwargs):
+        """Use $FILE{"data_path"} to load file from "data_path"."""
+        if string_param.startswith("$FILE"):
+            path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0]
+            base_folder = kwargs.get("base_folder", ".")
+            path = ParseTool.get_possible_path(path, base_folder)
+            with open(path, "r") as read_file:
+                string_param = "".join(read_file)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_env(string_param: str, **kwargs):
+        """Use $ENV{env_name} to load environment variable "env_name"."""
+        if "$ENV" in string_param:
+            key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0]
+            env_value = os.environ.get(key)
+            string_param = string_param.replace("$ENV{%s}" % key, env_value)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_config(string_param: str, **kwargs):
+        """Use ${CONFIG.var_name} to load variable "var_name" from 
configuration."""
+        if "${CONFIG" in string_param:
+            key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0]
+            if hasattr(configuration, key):
+                string_param = getattr(configuration, key)
+            else:
+                string_param = configuration.get_single_config(key)
+
+        return string_param
+
+    @staticmethod
+    def get_possible_path(file_path, base_folder):
+        """Get file possible path.
+
+        Return new path if file_path is not exists, but base_folder + 
file_path exists
+        """
+        possible_path = file_path
+        if not os.path.exists(file_path):
+            new_path = os.path.join(base_folder, file_path)
+            if os.path.exists(new_path):
+                possible_path = new_path
+                print(f"{file_path} not exists, convert to {possible_path}")
+
+        return possible_path
+
+
+def get_task_cls(task_type) -> Task:
+    """Get the task class object by task_type (case compatible)."""
+    # only get task class from tasks.__all__
+    all_task_types = {type_.capitalize(): type_ for type_ in tasks.__all__}
+    task_type_cap = task_type.capitalize()
+    if task_type_cap not in all_task_types:
+        raise PyDSTaskNoFoundException(f"cant not find task {task_type}")
+
+    standard_name = all_task_types[task_type_cap]
+    return getattr(tasks, standard_name)
+
+
+class YamlProcess(YamlParser):
+    """Yaml parser for create process.
+
+    :param yaml_file: yaml file path.
+
+        examples1 ::
+
+            parser = YamlParser(yaml=...)
+            parser.create_process_definition()
+
+        examples2 ::
+
+            YamlParser(yaml=...).create_process_definition()
+
+    """
+
+    _parse_rules = [
+        ParseTool.parse_string_param_if_file,
+        ParseTool.parse_string_param_if_env,
+        ParseTool.parse_string_param_if_config,
+    ]
+
+    def __init__(self, yaml_file: str):
+        with open(yaml_file, "r") as f:
+            content = f.read()
+
+        self._base_folder = os.path.split(yaml_file)[0]
+        content = self.prepare_refer_process(content)
+        super().__init__(content)
+
+    def create_process_definition(self):
+        """Create process main function."""
+        # get process parameters with key "process"
+        process_params = self["process"]
+
+        # pop "run" parameter, used at the end
+        is_run = process_params.pop("run", False)
+
+        # use YamlProcess._parse_rules to parse special value of yaml file
+        process_params = self.parse_params(process_params)
+
+        process_name = process_params["name"]
+        print(f"Create Process: {process_name}")
+        with ProcessDefinition(**process_params) as pd:
+
+            # save dependencies between tasks
+            dependencies = {}
+
+            # save name and task mapping
+            name2task = {}
+
+            # get task datas with key "tasks"
+            for task_data in self["tasks"]:
+                task = self.parse_task(task_data, name2task)
+
+                deps = task_data.get("deps", [])
+                if deps:
+                    dependencies[task.name] = deps
+                name2task[task.name] = task
+
+            # build dependencies between task
+            for downstream_task_name, deps in dependencies.items():
+                downstream_task = name2task[downstream_task_name]
+                for upstram_task_name in deps:
+                    upstram_task = name2task[upstram_task_name]
+                    upstram_task >> downstream_task
+
+            pd.submit()
+            # if set is_run, run the process after submit
+            if is_run:
+                log.info(f"run process: {pd}")
+                pd.run()
+
+        return process_name
+
+    def parse_params(self, params: Any):
+        """Recursively resolves the parameter values.
+
+        The function operates params only when it encounters a string; other 
types continue recursively.
+        """
+        if isinstance(params, str):
+            for parse_rule in self._parse_rules:
+                params_ = params
+                params = parse_rule(params, base_folder=self._base_folder)
+                if params_ != params:
+                    print(f"parse {params_} -> {params}")
+
+        elif isinstance(params, list):
+            for index in range(len(params)):
+                params[index] = self.parse_params(params[index])
+
+        elif isinstance(params, dict):
+            for key, value in params.items():
+                params[key] = self.parse_params(value)
+
+        return params
+
+    @classmethod
+    def parse(cls, yaml_file: str):
+        """Recursively resolves the parameter values.
+
+        The function operates params only when it encounters a string; other 
types continue recursively.
+        """
+        process_name = cls(yaml_file).create_process_definition()
+        return process_name
+
+    def prepare_refer_process(self, content):
+        """Allow YAML files to reference process derived from other YAML 
files."""
+        process_paths = re.findall(r"\$PROCESS\{\"(.*?)\"\}", content)
+        for process_path in process_paths:
+            print(
+                f"find special token {process_path}, load process form 
{process_path}"
+            )
+            possible_path = ParseTool.get_possible_path(process_path, 
self._base_folder)
+            process_name = YamlProcess.parse(possible_path)
+            content = content.replace('$PROCESS{"%s"}' % process_path, 
process_name)
+
+        return content
+
+    def parse_task(self, task_data: dict, name2task: Dict[str, Task]):
+        """Parse various types of tasks.
+
+        :param task_data: dict.
+                {
+                    "task_type": "Shell",
+                    "params": {"name": "shell_task", "command":"ehco hellp"}
+                }
+
+        :param name2task: Dict[str, Task]), mapping of task_name and task
+
+
+        Some task type have special parse func:
+            if task type is Switch, use parse_switch;
+            if task type is Condition, use parse_condition;
+            if task type is Dependent, use parse_dependent;
+            other, we pass all task_params as input to task class, like 
"task_cls(**task_params)".
+        """
+        task_type = task_data["task_type"]
+        task_params = task_data["params"]
+        task_cls = get_task_cls(task_type)
+
+        # use YamlProcess._parse_rules to parse special value of yaml file
+        task_params = self.parse_params(task_params)
+
+        if task_cls == tasks.Switch:
+            task = self.parse_switch(task_params, name2task)
+
+        elif task_cls == tasks.Condition:
+            task = self.parse_condition(task_params, name2task)
+
+        elif task_cls == tasks.Dependent:
+            task = self.parse_dependent(task_params, name2task)
+
+        else:
+            task = task_cls(**task_params)
+        print(task_type, task)
+        return task
+
+    def parse_switch(self, task_params, name2task):
+        """Parse Switch Task.
+
+        This is an example Yaml fragment of task_params
+
+        name: switch
+        condition:
+          - ["${var} > 1", switch_child_1]
+          - switch_child_2
+        """
+        from pydolphinscheduler.tasks.switch import (
+            Branch,
+            Default,
+            Switch,
+            SwitchCondition,
+        )
+
+        condition_datas = task_params["condition"]
+        conditions = []
+        for condition_data in condition_datas:
+
+            # if condition_data is string, for example: switch_child_2, set it 
to Default branch
+            if isinstance(condition_data, str):
+                conditions.append(Default(task=name2task.get(condition_data)))
+
+            # if condition_data is list of 2 items, for example:
+            # ["${var} > 1", switch_child_1], set it to Branch
+            elif isinstance(condition_data, list) and len(condition_data) == 2:
+                cond, task_name = condition_data
+                conditions.append(Branch(cond, task=name2task.get(task_name)))
+            else:
+                raise Exception(f"cant not parse {condition_data}")
+
+        switch = Switch(
+            name=task_params["name"], condition=SwitchCondition(*conditions)
+        )
+        return switch
+
+    def parse_condition(self, task_params, name2task):
+        """Parse Condition Task.
+
+        This is an example Yaml fragment of task_params
+
+        name: condition
+        success_task: success_branch
+        failed_task: fail_branch
+        OP: AND
+        groups:
+          -
+            OP: AND
+            groups:
+              - [pre_task_1, true]
+              - [pre_task_2, true]
+              - [pre_task_3, false]
+          -
+            OP: AND
+            groups:
+              - [pre_task_1, false]
+              - [pre_task_2, true]
+              - [pre_task_3, true]
+
+        """
+        from pydolphinscheduler.tasks.condition import (
+            FAILURE,
+            SUCCESS,
+            And,
+            Condition,
+            Or,
+        )
+
+        def get_op_cls(op):
+            cls = None
+            if op.lower() == "and":
+                cls = And
+            elif op.lower() == "or":
+                cls = Or
+            else:
+                raise Exception(f"OP must be in And or Or, but get: {op}")

Review Comment:
   please change all the `Exception(f"...")` calls to the form
`Exception("OP must be in And or Or, but get: %s", op)` instead of f-strings:
an f-string always formats the string immediately, whereas the `%s` form only
formats when the exception is actually raised.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to