This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new ed1e3c6 feat: Add online_schedule for workflow control schedule state (#73)
ed1e3c6 is described below
commit ed1e3c6879fb7cb275e59dfc9482c46c92c3283f
Author: Jay Chung <[email protected]>
AuthorDate: Fri Feb 24 15:41:31 2023 +0800
feat: Add online_schedule for workflow control schedule state (#73)
close: #71
---
src/pydolphinscheduler/core/workflow.py | 13 +++++++++++++
src/pydolphinscheduler/java_gateway.py | 2 ++
tests/core/test_workflow.py | 16 ++++++++++++++++
3 files changed, 31 insertions(+)
diff --git a/src/pydolphinscheduler/core/workflow.py b/src/pydolphinscheduler/core/workflow.py
index 169b5b8..582faa6 100644
--- a/src/pydolphinscheduler/core/workflow.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -57,6 +57,11 @@ class Workflow(Base):
TODO: maybe we should rename this class, currently use DS object name.
+ :param online_schedule: Whether the online workflow is schedule. It will be automatically configured
+ according to :param:``schedule`` configuration. If the :param:``schedule`` is assigned with valid
+ value, :param:``online_schedule`` will be set to ``True``. But you can also manually specify
+ :param:``online_schedule``. For example if you only want to set the workflow :param:``schedule`` but
+ do not want to online the workflow schedule, you can set :param:``online_schedule`` to ``False``.
:param execution_type: Decision which behavior to run when workflow have multiple instances.
when workflow schedule interval is too short, it may cause multiple instances run at the
same time. We can use this parameter to control the behavior about how to run those workflows
@@ -115,6 +120,7 @@ class Workflow(Base):
name: str,
description: Optional[str] = None,
schedule: Optional[str] = None,
+ online_schedule: Optional[bool] = None,
start_time: Optional[Union[str, datetime]] = None,
end_time: Optional[Union[str, datetime]] = None,
timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE,
@@ -143,6 +149,12 @@ class Workflow(Base):
self._EXPECT_SCHEDULE_CHAR_NUM,
schedule,
)
+
+ # handle workflow schedule state according to init value
+ if self.schedule and online_schedule is None:
+ self.online_schedule = True
+ else:
+ self.online_schedule = online_schedule or False
self._start_time = start_time
self._end_time = end_time
self.timezone = timezone
@@ -440,6 +452,7 @@ class Workflow(Base):
json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json),
json.dumps(self.schedule_json) if self.schedule_json else None,
+ self.online_schedule,
None,
)
return self._workflow_code
diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py
index 92ad19f..7ca6a79 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -264,6 +264,7 @@ class GatewayEntryPoint:
task_relation_json: str,
task_definition_json: str,
schedule: Optional[str] = None,
+ online_schedule: Optional[bool] = None,
other_params_json: Optional[str] = None,
):
"""Create or update workflow through java gateway."""
@@ -274,6 +275,7 @@ class GatewayEntryPoint:
description,
global_params,
schedule,
+ online_schedule,
warning_type,
warning_group_id,
timeout,
diff --git a/tests/core/test_workflow.py b/tests/core/test_workflow.py
index c96b0ec..d3f83f5 100644
--- a/tests/core/test_workflow.py
+++ b/tests/core/test_workflow.py
@@ -123,6 +123,22 @@ def test_set_release_state(value, expect):
), "Workflow set attribute release_state do not return expect value."
+@pytest.mark.parametrize(
+ "value,expect",
+ [
+ ({}, False),
+ ({"schedule": "0 0 0 * * ? *"}, True),
+ ({"schedule": "0 0 0 * * ? *", "online_schedule": False}, False),
+ ],
+)
+def test_set_online_schedule(value, expect):
+ """Test workflow set online_schedule attributes."""
+ with Workflow(TEST_WORKFLOW_NAME, **value) as workflow:
+ assert (
+ getattr(workflow, "online_schedule") == expect
+ ), f"Workflow attribute online_schedule do not in expect value with case {value}."
+
+
@pytest.mark.parametrize(
"value",
[