This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 129b056 feat: Add stmdency handle @task decorate statement dependency
(#63)
129b056 is described below
commit 129b056a4ac36be1f00cff83b8c3fa4d7f4c2075
Author: Jay Chung <[email protected]>
AuthorDate: Thu Dec 22 18:26:15 2022 +0800
feat: Add stmdency handle @task decorate statement dependency (#63)
Before this change, the @task decorator could not handle the statement
dependencies of the decorated function, including global variables,
calls to other functions, and imported modules.
This patch introduces stmdency to handle those issues and
make it work.
---
docs/source/tasks/func_wrap.rst | 55 ++++++++++++++++++++--
setup.py | 1 +
.../examples/tutorial_decorator.py | 46 ++++++++++++------
src/pydolphinscheduler/tasks/func_wrap.py | 34 +++++++------
src/pydolphinscheduler/tasks/python.py | 4 --
tests/tasks/test_func_wrap.py | 10 +++-
6 files changed, 111 insertions(+), 39 deletions(-)
diff --git a/docs/source/tasks/func_wrap.rst b/docs/source/tasks/func_wrap.rst
index a4a2972..eaa74b9 100644
--- a/docs/source/tasks/func_wrap.rst
+++ b/docs/source/tasks/func_wrap.rst
@@ -15,10 +15,59 @@
specific language governing permissions and limitations
under the License.
-Python Function Wrapper
-=======================
+Python Function Decorate
+========================
-A decorator covert Python function into pydolphinscheduler's task.
+A decorator that converts a Python function into a pydolphinscheduler task. Use the decorator
+:code:`@task`, imported via :code:`from pydolphinscheduler.tasks.func_wrap import task`, to convert a Python function into
+a single Python task of dolphinscheduler.
+
+Because we have to convert the whole Python definition into multiple Python tasks in dolphinscheduler, and all of
+the separated Python tasks will be executed in different Python processes, we need to separate not only the
+Python function code, but also all the variables and imported modules related to the decorated function.
+
+For example, suppose we decorate the function ``depend_import`` in the definition
+
+.. code-block:: python
+
+ import time
+
+ @task
+ def depend_import():
+ time.sleep(2)
+
+and we can see that the function ``depend_import`` depends on another module: it uses :code:`time.sleep(2)` from the module :code:`time`
+to sleep for 2 seconds. So when we want to separate this function into a dolphinscheduler task, we need to include the imported
+:code:`time` module.
+
+This means that we do not only post the code
+
+.. code-block:: python
+
+ def depend_import():
+ time.sleep(2)
+
+ depend_import()
+
+to the dolphinscheduler Python task; we post the dependencies of this function as well, so you will see this in the
+dolphinscheduler Python task to make it work. And if you use global variables or other functions in the
+decorated function, they will be included as well.
+
+.. code-block:: python
+
+ import time
+
+ def depend_import():
+ time.sleep(2)
+
+ depend_import()
+
+
+.. note::
+
+ We use the third-party library `stmdency <https://pypi.org/project/stmdency>`_ to get the statements that the
+ current function depends on, so if you find some unexpected behavior you can report a bug to `apache-dolphinscheduler`
+ or `stmdency`.
Example
-------
diff --git a/setup.py b/setup.py
index 9f3c756..99fddac 100644
--- a/setup.py
+++ b/setup.py
@@ -42,6 +42,7 @@ prod = [
"click>=8.0.0",
"py4j~=0.10",
"ruamel.yaml",
+ "stmdency>=0.0.2",
]
build = [
diff --git a/src/pydolphinscheduler/examples/tutorial_decorator.py
b/src/pydolphinscheduler/examples/tutorial_decorator.py
index f3b878b..ed6de36 100644
--- a/src/pydolphinscheduler/examples/tutorial_decorator.py
+++ b/src/pydolphinscheduler/examples/tutorial_decorator.py
@@ -30,6 +30,8 @@ task_parent --> --> task_union
it will instantiate and run all the task it have.
"""
+import time
+
# [start tutorial]
# [start package_import]
# Import Workflow object to define your workflow attributes
@@ -40,30 +42,44 @@ from pydolphinscheduler.tasks.func_wrap import task
# [end package_import]
+scope_global = "global-var"
+
# [start task_declare]
@task
-def task_parent():
+def print_something():
"""First task in this workflow."""
- print("echo hello pydolphinscheduler")
+ print("hello python function wrap task")
@task
-def task_child_one():
- """Child task will be run parallel after task ``task_parent`` finished."""
- print("echo 'child one'")
+def depend_import():
+ """Depend on import module."""
+ time.sleep(2)
@task
-def task_child_two():
- """Child task will be run parallel after task ``task_parent`` finished."""
- print("echo 'child two'")
+def depend_global_var():
+ """Depend on global var."""
+ print(f"Use global variable {scope_global}")
+
+
+@task
+def depend_local_var():
+ """Depend on local variable."""
+ scope_global = "local"
+ print(f"Use local variable overwrite global {scope_global}")
+
+
+def foo():
+ """Call in other task."""
+ print("this is a global function")
@task
-def task_union():
- """Last task in this workflow."""
- print("echo union")
+def depend_func():
+ """Depend on global function."""
+ foo()
# [end task_declare]
@@ -78,13 +94,13 @@ with Workflow(
# [end workflow_declare]
# [start task_relation_declare]
- task_group = [task_child_one(), task_child_two()]
- task_parent().set_downstream(task_group)
+ task_group = [depend_import(), depend_global_var()]
+ print_something().set_downstream(task_group)
- task_union() << task_group
+ task_group >> depend_local_var() >> depend_func()
# [end task_relation_declare]
# [start submit_or_run]
- workflow.run()
+ workflow.submit()
# [end submit_or_run]
# [end tutorial]
diff --git a/src/pydolphinscheduler/tasks/func_wrap.py
b/src/pydolphinscheduler/tasks/func_wrap.py
index c0b73a1..3c22315 100644
--- a/src/pydolphinscheduler/tasks/func_wrap.py
+++ b/src/pydolphinscheduler/tasks/func_wrap.py
@@ -19,43 +19,47 @@
import functools
import inspect
-import itertools
import types
+from pathlib import Path
+
+from stmdency.extractor import Extractor
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.python import Python
-def _get_func_str(func: types.FunctionType) -> str:
- """Get Python function string without indent from decorator.
+def _exists_other_decorator(func: types.FunctionType) -> None:
+ """Check if the function has other decorators except @task.
:param func: The function which wraps by decorator ``@task``.
"""
lines = inspect.getsourcelines(func)[0]
- src_strip = ""
- lead_space_num = None
for line in lines:
- if lead_space_num is None:
- lead_space_num = sum(1 for _ in itertools.takewhile(str.isspace,
line))
- if line.strip() == "@task":
- continue
- elif line.strip().startswith("@"):
+ strip_line = line.strip()
+ if strip_line.startswith("@") and not strip_line == "@task":
raise PyDSParamException(
"Do no support other decorators for function ``task``
decorator."
)
- src_strip += line[lead_space_num:]
- return src_strip
def task(func: types.FunctionType):
- """Decorate which covert Python function into pydolphinscheduler task."""
+ """Decorate which covert Python functions into pydolphinscheduler task.
+
+ :param func: The function which wraps by decorator ``@task``.
+ """
@functools.wraps(func)
def wrapper(*args, **kwargs):
- func_str = _get_func_str(func)
+ _exists_other_decorator(func)
+ loc = func.__code__.co_filename
+ extractor = Extractor(Path(loc).open("r").read())
+ stm = extractor.get_code(func.__name__)
return Python(
- name=kwargs.get("name", func.__name__), definition=func_str,
*args, **kwargs
+ name=kwargs.get("name", func.__name__),
+ definition=f"{stm}{func.__name__}()",
+ *args,
+ **kwargs,
)
return wrapper
diff --git a/src/pydolphinscheduler/tasks/python.py
b/src/pydolphinscheduler/tasks/python.py
index 593cc52..bec8336 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -80,10 +80,6 @@ class Python(Task):
pattern = re.compile("^def (\\w+)\\(")
find = pattern.findall(definition)
if not find:
- log.warning(
- "Python definition is simple script instead of function,
with value %s",
- definition,
- )
return definition
# Keep function str and function callable always have one blank
line
func_str = (
diff --git a/tests/tasks/test_func_wrap.py b/tests/tasks/test_func_wrap.py
index aa0da26..10d97c6 100644
--- a/tests/tasks/test_func_wrap.py
+++ b/tests/tasks/test_func_wrap.py
@@ -49,7 +49,10 @@ def test_single_task_outside(mock_code):
pd_task = workflow.tasks[12345]
assert pd_task.name == "foo"
- assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
+ assert (
+ pd_task.raw_script
+ == 'TASK_NAME = "test_task"\n\n\ndef foo():\n
print(TASK_NAME)\nfoo()'
+ )
@patch(
@@ -70,7 +73,10 @@ def test_single_task_inside(mock_code):
pd_task = workflow.tasks[12345]
assert pd_task.name == "foo"
- assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
+ assert (
+ pd_task.raw_script
+ == 'TASK_NAME = "test_task"\n\n\ndef foo():\n
print(TASK_NAME)\nfoo()'
+ )
@patch(