This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
new f2aef53 Fix bug in python example (#7681)
f2aef53 is described below
commit f2aef53ad132554415b368f16f5a61dd1932de20
Author: Jiajie Zhong <[email protected]>
AuthorDate: Tue Dec 28 18:52:55 2021 +0800
Fix bug in python example (#7681)
Fix example bugs in the switch and datax task types.
Fix a misunderstanding about the condition node
* [python] Fix switch example workflow name conflict to dependent
* [python] Fix task condition missing branch success and fail
* [python] Task datax add more detail example
---
.../examples/task_conditions_example.py | 37 +++++++------
.../examples/task_datax_example.py | 60 ++++++++++++++++++--
.../examples/task_switch_example.py | 2 +-
.../pydolphinscheduler/examples/tutorial.py | 6 +-
.../src/pydolphinscheduler/core/task.py | 4 +-
.../src/pydolphinscheduler/tasks/condition.py | 22 +++++++-
.../tests/tasks/test_condition.py | 64 ++++++++++++++--------
7 files changed, 141 insertions(+), 54 deletions(-)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
index 8b66b79..1415206 100644
---
a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
+++
b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
@@ -22,11 +22,11 @@ This example will create five task in single workflow, with
four shell task and
condition have one upstream which we declare explicit with syntax `parent >>
condition`, and three downstream
automatically set dependence by condition task by passing parameter
`condition`. The graph of this workflow
like:
-pre_task_success_1 ->
- \
-pre_task_success_2 -> --> conditions -> end
- /
-pre_task_fail ->
+pre_task_1 -> -> success_branch
+ \ /
+pre_task_2 -> -> conditions ->
+ / \
+pre_task_3 -> -> fail_branch
.
"""
@@ -34,23 +34,24 @@ from pydolphinscheduler.core.process_definition import
ProcessDefinition
from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And,
Conditions
from pydolphinscheduler.tasks.shell import Shell
-with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists")
as pd:
- condition_pre_task_1 = Shell(
- name="pre_task_success_1", command="echo pre_task_success_1"
- )
- condition_pre_task_2 = Shell(
- name="pre_task_success_2", command="echo pre_task_success_2"
- )
- condition_pre_task_3 = Shell(name="pre_task_fail", command="echo
pre_task_fail")
+with ProcessDefinition(name="task_conditions_example_1",
tenant="tenant_exists") as pd:
+ pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
+ pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
+ pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
cond_operator = And(
And(
- SUCCESS(condition_pre_task_1, condition_pre_task_2),
- FAILURE(condition_pre_task_3),
+ SUCCESS(pre_task_1, pre_task_2),
+ FAILURE(pre_task_3),
),
)
- end = Shell(name="end", command="echo parent")
+ success_branch = Shell(name="success_branch", command="success_branch
parent")
+ fail_branch = Shell(name="fail_branch", command="echo fail_branch")
- condition = Conditions(name="conditions", condition=cond_operator)
- condition >> end
+ condition = Conditions(
+ name="conditions",
+ condition=cond_operator,
+ success_task=success_branch,
+ failed_task=fail_branch,
+ )
pd.submit()
diff --git
a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
index c9ca80c..9b4254a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
@@ -29,10 +29,61 @@ from pydolphinscheduler.core.process_definition import
ProcessDefinition
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
# datax json template
-JSON_TEMPLATE = ""
+JSON_TEMPLATE = {
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "username": "usr",
+ "password": "pwd",
+ "column": [
+ "id",
+ "name",
+ "code",
+ "description"
+ ],
+ "splitPk": "id",
+ "connection": [
+ {
+ "table": [
+ "source_table"
+ ],
+ "jdbcUrl": [
+ "jdbc:mysql://127.0.0.1:3306/source_db"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "mysqlwriter",
+ "parameter": {
+ "writeMode": "insert",
+ "username": "usr",
+ "password": "pwd",
+ "column": [
+ "id",
+ "name"
+ ],
+ "connection": [
+ {
+ "jdbcUrl":
"jdbc:mysql://127.0.0.1:3306/target_db",
+ "table": [
+ "target_table"
+ ]
+ }
+ ]
+ }
+ }
+ }
+ ]
+ }
+}
with ProcessDefinition(
- name="task_datax",
+ name="task_datax_1",
tenant="tenant_exists",
) as pd:
# This task synchronizes the data in `t_ds_project`
@@ -45,6 +96,7 @@ with ProcessDefinition(
target_table="target_table",
)
- # you can custom json_template of datax to sync data.
- task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE)
+ # you can custom json_template of datax to sync data. This task create job
+ # same as task1 do
+ task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
pd.run()
diff --git
a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
index 5ab2aa5..b47b8e3 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
@@ -34,7 +34,7 @@ from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.switch import Branch, Default, Switch,
SwitchCondition
with ProcessDefinition(
- name="task_dependent_external",
+ name="task_switch_example",
tenant="tenant_exists",
) as pd:
parent = Shell(name="parent", command="echo parent")
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
index d58bd75..52a8e97 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
@@ -34,18 +34,14 @@ from pydolphinscheduler.core.process_definition import
ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(
- name="tutorial",
- schedule="0 0 0 * * ? *",
- start_time="2021-01-01",
+ name="aklsfkkalsfjkol",
tenant="tenant_exists",
) as pd:
- task_parent = Shell(name="task_parent", command="echo hello
pydolphinscheduler")
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
task_group = [task_child_one, task_child_two]
- task_parent.set_downstream(task_group)
task_union << task_group
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 8a90efc..89fda57 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -26,6 +26,7 @@ from pydolphinscheduler.constants import (
TaskFlag,
TaskPriority,
TaskTimeoutFlag,
+ TaskType,
)
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.core.process_definition import (
@@ -156,7 +157,8 @@ class Task(Base):
self.resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
- self.condition_result = condition_result or
self.DEFAULT_CONDITION_RESULT
+ if task_type != TaskType.CONDITIONS:
+ self.condition_result = condition_result or
self.DEFAULT_CONDITION_RESULT
@property
def process_definition(self) -> Optional[ProcessDefinition]:
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
index 905a41b..a5773b2 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
@@ -157,9 +157,19 @@ class Or(ConditionOperator):
class Conditions(Task):
"""Task condition object, declare behavior for condition task to
dolphinscheduler."""
- def __init__(self, name: str, condition: ConditionOperator, *args,
**kwargs):
+ def __init__(
+ self,
+ name: str,
+ condition: ConditionOperator,
+ success_task: Task,
+ failed_task: Task,
+ *args,
+ **kwargs,
+ ):
super().__init__(name, TaskType.CONDITIONS, *args, **kwargs)
self.condition = condition
+ self.success_task = success_task
+ self.failed_task = failed_task
# Set condition tasks as current task downstream
self._set_dep()
@@ -171,6 +181,15 @@ class Conditions(Task):
for status in cond.args:
upstream.extend(list(status.tasks))
self.set_upstream(upstream)
+ self.set_downstream([self.success_task, self.failed_task])
+
+ @property
+ def condition_result(self) -> Dict:
+ """Get condition result define for java gateway."""
+ return {
+ "successNode": [self.success_task.code],
+ "failedNode": [self.failed_task.code],
+ }
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
@@ -182,4 +201,5 @@ class Conditions(Task):
"""
params = super().task_params
params["dependence"] = self.condition.get_define()
+ params["conditionResult"] = self.condition_result
return params
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
index 9933c4f..6085de2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
@@ -324,7 +324,7 @@ def test_condition_operator_set_define_attr_mix_operator(
"pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version",
return_value=(123, 1),
)
-def test_dependent_get_define(mock_condition_code_version,
mock_task_code_version):
+def test_condition_get_define(mock_condition_code_version,
mock_task_code_version):
"""Test task condition :func:`get_define`."""
common_task = Task(name="common_task", task_type="test_task_condition")
cond_operator = And(
@@ -372,7 +372,10 @@ def test_dependent_get_define(mock_condition_code_version,
mock_task_code_versio
},
],
},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "conditionResult": {
+ "successNode": [common_task.code],
+ "failedNode": [common_task.code],
+ },
"waitStartTimeout": {},
},
"flag": "YES",
@@ -385,7 +388,9 @@ def test_dependent_get_define(mock_condition_code_version,
mock_task_code_versio
"timeout": 0,
}
- task = Conditions(name, condition=cond_operator)
+ task = Conditions(
+ name, condition=cond_operator, success_task=common_task,
failed_task=common_task
+ )
assert task.get_define() == expect
@@ -396,49 +401,60 @@ def
test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
def test_condition_set_dep_workflow(mock_task_code_version):
"""Test task condition set dependence in workflow level."""
with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
- condition_pre_task_1 = Task(name="pre_task_success_1",
task_type=TEST_TYPE)
- condition_pre_task_2 = Task(name="pre_task_success_2",
task_type=TEST_TYPE)
- condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
+ pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE)
+ pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE)
+ pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE)
cond_operator = And(
And(
- SUCCESS(condition_pre_task_1, condition_pre_task_2),
- FAILURE(condition_pre_task_3),
+ SUCCESS(pre_task_1, pre_task_2),
+ FAILURE(pre_task_3),
),
)
- end = Task(name="end", task_type=TEST_TYPE)
- condition = Conditions(name="conditions", condition=cond_operator)
- condition >> end
+ success_branch = Task(name="success_branch", task_type=TEST_TYPE)
+ fail_branch = Task(name="fail_branch", task_type=TEST_TYPE)
+
+ condition = Conditions(
+ name="conditions",
+ condition=cond_operator,
+ success_task=success_branch,
+ failed_task=fail_branch,
+ )
# General tasks test
- assert len(pd.tasks) == 5
+ assert len(pd.tasks) == 6
assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
[
+ pre_task_1,
+ pre_task_2,
+ pre_task_3,
+ success_branch,
+ fail_branch,
condition,
- condition_pre_task_1,
- condition_pre_task_2,
- condition_pre_task_3,
- end,
],
key=lambda t: t.name,
)
# Task dep test
- assert end._upstream_task_codes == {condition.code}
- assert condition._downstream_task_codes == {end.code}
+ assert success_branch._upstream_task_codes == {condition.code}
+ assert fail_branch._upstream_task_codes == {condition.code}
+ assert condition._downstream_task_codes == {
+ success_branch.code,
+ fail_branch.code,
+ }
# Condition task dep after ProcessDefinition function get_define called
assert condition._upstream_task_codes == {
- condition_pre_task_1.code,
- condition_pre_task_2.code,
- condition_pre_task_3.code,
+ pre_task_1.code,
+ pre_task_2.code,
+ pre_task_3.code,
}
assert all(
[
child._downstream_task_codes == {condition.code}
for child in [
- condition_pre_task_1,
- condition_pre_task_2,
- condition_pre_task_3,
+ pre_task_1,
+ pre_task_2,
+ pre_task_3,
]
]
)