This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b1edce2 [python] Add checker pd attr param and fix switch example (#7818)
b1edce2 is described below
commit b1edce2b113ae0e79770e7b5a9d1e005e8784f7f
Author: Jiajie Zhong <[email protected]>
AuthorDate: Wed Jan 5 19:59:20 2022 +0800
[python] Add checker pd attr param and fix switch example (#7818)
* [python] Add checker pd attr param and fix switch example
* Correct submit self.param to java gateway
* Fix missing parameter for switch example
* Add mechanism checker before submit to java gateway
close: #7803
* Fix mock get task code and version
* Change judge statement and add comment
---
.../examples/task_switch_example.py | 3 +-
.../pydolphinscheduler/core/process_definition.py | 38 ++++++++++-
.../tests/core/test_process_definition.py | 77 ++++++++++++++++++++++
3 files changed, 114 insertions(+), 4 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
index b47b8e3..2b30d0a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
@@ -34,8 +34,7 @@ from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
with ProcessDefinition(
- name="task_switch_example",
- tenant="tenant_exists",
+ name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
) as pd:
parent = Shell(name="parent", command="echo parent")
switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 70d4e6b..4941a85 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional, Set
from pydolphinscheduler.constants import (
ProcessDefinitionDefault,
ProcessDefinitionReleaseState,
+ TaskType,
)
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
@@ -97,7 +98,7 @@ class ProcessDefinition(Base):
worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
- param: Optional[List] = None,
+ param: Optional[Dict] = None,
):
super().__init__(name, description)
self.schedule = schedule
@@ -190,6 +191,22 @@ class ProcessDefinition(Base):
self._end_time = val
@property
+ def param_json(self) -> Optional[List[Dict]]:
+ """Return param json base on self.param."""
+ # Handle empty dict and None value
+ if not self.param:
+ return None
+ return [
+ {
+ "prop": k,
+ "direct": "IN",
+ "type": "VARCHAR",
+ "value": v,
+ }
+ for k, v in self.param.items()
+ ]
+
+ @property
def task_definition_json(self) -> List[Dict]:
"""Return all tasks definition in list of dict."""
if not self.tasks:
@@ -323,16 +340,33 @@ class ProcessDefinition(Base):
# Project model need User object exists
self.project.create_if_not_exists(self._user)
+ def _pre_submit_check(self):
+ """Check specific condition satisfy before.
+
+ This method should be called before process definition submit to java gateway
+ For now, we have below checker:
+ * `self.param` should be set if task `switch` in this workflow.
+ """
+ if (
+ any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
+ and self.param is None
+ ):
+ raise PyDSParamException(
+ "Parameter param must be provider if task Switch in process definition."
+ )
+
def submit(self) -> int:
"""Submit ProcessDefinition instance to java gateway."""
self._ensure_side_model_exists()
+ self._pre_submit_check()
+
gateway = launch_gateway()
self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
self._user,
self._project,
self.name,
str(self.description) if self.description else "",
- str(self.param) if self.param else None,
+ json.dumps(self.param_json),
json.dumps(self.schedule_json) if self.schedule_json else None,
json.dumps(self.task_location),
self.timeout,
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 8491878..694f9e4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -19,6 +19,7 @@
from datetime import datetime
from typing import Any
+from unittest.mock import patch
import pytest
from freezegun import freeze_time
@@ -30,10 +31,12 @@ from pydolphinscheduler.constants import (
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.side import Project, Tenant, User
+from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
from pydolphinscheduler.utils.date import conv_to_schedule
from tests.testing.task import Task
TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
+TEST_TASK_TYPE = "test-task-type"
@pytest.mark.parametrize("func", ["run", "submit", "start"])
@@ -151,6 +154,80 @@ def test__parse_datetime_not_support_type(val: Any):
pd._parse_datetime(val)
+@pytest.mark.parametrize(
+ "param, expect",
+ [
+ (
+ None,
+ None,
+ ),
+ (
+ {},
+ None,
+ ),
+ (
+ {"key1": "val1"},
+ [
+ {
+ "prop": "key1",
+ "direct": "IN",
+ "type": "VARCHAR",
+ "value": "val1",
+ }
+ ],
+ ),
+ (
+ {
+ "key1": "val1",
+ "key2": "val2",
+ },
+ [
+ {
+ "prop": "key1",
+ "direct": "IN",
+ "type": "VARCHAR",
+ "value": "val1",
+ },
+ {
+ "prop": "key2",
+ "direct": "IN",
+ "type": "VARCHAR",
+ "value": "val2",
+ },
+ ],
+ ),
+ ],
+)
+def test_property_param_json(param, expect):
+ """Test ProcessDefinition's property param_json."""
+ pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
+ assert pd.param_json == expect
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test__pre_submit_check_switch_without_param(mock_code_version):
+ """Test :func:`_pre_submit_check` if process definition with switch but without attribute param."""
+ with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+ parent = Task(name="parent", task_type=TEST_TASK_TYPE)
+ switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
+ switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
+ switch_condition = SwitchCondition(
+ Branch(condition="${var} > 1", task=switch_child_1),
+ Default(task=switch_child_2),
+ )
+
+ switch = Switch(name="switch", condition=switch_condition)
+ parent >> switch
+ with pytest.raises(
+ PyDSParamException,
+ match="Parameter param must be provider if task Switch in process definition.",
+ ):
+ pd._pre_submit_check()
+
+
def test_process_definition_get_define_without_task():
"""Test process definition function get_define without task."""
expect = {