This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1417967 [python] Task condition missing two downstream param (#7783)
1417967 is described below
commit 1417967d9eebd8e2a5cd4f3b47449cb5be1bdb5d
Author: Jiajie Zhong <[email protected]>
AuthorDate: Wed Jan 5 19:58:44 2022 +0800
[python] Task condition missing two downstream param (#7783)
* [python] Task condition missing two downstream param
We add two downstream task parameters to set the success
and failed nodes of the condition task
close: #7763
* Add getter and setter property condition_result in base task
---
.../examples/task_conditions_example.py | 35 ++++++------
.../src/pydolphinscheduler/core/task.py | 12 +++-
.../src/pydolphinscheduler/tasks/condition.py | 21 ++++++-
.../tests/tasks/test_condition.py | 64 ++++++++++++++--------
4 files changed, 89 insertions(+), 43 deletions(-)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
index 8b66b79..6c1b039 100644
---
a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
+++
b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
@@ -22,11 +22,11 @@ This example will create five task in single workflow, with
four shell task and
condition have one upstream which we declare explicit with syntax `parent >>
condition`, and three downstream
automatically set dependence by condition task by passing parameter
`condition`. The graph of this workflow
like:
-pre_task_success_1 ->
- \
-pre_task_success_2 -> --> conditions -> end
- /
-pre_task_fail ->
+pre_task_1 -> -> success_branch
+ \ /
+pre_task_2 -> -> conditions ->
+ / \
+pre_task_3 -> -> fail_branch
.
"""
@@ -35,22 +35,23 @@ from pydolphinscheduler.tasks.condition import FAILURE,
SUCCESS, And, Conditions
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists")
as pd:
- condition_pre_task_1 = Shell(
- name="pre_task_success_1", command="echo pre_task_success_1"
- )
- condition_pre_task_2 = Shell(
- name="pre_task_success_2", command="echo pre_task_success_2"
- )
- condition_pre_task_3 = Shell(name="pre_task_fail", command="echo
pre_task_fail")
+ pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
+ pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
+ pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
cond_operator = And(
And(
- SUCCESS(condition_pre_task_1, condition_pre_task_2),
- FAILURE(condition_pre_task_3),
+ SUCCESS(pre_task_1, pre_task_2),
+ FAILURE(pre_task_3),
),
)
- end = Shell(name="end", command="echo parent")
+ success_branch = Shell(name="success_branch", command="echo
success_branch")
+ fail_branch = Shell(name="fail_branch", command="echo fail_branch")
- condition = Conditions(name="conditions", condition=cond_operator)
- condition >> end
+ condition = Conditions(
+ name="conditions",
+ condition=cond_operator,
+ success_task=success_branch,
+ failed_task=fail_branch,
+ )
pd.submit()
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 8a90efc..693f508 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -156,7 +156,7 @@ class Task(Base):
self.resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
- self.condition_result = condition_result or
self.DEFAULT_CONDITION_RESULT
+ self._condition_result = condition_result or
self.DEFAULT_CONDITION_RESULT
@property
def process_definition(self) -> Optional[ProcessDefinition]:
@@ -169,6 +169,16 @@ class Task(Base):
self._process_definition = process_definition
@property
+ def condition_result(self) -> Dict:
+ """Get attribute condition_result."""
+ return self._condition_result
+
+ @condition_result.setter
+ def condition_result(self, condition_result: Optional[Dict]):
+ """Set attribute condition_result."""
+ self._condition_result = condition_result
+
+ @property
def task_params(self) -> Optional[Dict]:
"""Get task parameter object.
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
index 905a41b..895a29b 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
@@ -157,9 +157,19 @@ class Or(ConditionOperator):
class Conditions(Task):
"""Task condition object, declare behavior for condition task to
dolphinscheduler."""
- def __init__(self, name: str, condition: ConditionOperator, *args,
**kwargs):
+ def __init__(
+ self,
+ name: str,
+ condition: ConditionOperator,
+ success_task: Task,
+ failed_task: Task,
+ *args,
+ **kwargs,
+ ):
super().__init__(name, TaskType.CONDITIONS, *args, **kwargs)
self.condition = condition
+ self.success_task = success_task
+ self.failed_task = failed_task
# Set condition tasks as current task downstream
self._set_dep()
@@ -171,6 +181,15 @@ class Conditions(Task):
for status in cond.args:
upstream.extend(list(status.tasks))
self.set_upstream(upstream)
+ self.set_downstream([self.success_task, self.failed_task])
+
+ @property
+ def condition_result(self) -> Dict:
+ """Get condition result define for java gateway."""
+ return {
+ "successNode": [self.success_task.code],
+ "failedNode": [self.failed_task.code],
+ }
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
index 9933c4f..6085de2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
@@ -324,7 +324,7 @@ def test_condition_operator_set_define_attr_mix_operator(
"pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version",
return_value=(123, 1),
)
-def test_dependent_get_define(mock_condition_code_version,
mock_task_code_version):
+def test_condition_get_define(mock_condition_code_version,
mock_task_code_version):
"""Test task condition :func:`get_define`."""
common_task = Task(name="common_task", task_type="test_task_condition")
cond_operator = And(
@@ -372,7 +372,10 @@ def test_dependent_get_define(mock_condition_code_version,
mock_task_code_versio
},
],
},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "conditionResult": {
+ "successNode": [common_task.code],
+ "failedNode": [common_task.code],
+ },
"waitStartTimeout": {},
},
"flag": "YES",
@@ -385,7 +388,9 @@ def test_dependent_get_define(mock_condition_code_version,
mock_task_code_versio
"timeout": 0,
}
- task = Conditions(name, condition=cond_operator)
+ task = Conditions(
+ name, condition=cond_operator, success_task=common_task,
failed_task=common_task
+ )
assert task.get_define() == expect
@@ -396,49 +401,60 @@ def
test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
def test_condition_set_dep_workflow(mock_task_code_version):
"""Test task condition set dependence in workflow level."""
with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
- condition_pre_task_1 = Task(name="pre_task_success_1",
task_type=TEST_TYPE)
- condition_pre_task_2 = Task(name="pre_task_success_2",
task_type=TEST_TYPE)
- condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
+ pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE)
+ pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE)
+ pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE)
cond_operator = And(
And(
- SUCCESS(condition_pre_task_1, condition_pre_task_2),
- FAILURE(condition_pre_task_3),
+ SUCCESS(pre_task_1, pre_task_2),
+ FAILURE(pre_task_3),
),
)
- end = Task(name="end", task_type=TEST_TYPE)
- condition = Conditions(name="conditions", condition=cond_operator)
- condition >> end
+ success_branch = Task(name="success_branch", task_type=TEST_TYPE)
+ fail_branch = Task(name="fail_branch", task_type=TEST_TYPE)
+
+ condition = Conditions(
+ name="conditions",
+ condition=cond_operator,
+ success_task=success_branch,
+ failed_task=fail_branch,
+ )
# General tasks test
- assert len(pd.tasks) == 5
+ assert len(pd.tasks) == 6
assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
[
+ pre_task_1,
+ pre_task_2,
+ pre_task_3,
+ success_branch,
+ fail_branch,
condition,
- condition_pre_task_1,
- condition_pre_task_2,
- condition_pre_task_3,
- end,
],
key=lambda t: t.name,
)
# Task dep test
- assert end._upstream_task_codes == {condition.code}
- assert condition._downstream_task_codes == {end.code}
+ assert success_branch._upstream_task_codes == {condition.code}
+ assert fail_branch._upstream_task_codes == {condition.code}
+ assert condition._downstream_task_codes == {
+ success_branch.code,
+ fail_branch.code,
+ }
# Condition task dep after ProcessDefinition function get_define called
assert condition._upstream_task_codes == {
- condition_pre_task_1.code,
- condition_pre_task_2.code,
- condition_pre_task_3.code,
+ pre_task_1.code,
+ pre_task_2.code,
+ pre_task_3.code,
}
assert all(
[
child._downstream_task_codes == {condition.code}
for child in [
- condition_pre_task_1,
- condition_pre_task_2,
- condition_pre_task_3,
+ pre_task_1,
+ pre_task_2,
+ pre_task_3,
]
]
)