This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 4460f3b [impv] Add option param for workflow and task (#43)
4460f3b is described below
commit 4460f3bcfb8f2a8e0fe09c67b8bf8d078ae5fa2b
Author: Jay Chung <[email protected]>
AuthorDate: Wed Dec 14 16:32:18 2022 +0800
[impv] Add option param for workflow and task (#43)
---
src/pydolphinscheduler/core/task.py | 11 ++++++++++-
src/pydolphinscheduler/core/workflow.py | 2 ++
tests/core/test_task.py | 18 ++++++++++++++++++
3 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 816d1b5..32d87bc 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -141,6 +141,8 @@ class Task(Base):
wait_start_timeout: Optional[Dict] = None,
condition_result: Optional[Dict] = None,
resource_plugin: Optional[ResourcePlugin] = None,
+ *args,
+ **kwargs,
):
super().__init__(name, description)
@@ -155,7 +157,14 @@ class Task(Base):
self.timeout_notify_strategy = timeout_notify_strategy
self._timeout: timedelta = timeout
self._workflow = None
- self.workflow: Workflow = workflow or WorkflowContext.get()
+ if "process_definition" in kwargs:
+ warnings.warn(
+ "The `process_definition` parameter is deprecated, please use `workflow` instead.",
+ DeprecationWarning,
+ )
+ self.workflow = kwargs.pop("process_definition")
+ else:
+ self.workflow: Workflow = workflow or WorkflowContext.get()
self._upstream_task_codes: Set[int] = set()
self._downstream_task_codes: Set[int] = set()
self._task_relation: Set[TaskRelation] = set()
diff --git a/src/pydolphinscheduler/core/workflow.py b/src/pydolphinscheduler/core/workflow.py
index f972a8f..ca25b7c 100644
--- a/src/pydolphinscheduler/core/workflow.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -130,6 +130,8 @@ class Workflow(Base):
param: Optional[Dict] = None,
resource_plugin: Optional[ResourcePlugin] = None,
resource_list: Optional[List[Resource]] = None,
+ *args,
+ **kwargs,
):
super().__init__(name, description)
self.schedule = schedule
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 1742cd2..40a3e9c 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -18,6 +18,7 @@
"""Test Task class function."""
import logging
import re
+import warnings
from datetime import timedelta
from typing import Set, Tuple
from unittest.mock import PropertyMock, patch
@@ -343,6 +344,23 @@ def test_add_duplicate(caplog):
)
+def test_use_deprecated_pd(caplog):
+ """Test raise warning when using process_defintion assign workflow."""
+ wf = Workflow("test_deprecated_pd")
+ with warnings.catch_warnings(record=True) as w:
+ task = TaskWithCode(
+ name="test_deprecated_pd",
+ task_type="test",
+ code=123,
+ version=2,
+ process_definition=wf,
+ )
+ assert len(w) == 1
+ assert issubclass(w[-1].category, DeprecationWarning)
+ assert "deprecated" in str(w[-1].message)
+ assert task.workflow == wf
+
+
@pytest.mark.parametrize(
"val, expected",
[