This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
     new f2aef53  Fix bug in python example (#7681)
f2aef53 is described below

commit f2aef53ad132554415b368f16f5a61dd1932de20
Author: Jiajie Zhong <[email protected]>
AuthorDate: Tue Dec 28 18:52:55 2021 +0800

    Fix bug in python example (#7681)
    
    Fix example bugs in the switch and datax task types.
    Fix misunderstanding of the condition node.
    
    * [python] Fix switch example workflow name conflict to dependent
    
    * [python] Fix task condition missing branch success and fail
    
    * [python] Task datax add more detail example
---
 .../examples/task_conditions_example.py            | 37 +++++++------
 .../examples/task_datax_example.py                 | 60 ++++++++++++++++++--
 .../examples/task_switch_example.py                |  2 +-
 .../pydolphinscheduler/examples/tutorial.py        |  6 +-
 .../src/pydolphinscheduler/core/task.py            |  4 +-
 .../src/pydolphinscheduler/tasks/condition.py      | 22 +++++++-
 .../tests/tasks/test_condition.py                  | 64 ++++++++++++++--------
 7 files changed, 141 insertions(+), 54 deletions(-)

diff --git 
a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
 
b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
index 8b66b79..1415206 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
@@ -22,11 +22,11 @@ This example will create five task in single workflow, with 
four shell task and
 condition have one upstream which we declare explicit with syntax `parent >> 
condition`, and three downstream
 automatically set dependence by condition task by passing parameter 
`condition`. The graph of this workflow
 like:
-pre_task_success_1 ->
-                      \
-pre_task_success_2 ->  --> conditions -> end
-                      /
-pre_task_fail      ->
+pre_task_1 ->                     -> success_branch
+             \                  /
+pre_task_2 ->  -> conditions ->
+             /                  \
+pre_task_3 ->                     -> fail_branch
 .
 """
 
@@ -34,23 +34,24 @@ from pydolphinscheduler.core.process_definition import 
ProcessDefinition
 from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, 
Conditions
 from pydolphinscheduler.tasks.shell import Shell
 
-with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") 
as pd:
-    condition_pre_task_1 = Shell(
-        name="pre_task_success_1", command="echo pre_task_success_1"
-    )
-    condition_pre_task_2 = Shell(
-        name="pre_task_success_2", command="echo pre_task_success_2"
-    )
-    condition_pre_task_3 = Shell(name="pre_task_fail", command="echo 
pre_task_fail")
+with ProcessDefinition(name="task_conditions_example_1", 
tenant="tenant_exists") as pd:
+    pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
+    pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
+    pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
     cond_operator = And(
         And(
-            SUCCESS(condition_pre_task_1, condition_pre_task_2),
-            FAILURE(condition_pre_task_3),
+            SUCCESS(pre_task_1, pre_task_2),
+            FAILURE(pre_task_3),
         ),
     )
 
-    end = Shell(name="end", command="echo parent")
+    success_branch = Shell(name="success_branch", command="success_branch 
parent")
+    fail_branch = Shell(name="fail_branch", command="echo fail_branch")
 
-    condition = Conditions(name="conditions", condition=cond_operator)
-    condition >> end
+    condition = Conditions(
+        name="conditions",
+        condition=cond_operator,
+        success_task=success_branch,
+        failed_task=fail_branch,
+    )
     pd.submit()
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py 
b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
index c9ca80c..9b4254a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
@@ -29,10 +29,61 @@ from pydolphinscheduler.core.process_definition import 
ProcessDefinition
 from pydolphinscheduler.tasks.datax import CustomDataX, DataX
 
 # datax json template
-JSON_TEMPLATE = ""
+JSON_TEMPLATE = {
+    "job": {
+        "content": [
+            {
+                "reader": {
+                    "name": "mysqlreader",
+                    "parameter": {
+                        "username": "usr",
+                        "password": "pwd",
+                        "column": [
+                            "id",
+                            "name",
+                            "code",
+                            "description"
+                        ],
+                        "splitPk": "id",
+                        "connection": [
+                            {
+                                "table": [
+                                    "source_table"
+                                ],
+                                "jdbcUrl": [
+                                    "jdbc:mysql://127.0.0.1:3306/source_db"
+                                ]
+                            }
+                        ]
+                    }
+                },
+                "writer": {
+                    "name": "mysqlwriter",
+                    "parameter": {
+                        "writeMode": "insert",
+                        "username": "usr",
+                        "password": "pwd",
+                        "column": [
+                            "id",
+                            "name"
+                        ],
+                        "connection": [
+                            {
+                                "jdbcUrl": 
"jdbc:mysql://127.0.0.1:3306/target_db",
+                                "table": [
+                                    "target_table"
+                                ]
+                            }
+                        ]
+                    }
+                }
+            }
+        ]
+    }
+}
 
 with ProcessDefinition(
-    name="task_datax",
+    name="task_datax_1",
     tenant="tenant_exists",
 ) as pd:
     # This task synchronizes the data in `t_ds_project`
@@ -45,6 +96,7 @@ with ProcessDefinition(
         target_table="target_table",
     )
 
-    # you can custom json_template of datax to sync data.
-    task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE)
+    # you can custom json_template of datax to sync data. This task create job
+    # same as task1 do
+    task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
     pd.run()
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py 
b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
index 5ab2aa5..b47b8e3 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
@@ -34,7 +34,7 @@ from pydolphinscheduler.tasks.shell import Shell
 from pydolphinscheduler.tasks.switch import Branch, Default, Switch, 
SwitchCondition
 
 with ProcessDefinition(
-    name="task_dependent_external",
+    name="task_switch_example",
     tenant="tenant_exists",
 ) as pd:
     parent = Shell(name="parent", command="echo parent")
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py 
b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
index d58bd75..52a8e97 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
@@ -34,18 +34,14 @@ from pydolphinscheduler.core.process_definition import 
ProcessDefinition
 from pydolphinscheduler.tasks.shell import Shell
 
 with ProcessDefinition(
-    name="tutorial",
-    schedule="0 0 0 * * ? *",
-    start_time="2021-01-01",
+    name="aklsfkkalsfjkol",
     tenant="tenant_exists",
 ) as pd:
-    task_parent = Shell(name="task_parent", command="echo hello 
pydolphinscheduler")
     task_child_one = Shell(name="task_child_one", command="echo 'child one'")
     task_child_two = Shell(name="task_child_two", command="echo 'child two'")
     task_union = Shell(name="task_union", command="echo union")
 
     task_group = [task_child_one, task_child_two]
-    task_parent.set_downstream(task_group)
 
     task_union << task_group
 
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 8a90efc..89fda57 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -26,6 +26,7 @@ from pydolphinscheduler.constants import (
     TaskFlag,
     TaskPriority,
     TaskTimeoutFlag,
+    TaskType,
 )
 from pydolphinscheduler.core.base import Base
 from pydolphinscheduler.core.process_definition import (
@@ -156,7 +157,8 @@ class Task(Base):
         self.resource_list = resource_list or []
         self.dependence = dependence or {}
         self.wait_start_timeout = wait_start_timeout or {}
-        self.condition_result = condition_result or 
self.DEFAULT_CONDITION_RESULT
+        if task_type != TaskType.CONDITIONS:
+            self.condition_result = condition_result or 
self.DEFAULT_CONDITION_RESULT
 
     @property
     def process_definition(self) -> Optional[ProcessDefinition]:
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
index 905a41b..a5773b2 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
@@ -157,9 +157,19 @@ class Or(ConditionOperator):
 class Conditions(Task):
     """Task condition object, declare behavior for condition task to 
dolphinscheduler."""
 
-    def __init__(self, name: str, condition: ConditionOperator, *args, 
**kwargs):
+    def __init__(
+        self,
+        name: str,
+        condition: ConditionOperator,
+        success_task: Task,
+        failed_task: Task,
+        *args,
+        **kwargs,
+    ):
         super().__init__(name, TaskType.CONDITIONS, *args, **kwargs)
         self.condition = condition
+        self.success_task = success_task
+        self.failed_task = failed_task
         # Set condition tasks as current task downstream
         self._set_dep()
 
@@ -171,6 +181,15 @@ class Conditions(Task):
                 for status in cond.args:
                     upstream.extend(list(status.tasks))
         self.set_upstream(upstream)
+        self.set_downstream([self.success_task, self.failed_task])
+
+    @property
+    def condition_result(self) -> Dict:
+        """Get condition result define for java gateway."""
+        return {
+            "successNode": [self.success_task.code],
+            "failedNode": [self.failed_task.code],
+        }
 
     @property
     def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> 
Dict:
@@ -182,4 +201,5 @@ class Conditions(Task):
         """
         params = super().task_params
         params["dependence"] = self.condition.get_define()
+        params["conditionResult"] = self.condition_result
         return params
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py 
b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
index 9933c4f..6085de2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
@@ -324,7 +324,7 @@ def test_condition_operator_set_define_attr_mix_operator(
     "pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version",
     return_value=(123, 1),
 )
-def test_dependent_get_define(mock_condition_code_version, 
mock_task_code_version):
+def test_condition_get_define(mock_condition_code_version, 
mock_task_code_version):
     """Test task condition :func:`get_define`."""
     common_task = Task(name="common_task", task_type="test_task_condition")
     cond_operator = And(
@@ -372,7 +372,10 @@ def test_dependent_get_define(mock_condition_code_version, 
mock_task_code_versio
                     },
                 ],
             },
-            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "conditionResult": {
+                "successNode": [common_task.code],
+                "failedNode": [common_task.code],
+            },
             "waitStartTimeout": {},
         },
         "flag": "YES",
@@ -385,7 +388,9 @@ def test_dependent_get_define(mock_condition_code_version, 
mock_task_code_versio
         "timeout": 0,
     }
 
-    task = Conditions(name, condition=cond_operator)
+    task = Conditions(
+        name, condition=cond_operator, success_task=common_task, 
failed_task=common_task
+    )
     assert task.get_define() == expect
 
 
@@ -396,49 +401,60 @@ def 
test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
 def test_condition_set_dep_workflow(mock_task_code_version):
     """Test task condition set dependence in workflow level."""
     with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
-        condition_pre_task_1 = Task(name="pre_task_success_1", 
task_type=TEST_TYPE)
-        condition_pre_task_2 = Task(name="pre_task_success_2", 
task_type=TEST_TYPE)
-        condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
+        pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE)
+        pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE)
+        pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE)
         cond_operator = And(
             And(
-                SUCCESS(condition_pre_task_1, condition_pre_task_2),
-                FAILURE(condition_pre_task_3),
+                SUCCESS(pre_task_1, pre_task_2),
+                FAILURE(pre_task_3),
             ),
         )
-        end = Task(name="end", task_type=TEST_TYPE)
 
-        condition = Conditions(name="conditions", condition=cond_operator)
-        condition >> end
+        success_branch = Task(name="success_branch", task_type=TEST_TYPE)
+        fail_branch = Task(name="fail_branch", task_type=TEST_TYPE)
+
+        condition = Conditions(
+            name="conditions",
+            condition=cond_operator,
+            success_task=success_branch,
+            failed_task=fail_branch,
+        )
 
         # General tasks test
-        assert len(pd.tasks) == 5
+        assert len(pd.tasks) == 6
         assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
             [
+                pre_task_1,
+                pre_task_2,
+                pre_task_3,
+                success_branch,
+                fail_branch,
                 condition,
-                condition_pre_task_1,
-                condition_pre_task_2,
-                condition_pre_task_3,
-                end,
             ],
             key=lambda t: t.name,
         )
         # Task dep test
-        assert end._upstream_task_codes == {condition.code}
-        assert condition._downstream_task_codes == {end.code}
+        assert success_branch._upstream_task_codes == {condition.code}
+        assert fail_branch._upstream_task_codes == {condition.code}
+        assert condition._downstream_task_codes == {
+            success_branch.code,
+            fail_branch.code,
+        }
 
         # Condition task dep after ProcessDefinition function get_define called
         assert condition._upstream_task_codes == {
-            condition_pre_task_1.code,
-            condition_pre_task_2.code,
-            condition_pre_task_3.code,
+            pre_task_1.code,
+            pre_task_2.code,
+            pre_task_3.code,
         }
         assert all(
             [
                 child._downstream_task_codes == {condition.code}
                 for child in [
-                    condition_pre_task_1,
-                    condition_pre_task_2,
-                    condition_pre_task_3,
+                    pre_task_1,
+                    pre_task_2,
+                    pre_task_3,
                 ]
             ]
         )

Reply via email to