This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 20b2a2b [feat] Add execute type to workflow (#9)
20b2a2b is described below
commit 20b2a2bdd1673a2ffaedfd48abc1fdfacc2d50e0
Author: Jay Chung <[email protected]>
AuthorDate: Sat Nov 12 10:44:09 2022 +0800
[feat] Add execute type to workflow (#9)
Up to now, we could only submit workflows in parallel
mode. This patch gives users the ability to specify the
execution type of a workflow.
---
docs/source/concept.rst | 32 ++++++++++
docs/source/config.rst | 72 ++++++++++++-----------
src/pydolphinscheduler/configuration.py | 3 +
src/pydolphinscheduler/core/process_definition.py | 39 +++++++++++-
src/pydolphinscheduler/default_config.yaml | 3 +
src/pydolphinscheduler/java_gateway.py | 2 +-
tests/core/test_process_definition.py | 19 ++++++
tests/utils/test_yaml_parser.py | 1 +
8 files changed, 134 insertions(+), 37 deletions(-)
diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 9a9527d..de49c9c 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -90,6 +90,38 @@ Tenant is the user who run task command in machine or in
virtual machine. it cou
Make should tenant exists in target machine, otherwise it will raise an
error when you try to run command
+Execution Type
+~~~~~~~~~~~~~~
+
+Decide which behavior to use when a process definition has multiple instances. When a process definition's
+schedule interval is too short, multiple instances may run at the same time. Use this
+parameter to control how those process definition instances run. Currently there are
+four execution types:
+
+* ``parallel`` (default value): all instances are allowed to run even if the previous
+  instance is not finished.
+* ``serial_wait``: each instance waits for the previous instance to finish, and
+  all waiting instances are executed in scheduling order.
+* ``serial_discard``: an instance is discarded (abandoned) if the previous instance
+  is not finished.
+* ``serial_priority``: each instance waits for the previous instance to finish,
+  and all waiting instances are executed in process definition priority order.
+
+Parameter ``execution_type`` can be set in the following ways:
+
+* Direct assignment. Pick one of the execution types above and assign it directly to the
+  parameter ``execution_type``.
+
+  .. code-block:: python
+
+     pd = ProcessDefinition(
+         name="process-definition",
+         execution_type="parallel"
+     )
+
+* Via environment variables or the configuration file. For more detail about those settings,
+  see the :doc:`config` section.
+
Tasks
-----
diff --git a/docs/source/config.rst b/docs/source/config.rst
index 29a143d..3f7fff8 100644
--- a/docs/source/config.rst
+++ b/docs/source/config.rst
@@ -78,41 +78,43 @@ All Configurations in Environment Variables
All environment variables as below, and you could modify their value via `Bash
<by bash>`_ or `Python OS Module <by python os module>`_
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Variable Section | Variable Name | description
|
-+==================+====================================+====================================================================================================================+
-| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway
address, will use its value when it is set.
|
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway
port, will use its value when it is set.
|
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java
gateway auto convert, will use its value when it is set.
|
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_USER_NAME`` | Default user name,
will use when user's ``name`` when does not specify.
|
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_USER_PASSWORD`` | Default user
password, will use when user's ``password`` when does not specify.
|
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Default User | ``PYDS_USER_EMAIL`` | Default user email,
will use when user's ``email`` when does not specify.
|
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_USER_PHONE`` | Default user phone,
will use when user's ``phone`` when does not specify.
|
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_USER_STATE`` | Default user state,
will use when user's ``state`` when does not specify.
|
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow
project name, will use its value when workflow does not specify the attribute
``project``. |
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_TENANT`` | Default workflow
tenant, will use its value when workflow does not specify the attribute
``tenant``. |
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow
user, will use its value when workflow does not specify the attribute ``user``.
|
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow
queue, will use its value when workflow does not specify the attribute
``queue``. |
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow
worker group, will use its value when workflow does not specify the attribute
``worker_group``. |
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow
release state, will use its value when workflow does not specify the attribute
``release_state``. |
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow
worker group, will use its value when workflow does not specify the attribute
``timezone``. |
-+
+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow
warning type, will use its value when workflow does not specify the attribute
``warning_type``. |
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Variable Section | Variable Name | description
|
++==================+====================================+=====================================================================================================================+
+| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway
address, will use its value when it is set.
|
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway
port, will use its value when it is set.
|
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java
gateway auto convert, will use its value when it is set.
|
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_NAME`` | Default user name,
will use when user's ``name`` when does not specify.
|
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_PASSWORD`` | Default user
password, will use when user's ``password`` when does not specify.
|
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Default User | ``PYDS_USER_EMAIL`` | Default user email,
will use when user's ``email`` when does not specify.
|
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_PHONE`` | Default user phone,
will use when user's ``phone`` when does not specify.
|
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_STATE`` | Default user state,
will use when user's ``state`` when does not specify.
|
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow
project name, will use its value when workflow does not specify the attribute
``project``. |
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_TENANT`` | Default workflow
tenant, will use its value when workflow does not specify the attribute
``tenant``. |
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow
user, will use its value when workflow does not specify the attribute ``user``.
|
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow
queue, will use its value when workflow does not specify the attribute
``queue``. |
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow
worker group, will use its value when workflow does not specify the attribute
``worker_group``. |
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow
release state, will use its value when workflow does not specify the attribute
``release_state``. |
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow
time zone, will use its value when workflow does not specify the attribute
``timezone``. |
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow
warning type, will use its value when workflow does not specify the attribute
``warning_type``. |
++
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_EXECUTION_TYPE`` | Default workflow
execution type, will use its value when workflow does not specify the attribute
``execution_type``.|
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
.. note::
diff --git a/src/pydolphinscheduler/configuration.py b/src/pydolphinscheduler/configuration.py
index 860f986..2f0c2c0 100644
--- a/src/pydolphinscheduler/configuration.py
+++ b/src/pydolphinscheduler/configuration.py
@@ -189,5 +189,8 @@ WORKFLOW_TIME_ZONE = os.environ.get(
WORKFLOW_WARNING_TYPE = os.environ.get(
"PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type")
)
+WORKFLOW_EXECUTION_TYPE = os.environ.get(
+ "PYDS_WORKFLOW_EXECUTION_TYPE", configs.get("default.workflow.execution_type")
+)
# End Common Configuration Setting
diff --git a/src/pydolphinscheduler/core/process_definition.py b/src/pydolphinscheduler/core/process_definition.py
index 62de7ed..3487435 100644
--- a/src/pydolphinscheduler/core/process_definition.py
+++ b/src/pydolphinscheduler/core/process_definition.py
@@ -57,6 +57,20 @@ class ProcessDefinition(Base):
TODO: maybe we should rename this class, currently use DS object name.
+ :param execution_type: Decide which behavior to use when the process definition has multiple instances.
+ When the process definition's schedule interval is too short, multiple instances may run at the
+ same time. Use this parameter to control how those process definition instances run.
+ Currently there are four execution types (matched case-insensitively):
+
+ * ``PARALLEL``: The default value; all instances are allowed to run even if the previous
+ instance is not finished.
+ * ``SERIAL_WAIT``: Each instance waits for the previous instance to finish, and all
+ waiting instances are executed in scheduling order.
+ * ``SERIAL_DISCARD``: An instance is discarded (abandoned) if the previous instance is not
+ finished.
+ * ``SERIAL_PRIORITY``: Each instance waits for the previous instance to finish, and
+ all waiting instances are executed in process definition priority order.
+ Any other value, including ``None``, raises ``PyDSParamException``.
:param user: The user for current process definition. Will create a new one if it do not exists. If your
parameter ``project`` already exists but project's create do not belongs to ``user``, will grant
``project`` to ``user`` automatically.
@@ -86,6 +100,7 @@ class ProcessDefinition(Base):
"worker_group",
"warning_type",
"warning_group_id",
+ "execution_type",
"timeout",
"release_state",
"param",
@@ -109,6 +124,7 @@ class ProcessDefinition(Base):
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
warning_group_id: Optional[int] = 0,
+ execution_type: Optional[str] = configuration.WORKFLOW_EXECUTION_TYPE,
timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
@@ -132,6 +148,17 @@ class ProcessDefinition(Base):
else:
self.warning_type = warning_type.strip().upper()
self.warning_group_id = warning_group_id
+ if execution_type is None or execution_type.strip().upper() not in (
+ "PARALLEL",
+ "SERIAL_WAIT",
+ "SERIAL_DISCARD",
+ "SERIAL_PRIORITY",
+ ):
+ # Interpolate eagerly: passing the value as a second exception arg would
+ # leave a literal `%s` in the message (assumes PyDSParamException does no
+ # %-formatting of its own, like built-in exceptions — confirm).
+ raise PyDSParamException(
+ "Parameter `execution_type` with unexpect value `%s`" % execution_type
+ )
+ else:
+ # Store the same normalized form that was validated above, matching
+ # how ``warning_type`` is handled.
+ self._execution_type = execution_type.strip().upper()
self.timeout = timeout
self._release_state = release_state
self.param = param
@@ -225,6 +252,16 @@ class ProcessDefinition(Base):
"""Set attribute release_state."""
self._release_state = val.lower()
+ @property
+ def execution_type(self) -> str:
+ """Get attribute execution_type, upper-cased."""
+ return self._execution_type.upper()
+
+ @execution_type.setter
+ def execution_type(self, val: str) -> None:
+ """Set attribute execution_type."""
+ self._execution_type = val
+
@property
def param_json(self) -> Optional[List[Dict]]:
"""Return param json base on self.param."""
@@ -390,6 +427,7 @@ class ProcessDefinition(Base):
json.dumps(self.param_json),
self.warning_type,
self.warning_group_id,
+ self.execution_type,
self.timeout,
self.worker_group,
self._tenant,
@@ -399,7 +437,6 @@ class ProcessDefinition(Base):
json.dumps(self.task_definition_json),
json.dumps(self.schedule_json) if self.schedule_json else None,
None,
- None,
)
if len(self.resource_list) > 0:
for res in self.resource_list:
diff --git a/src/pydolphinscheduler/default_config.yaml b/src/pydolphinscheduler/default_config.yaml
index 98d7b99..5ad3064 100644
--- a/src/pydolphinscheduler/default_config.yaml
+++ b/src/pydolphinscheduler/default_config.yaml
@@ -56,3 +56,6 @@ default:
# change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are
# ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL``
warning_type: NONE
+ # Default execution type for running multiple workflow instances. Default value is ``parallel``, which means all
+ # instances run in parallel. Other values (case-insensitive): ``SERIAL_WAIT``, ``SERIAL_DISCARD``, ``SERIAL_PRIORITY``
+ execution_type: parallel
diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py
index 54bb0a3..cd03d32 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -254,6 +254,7 @@ class JavaGate:
global_params: str,
warning_type: str,
warning_group_id: int,
+ execution_type: str,
timeout: int,
worker_group: str,
tenant_code: str,
@@ -262,7 +263,6 @@ class JavaGate:
task_definition_json: str,
schedule: Optional[str] = None,
other_params_json: Optional[str] = None,
- execution_type: Optional[str] = None,
):
"""Create or update process definition through java gateway."""
return self.java_gateway.entry_point.createOrUpdateProcessDefinition(
diff --git a/tests/core/test_process_definition.py b/tests/core/test_process_definition.py
index 30445bf..c8fffc2 100644
--- a/tests/core/test_process_definition.py
+++ b/tests/core/test_process_definition.py
@@ -67,6 +67,7 @@ def test_process_definition_key_attr(func):
("worker_group", configuration.WORKFLOW_WORKER_GROUP),
("warning_type", configuration.WORKFLOW_WARNING_TYPE),
("warning_group_id", 0),
+ ("execution_type", configuration.WORKFLOW_EXECUTION_TYPE.upper()),
("release_state", 1),
],
)
@@ -89,6 +90,7 @@ def test_process_definition_default_value(name, value):
("worker_group", str, "worker_group"),
("warning_type", str, "FAILURE"),
("warning_group_id", int, 1),
+ ("execution_type", str, "PARALLEL"),
("timeout", int, 1),
("param", dict, {"key": "value"}),
(
@@ -210,6 +212,22 @@ def test_warn_type_not_support_type(val: str):
ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
+@pytest.mark.parametrize(
+ "val",
+ [
+ "ALLL",
+ "",
+ None,
+ ],
+)
+def test_execute_type_not_support_type(val: str):
+ """Test process definition param execution_type not support type error."""
+ with pytest.raises(
+ PyDSParamException, match="Parameter `execution_type` with unexpect value.*?"
+ ):
+ ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, execution_type=val)
+
+
@pytest.mark.parametrize(
"param, expect",
[
@@ -321,6 +339,7 @@ def test_process_definition_get_define_without_task():
"workerGroup": configuration.WORKFLOW_WORKER_GROUP,
"warningType": configuration.WORKFLOW_WARNING_TYPE,
"warningGroupId": 0,
+ "executionType": "PARALLEL",
"timeout": 0,
"releaseState": 1,
"param": None,
diff --git a/tests/utils/test_yaml_parser.py b/tests/utils/test_yaml_parser.py
index ad3aaf7..3abdda6 100644
--- a/tests/utils/test_yaml_parser.py
+++ b/tests/utils/test_yaml_parser.py
@@ -63,6 +63,7 @@ expects = [
"default.workflow.release_state": ("online", "offline"),
"default.workflow.time_zone": ("Asia/Shanghai", "Europe/Amsterdam"),
"default.workflow.warning_type": ("NONE", "SUCCESS"),
+ "default.workflow.execution_type": ("parallel", "serial_wait"),
},
]